Friday, 11 March 2011

Messaging Presentation

Here are some slides about messaging, AMQP, and RabbitMQ. These slides are part of a small tech talk and include the example from my previous post.

Wednesday, 9 March 2011

RabbitMQ and Node.js Websockets experiment

I've been experimenting with RabbitMQ and Node.js support for WebSockets.
The idea is simply to have a backend generate random numbers (0-10) and plot those numbers in a real-time graph using WebSockets.
For that we are going to use a publish/subscribe model with AMQP and RabbitMQ. The backend (a Groovy script) publishes the numbers to an exchange, and the subscriber (Node.js with Socket.IO) gets the messages from the queue and pushes them to the web browser.

Here is the Groovy script
that puts messages in the exchange "stockExchange". It generates one message every 300 milliseconds!
// Publisher script: connects to a local RabbitMQ broker and publishes a random
// integer in the range 0..10 to the 'stockExchange' exchange every 300 ms.

// @Grab has to annotate a declaration, so it is attached to the first import
// rather than to a plain assignment statement.
@Grab(group='com.rabbitmq', module='amqp-client', version='1.7.2')
import com.rabbitmq.client.*
import java.util.Random

params = new ConnectionParameters(
    username: 'guest',
    password: 'guest',
    virtualHost: '/',
    requestedHeartbeat: 0
)
factory = new ConnectionFactory(params)
conn = factory.newConnection('127.0.0.1', 5672)
channel = conn.createChannel()

exchangeName = 'stockExchange'
key = 'key.a'

Random rand = new Random()
int max = 10

try {
    // Runs until the script is interrupted or publishing fails.
    while (true) {
        // nextInt(max + 1) is exclusive of its bound, so values are 0..max.
        int next = rand.nextInt(max + 1)
        String msg = "${next}"
        channel.basicPublish(exchangeName, key, MessageProperties.TEXT_PLAIN, msg.bytes)
        Thread.sleep(300)
    }
} finally {
    // The loop never exits normally, so cleanup placed after it was unreachable.
    // Running it in finally releases the channel and connection when an
    // exception (e.g. a failed publish or an interrupted sleep) ends the loop.
    try {
        channel.close()
    } finally {
        conn.close()
    }
}


Here is the client code, which uses Smoothie Charts to render the graph from data received over WebSockets.

<html>
<body>
  <!-- Third-party libraries: jQuery, Smoothie Charts, and the socket.io client served by the Node.js process -->
  <script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.4/jquery.min.js"></script>
  <script type="text/javascript" src="https://github.com/joewalnes/smoothie/raw/master/smoothie.js"></script>
  <script src="http://localhost:8080/socket.io/socket.io.js"></script>
  <div>
    <p>Node.js Rabbitmq Websockets Canvan Smoothie Groovy</p>
    <canvas id="mycanvas" width="800" height="300">
    </canvas>
  </div>
  <script>
    // Chart setup: stream a Smoothie chart into the HTML5 canvas.
    // NOTE(review): the second streamTo argument (1000) is presumably a delay
    // in milliseconds; confirm against the Smoothie Charts documentation.
    var smoothie = new SmoothieChart();
    var canvas = document.getElementById("mycanvas");
    smoothie.streamTo(canvas, 1000);

    // Single data series, drawn as a green line with a translucent fill.
    var line1 = new TimeSeries();
    var seriesStyle = {
      strokeStyle: 'rgb(0, 255, 0)',
      fillStyle: 'rgba(0, 255, 0, 0.4)',
      lineWidth: 3
    };
    smoothie.addTimeSeries(line1, seriesStyle);

    // Handlers for the socket.io connection.
    function ignoreEvent() { }

    // Each incoming payload carries the number as a string in its "message"
    // property; plot it against the browser's current time.
    function plotReading(payload) {
      var timestamp = new Date().getTime();
      var value = parseFloat(payload.message);
      line1.append(timestamp, value);
    }

    // Connect to the socket.io endpoint on localhost:8080.
    var socket = new io.Socket("localhost", { port: 8080, rememberTransport: false });
    socket.connect();
    socket.on('connect', ignoreEvent);
    socket.on('message', plotReading);
    socket.on('disconnect', ignoreEvent);
    socket.send("Hello socket world");
  </script>
</body>
</html>
view raw index.html hosted with ❤ by GitHub

And here is the Node.js code that subscribes to an AMQP queue and sends each message to the browsers.
socketexample.js
// needs node.js
// npm socket.io
// npm amqp
// rabbitmq running at localhost
var http = require('http'),
url = require('url'),
fs = require('fs'),
io = require('socket.io'),
amqp = require('amqp'),
sys = require(process.binding('natives').util ? 'util' : 'sys');
/**
 * Replies to a request with a bare 404 response.
 *
 * Declared as a proper function instead of being assigned to an undeclared
 * name: the implicit-global form throws a ReferenceError in strict mode and
 * in ES modules.
 *
 * @param {object} res - HTTP response object; must provide writeHead, write and end.
 */
function send404(res) {
  res.writeHead(404);
  res.write('404');
  res.end();
}
/**
 * HTTP server for the demo page.
 *
 * Serves index.html (read from this script's directory) for "/" and answers
 * every other path with a 404. Declared with `var` rather than being created
 * as an implicit global.
 */
var server = http.createServer(function (req, res) {
  var path = url.parse(req.url).pathname;
  switch (path) {
    case '/':
      fs.readFile(__dirname + '/index.html', function (err, data) {
        if (err) return send404(res);
        // Only "/" reaches this branch, so the content type is always HTML.
        res.writeHead(200, { 'Content-Type': 'text/html' });
        res.write(data, 'utf8');
        res.end();
      });
      break;
    default:
      // Without a default branch, requests for any other path (for example
      // /favicon.ico) were never answered and stayed open.
      // NOTE(review): assumes socket.io answers its own /socket.io/ requests
      // before delegating to this listener; confirm for the installed version.
      send404(res);
  }
});
server.listen(8080);
// socket.io: attach the socket layer to the HTTP server.
var socket = io.listen(server);

// amqp: open a connection to the RabbitMQ broker on this machine.
var connection = amqp.createConnection({ host: '127.0.0.1' });

// Once the AMQP connection is ready, bind the queue and start relaying
// every queued message to the connected browsers.
connection.addListener('ready', function onAmqpReady() {
  var stockQueue = connection.queue('stockQueue');

  // Called for its side effect only: per the original author's note this
  // creates the exchange if it does not exist yet.
  connection.exchange('stockExchange');
  stockQueue.bind("stockExchange", "key.a");

  socket.on('connection', function onBrowserConnected(client) {
    // Messages sent by a browser are only logged; nothing is echoed back
    // or announced to the other clients.
    client.on('message', function logClientMessage(message) {
      console.log(message);
    });

    // Disconnects are deliberately ignored.
    client.on('disconnect', function ignoreDisconnect() {
    });
  });

  // Forward each AMQP message body, as a string, to all browsers.
  stockQueue.subscribe(function relayToBrowsers(message) {
    var text = message.data.toString();
    socket.broadcast({ message: text });
  });
});