Here are some slides about messaging, AMQP, and RabbitMQ. These slides are part of a short tech talk and include the example from my previous post.
Friday, 11 March 2011
Wednesday, 9 March 2011
RabbitMQ and Node.js Websockets experiment
I've been experimenting with RabbitMQ and Node.js support for WebSockets.
The idea is simply to have a backend generate random numbers (0-10) and plot them in a real-time graph using WebSockets.
For that we are going to use a publish/subscribe model with AMQP and RabbitMQ. The backend (a Groovy script) publishes the numbers to an exchange, and the subscriber (Node.js with Socket.IO) gets the messages from the queue and pushes them to the web browser.
Here is the Groovy script
that puts messages in the exchange "stockExchange". It generates one message every 300 milliseconds!
Here is the client code, which uses Smoothie Charts to render the graph with WebSockets:
And here is the Node.js code that subscribes to an AMQP queue and sends each message to the browsers:
socketexample.js
The idea is simply to have a backend generate random numbers (0-10) and plot them in a real-time graph using WebSockets.
For that we are going to use a publish/subscribe model with AMQP and RabbitMQ. The backend (a Groovy script) publishes the numbers to an exchange, and the subscriber (Node.js with Socket.IO) gets the messages from the queue and pushes them to the web browser.

The Groovy script puts messages in the exchange "stockExchange". It generates one message every 300 milliseconds!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Publishes a random integer in the range 0..10 (inclusive) as a plain-text
// message to the RabbitMQ exchange 'stockExchange' with routing key 'key.a',
// once every 300 ms, until the script is stopped or publishing fails.

// @Grab must annotate an import or a declaration, so it is attached to the
// first import rather than to a bare assignment.
@Grab(group='com.rabbitmq', module='amqp-client', version='1.7.2')
import com.rabbitmq.client.*
import java.util.Random

params = new ConnectionParameters(
    username: 'guest',
    password: 'guest',
    virtualHost: '/',
    requestedHeartbeat: 0
)
factory = new ConnectionFactory(params)
exchangeName = 'stockExchange'
key = 'key.a'
Random rand = new Random()
int max = 10

conn = factory.newConnection('127.0.0.1', 5672)
try {
    channel = conn.createChannel()
    try {
        // The loop never exits normally; the close() calls live in finally
        // blocks so they still run when basicPublish (or sleep) throws.
        while (true) {
            // nextInt's bound is exclusive, hence max + 1 to include 10.
            int next = rand.nextInt(max + 1)
            String msg = "${next}"
            channel.basicPublish(exchangeName, key, MessageProperties.TEXT_PLAIN, msg.bytes)
            Thread.sleep(300)
        }
    } finally {
        channel.close()
    }
} finally {
    // Nested so the connection is closed even if channel.close() throws.
    conn.close()
}
Here is the client code, which uses Smoothie Charts to render the graph with WebSockets:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!DOCTYPE html>
<html>
<body>
<!-- Client page: draws values pushed by the Node.js server over socket.io
     onto a Smoothie Charts canvas. -->
<script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.4/jquery.min.js"></script>
<script type="text/javascript" src="https://github.com/joewalnes/smoothie/raw/master/smoothie.js"></script>
<script src="http://localhost:8080/socket.io/socket.io.js"></script>
<div>
<p>Node.js Rabbitmq Websockets Canvan Smoothie Groovy</p>
<canvas id="mycanvas" width="800" height="300">
</canvas>
</div>
<script>
// canvas html5
var smoothie = new SmoothieChart();
smoothie.streamTo(document.getElementById("mycanvas"), 1000);
var line1 = new TimeSeries();
// Add to SmoothieChart
smoothie.addTimeSeries(line1, { strokeStyle:'rgb(0, 255, 0)', fillStyle:'rgba(0, 255, 0, 0.4)', lineWidth:3 });

var socket = new io.Socket("localhost", {port: 8080, rememberTransport: false});

// Register the handlers before connecting so no early event can be missed.
socket.on('connect', function(){ });
socket.on('message', function(message){
  // The server sends objects shaped like {message: "<number>"}.
  var value = parseFloat(message && message.message);
  // Skip anything that is not numeric instead of appending NaN to the series.
  if (isNaN(value)) return;
  line1.append(new Date().getTime(), value);
});
socket.on('disconnect', function(){ });

socket.connect();
socket.send("Hello socket world");
</script>
</body>
</html>
And here is the Node.js code that subscribes to an AMQP queue and sends each message to the browsers:
socketexample.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Requires Node.js
// npm install socket.io
// npm install amqp
// RabbitMQ must be running at localhost
var http = require('http'), | |
url = require('url'), | |
fs = require('fs'), | |
io = require('socket.io'), | |
amqp = require('amqp'), | |
sys = require(process.binding('natives').util ? 'util' : 'sys'); | |
/**
 * Finish a request with a bare 404 response whose body is the text "404".
 *
 * Declared as a function instead of being assigned to an undeclared name,
 * which created an implicit global and throws in strict mode.
 *
 * @param {Object} res - HTTP response object; writeHead, write and end are called on it.
 */
function send404(res){
  res.writeHead(404);
  res.write('404');
  res.end();
}
/**
 * HTTP server for the demo: serves index.html (read from this script's
 * directory) at "/" and answers every other path with a 404.
 *
 * Declared with `var` so it is no longer an implicit global.
 */
var server = http.createServer(function(req, res){
  var path = url.parse(req.url).pathname;

  // Previously any path other than "/" fell through the switch and the
  // response was never ended, leaving the client waiting indefinitely.
  // NOTE(review): socket.io is attached to this server further down and is
  // expected to handle its own /socket.io/ requests — confirm for the
  // socket.io version in use.
  if (path !== '/') {
    return send404(res);
  }

  fs.readFile(__dirname + "/index.html", function(err, data){
    if (err) return send404(res);
    // Only "/" reaches this point, so the content type is always HTML
    // (the old `path == 'json.js'` check could never be true here).
    res.writeHead(200, {'Content-Type': 'text/html'});
    res.write(data, 'utf8');
    res.end();
  });
});
server.listen(8080);
// socket.io: attach to the HTTP server so browsers can connect.
var socket = io.listen(server);

// Registered at top level rather than inside the AMQP 'ready' callback, so
// browser clients are handled even before (or without) a broker connection
// and the handler is not added again if 'ready' fires more than once.
socket.on('connection', function(client){
  // Messages sent by the browser are only logged.
  client.on('message', function(message){
    console.log(message);
  });
});

// AMQP
var connection = amqp.createConnection({ host: '127.0.0.1' });

// Without an 'error' listener an emitted 'error' event (e.g. RabbitMQ not
// running) is thrown and takes the whole process down.
connection.addListener('error', function(err){
  console.error('AMQP connection error:', err);
});

connection.addListener('ready', function(){
  var queue = connection.queue('stockQueue');
  // Create the exchange if it doesn't exist; the returned object is not needed.
  connection.exchange('stockExchange');
  queue.bind("stockExchange", "key.a");

  // Forward every queued message to all connected browsers.
  queue.subscribe(function(message){
    socket.broadcast({message: message.data.toString()});
  });
});
Subscribe to:
Posts (Atom)