The idea is simply to have a backend generate random numbers (0-10) and plot them in a real-time graph using WebSockets.
For that we are going to use a publish/subscribe model with AMQP and RabbitMQ. The backend (a Groovy script) publishes the numbers to an exchange, and the subscriber (Node.js with Socket.IO) gets the messages from the queue and pushes them to the web browser.

The Groovy script below puts messages in the exchange "stockExchange". It generates one message every 300 milliseconds!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Publisher script: every 300 ms it publishes a random integer in [0, 10]
// as a text/plain message to the 'stockExchange' exchange with routing key
// 'key.a'. The script does not declare the exchange itself, so the exchange
// has to exist already (e.g. created by the subscriber) before this runs.
@Grab(group='com.rabbitmq', module='amqp-client', version='1.7.2')
import com.rabbitmq.client.*
import java.util.Random

params = new ConnectionParameters(
    username: 'guest',
    password: 'guest',
    virtualHost: '/',
    requestedHeartbeat: 0
)
factory = new ConnectionFactory(params)
conn = factory.newConnection('127.0.0.1', 5672)
channel = conn.createChannel()

exchangeName = 'stockExchange'
key = 'key.a'

Random rand = new Random()
int max = 10

try {
    while (true) {
        // nextInt's bound is exclusive, so max + 1 makes `max` itself reachable.
        int next = rand.nextInt(max + 1)
        String msg = "${next}"
        channel.basicPublish(exchangeName, key, MessageProperties.TEXT_PLAIN, msg.bytes)
        Thread.sleep(300)
    }
} finally {
    // The loop never ends normally; cleanup only runs when it is left through
    // an exception (publish failure, interrupted sleep, ...). Previously the
    // close() calls were placed after the loop and were unreachable.
    try {
        channel.close()
    } finally {
        conn.close()
    }
}
Here is the client code, which uses Smoothie Charts to render the graph from data received over WebSockets.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<html>
<body>
<!-- Client page: draws every value pushed by the socket.io server on a
     Smoothie Charts canvas. -->
<script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.4/jquery.min.js"></script>
<!-- NOTE(review): this loads smoothie.js straight from GitHub; confirm the
     browser still accepts that response as a script, or host a local copy. -->
<script type="text/javascript" src="https://github.com/joewalnes/smoothie/raw/master/smoothie.js"></script>
<script src="http://localhost:8080/socket.io/socket.io.js"></script>
<div>
  <p>Node.js Rabbitmq Websockets Canvan Smoothie Groovy</p>
  <canvas id="mycanvas" width="800" height="300">
  </canvas>
</div>
<script>
  // Chart bound to the HTML5 canvas above.
  var smoothie = new SmoothieChart();
  smoothie.streamTo(document.getElementById("mycanvas"), 1000);

  // Single data series shown on the chart.
  var line1 = new TimeSeries();
  smoothie.addTimeSeries(line1, { strokeStyle: 'rgb(0, 255, 0)', fillStyle: 'rgba(0, 255, 0, 0.4)', lineWidth: 3 });

  var socket = new io.Socket("localhost", { port: 8080, rememberTransport: false });
  socket.connect();
  socket.on('connect', function () { });
  socket.on('message', function (message) {
    // The server broadcasts objects shaped like { message: "<number>" }.
    var value = parseFloat(message && message.message);
    if (isNaN(value)) {
      // Skip payloads that carry no number instead of plotting NaN.
      return;
    }
    line1.append(new Date().getTime(), value);
  });
  socket.on('disconnect', function () { });
  socket.send("Hello socket world");
</script>
</body>
</html>
And here is the Node.js code that subscribes to an AMQP queue and sends each message to the browsers.
socketexample.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// needs node.js | |
// npm socket.io | |
// npm amqp | |
// rabbitmq running at localhost | |
var http = require('http'), | |
url = require('url'), | |
fs = require('fs'), | |
io = require('socket.io'), | |
amqp = require('amqp'), | |
sys = require(process.binding('natives').util ? 'util' : 'sys'); | |
/**
 * Finish a request with a minimal 404 response.
 *
 * Writes status 404, the literal body "404", and ends the response.
 *
 * @param {object} res - response object exposing writeHead, write and end
 *   (an http.ServerResponse in this script).
 */
var send404 = function (res) {
  res.writeHead(404);
  res.write('404');
  res.end();
};
// HTTP server: serves index.html for "/" and answers 404 for every other
// path it receives.
// NOTE(review): socket.io's own /socket.io/* URLs are expected to be handled
// by the socket.io listener attached to this server — confirm for the
// installed socket.io version.
var server = http.createServer(function (req, res) {
  var path = url.parse(req.url).pathname;
  switch (path) {
    case '/':
      fs.readFile(__dirname + "/index.html", function (err, data) {
        if (err) return send404(res);
        // Only "/" reaches this branch, so the payload is always the HTML page.
        res.writeHead(200, { 'Content-Type': 'text/html' });
        res.write(data, 'utf8');
        res.end();
      });
      break;
    default:
      // Without this branch a request for any other path was never answered
      // and the client hung until it timed out.
      send404(res);
      break;
  }
});
server.listen(8080);
// socket.io: attach to the HTTP server and log whatever the browsers send.
// NOTE(review): this uses the socket.io API that matches the client's
// `new io.Socket(...)` call; `socket.broadcast` may not exist in other
// socket.io versions — confirm against the installed one.
var socket = io.listen(server);

// Registered once, outside the AMQP 'ready' callback, so clients that connect
// before AMQP is ready still get their handlers and a repeated 'ready' event
// cannot stack duplicate listeners.
socket.on('connection', function (client) {
  client.on('message', function (message) {
    console.log(message);
  });
  client.on('disconnect', function () {
    // nothing to clean up per client
  });
});

// amqp: forward every message from 'stockQueue' to all connected browsers.
var connection = amqp.createConnection({ host: '127.0.0.1' });

// An 'error' event without a listener would be thrown by the EventEmitter.
connection.addListener('error', function (err) {
  console.error('AMQP connection error:', err);
});

connection.addListener('ready', function () {
  var queue = connection.queue('stockQueue');
  // create the exchange if it doesn't exist
  connection.exchange('stockExchange');
  queue.bind("stockExchange", "key.a");
  queue.subscribe(function (message) {
    // message.data holds the raw payload; the browser expects { message: "<text>" }.
    socket.broadcast({ message: message.data.toString() });
  });
});
3 comments:
Hello, I just read your blog post and used it as a reference. But each time I call socket.broadcast() I receive an error that this method does not exist. Do you have any idea how to fix that?
I am loving this experiment :)
Hi, where did you declare the "stockQueue" queue?
Post a Comment