use Mojolicious::Lite;
use Mojo::EventEmitter;
use Mojo::RabbitMQ::Client;
use Mojo::RabbitMQ::Client::Channel;
helper events => sub { state $events = Mojo::EventEmitter->new };
get '/' => 'chat';
websocket '/channel' => sub {
  my $c = shift;
  $c->inactivity_timeout(3600);

  # Forward messages from the browser
  $c->on(message => sub { shift->events->emit(mojochat => ['browser', shift]) }
  );

  # Forward messages to the browser
  my $cb = $c->events->on(mojochat => sub { $c->send(join(': ', @{$_[1]})) });
  $c->on(finish => sub { shift->events->unsubscribe(mojochat => $cb) });
};
my $amqp = Mojo::RabbitMQ::Client->new(
  url => ($ENV{MOJO_RABBITMQ_URL} || 'amqp://guest:guest@127.0.0.1:5672/')
);
$amqp->on(
  open => sub {
    my ($client) = @_;
    warn "client connected";

    my $channel = Mojo::RabbitMQ::Client::Channel->new();
    $channel->catch(sub { warn 'Error on channel received'; });

    $channel->on(
      open => sub {

        # Forward every message from browser to message queue
        app->events->on(
          mojochat => sub {
            return unless $_[1]->[0] eq 'browser';
            $channel->publish(
              exchange    => 'mojo',
              routing_key => '',
              body        => $_[1]->[1],
              mandatory   => 0,
              immediate   => 0,
              header      => {}
            )->deliver();
          }
        );

        # Create anonymous queue and bind it to fanout exchange named mojo
        my $queue = $channel->declare_queue(exclusive => 1);
        $queue->on(
          success => sub {
            my $method = $_[1]->method_frame;
            my $bind   = $channel->bind_queue(
              exchange    => 'mojo',
              queue       => $method->queue,
              routing_key => '',
            );
            $bind->on(
              success => sub {
                my $consumer = $channel->consume(queue => $method->queue);

                # Forward every received message to browser
                $consumer->on(
                  message => sub {
                    app->events->emit(
                      mojochat => ['amqp', $_[1]->{body}->payload]);
                  }
                );
                $consumer->deliver();
              }
            );
            $bind->deliver();
          }
        );
        $queue->deliver();
      }
    );
    $channel->on(close => sub { warn 'Channel closed'; });

    $client->open_channel($channel);
  }
);
$amqp->connect();
# Minimal single-process WebSocket chat application for browser testing
app->start;
__DATA__
@@ chat.html.ep
<form onsubmit="sendChat(this.children[0]); return false"><input></form>
<div id="log"></div>
<script>
var ws = new WebSocket('<%= url_for('channel')->to_abs %>');
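ws.onmessage = function (e) {
  // Insert the message as text so chat content is never parsed as HTML
  var p = document.createElement('p');
  p.textContent = e.data;
  document.getElementById('log').appendChild(p);
};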
function sendChat(input) { ws.send(input.value); input.value = '' }
</script>