ØMQ supports a number of different message-passing patterns that work great in Node. We’ll start with the publish/subscribe pattern (PUB/SUB).
Recall the code we wrote in Chapter 3, Networking with Sockets, when we developed a networked file-watching service and a client to connect to it. They communicated over TCP by sending line-delimited JSON (LDJ) messages. The server would publish information in this format, and any number of client programs could subscribe to it.
We had to work hard to make our client code safely handle the message-boundary problem, and we created a separate module dedicated to buffering chunked data and emitting messages. Even so, we were left with questions like how to handle network interruptions and server restarts.
ØMQ makes all of this much simpler by taking care of low-level details like buffering and reconnecting. Let’s see how much easier it is by implementing a watcher that uses ØMQ PUB/SUB instead of naked TCP. This will get us used to the ØMQ way of doing things, and set us up to explore other messaging patterns with Node.js and ØMQ.
First, let’s implement the PUB half of a PUB/SUB pair using the zeromq module. Open an editor and enter the following:
| | 'use strict'; |
| | const fs = require('fs'); |
| | const zmq = require('zeromq'); |
| | const filename = process.argv[2]; |
| | |
| | // Create the publisher endpoint. |
| | const publisher = zmq.socket('pub'); |
| | |
| | fs.watch(filename, () => { |
| | |
| |   // Send a message to any and all subscribers. |
| |   publisher.send(JSON.stringify({ |
| |     type: 'changed', |
| |     file: filename, |
| |     timestamp: Date.now() |
| |   })); |
| | |
| | }); |
| | |
| | // Listen on TCP port 60400. |
| | publisher.bind('tcp://*:60400', err => { |
| |   if (err) { |
| |     throw err; |
| |   } |
| |   console.log('Listening for zmq subscribers...'); |
| | }); |
Save the file as zmq-watcher-pub.js. This program is similar to ones we developed in previous chapters, with a few differences.
Instead of requiring the net module, now we’re requiring zeromq and saving it as a variable called zmq for brevity. We use it to create a publisher endpoint by calling zmq.socket('pub').
Importantly, we have only one call to fs.watch. Our servers from the last chapter would invoke watch once for each connected client. Here we have just one filesystem watcher, which invokes the publisher’s send method.
Notice that the string we send to publisher.send is the output of JSON.stringify. ØMQ does not perform serialization of messages itself—it is interested only in pushing bytes down the wire. It’s our job to serialize and deserialize any messages we send through ØMQ.
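To make the serialization responsibility concrete, here is a minimal sketch of the round trip a message takes, using only Node.js core features. The helper names encodeChange and decodeMessage are hypothetical and are not part of the zeromq module; they simply mirror what the publisher does with JSON.stringify and what a receiver has to do on its side.

```javascript
'use strict';

// Hypothetical helpers (not part of zeromq): they illustrate that turning
// objects into bytes, and bytes back into objects, is the application's job.

// Build the same kind of message the publisher sends, as raw bytes.
function encodeChange(file, timestamp = Date.now()) {
  return Buffer.from(JSON.stringify({ type: 'changed', file, timestamp }));
}

// Turn received bytes back into an object.
function decodeMessage(data) {
  // Decode the bytes as UTF-8 text before parsing the JSON.
  return JSON.parse(data.toString('utf8'));
}

const wire = encodeChange('target.txt');
const message = decodeMessage(wire);
console.log(`${wire.length} bytes on the wire`);
console.log(`File "${message.file}" ${message.type}`);
```

Because both ends agree on JSON, either side could be swapped for a program in another language, as long as it produces and parses the same format.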
Finally, we call publisher.bind('tcp://*:60400') to tell ØMQ to listen on TCP port 60400 for subscribers. Let’s get the publisher running:
| | $ node zmq-watcher-pub.js target.txt |
| | Listening for zmq subscribers... |
Even though this service uses TCP, we can’t simply use nc like we did in the last chapter to get anything out of it. A ØMQ server requires a ØMQ client.
Now we’ll implement the subscriber endpoint.
Implementing the SUB portion of the ØMQ PUB/SUB pair requires even less code than the publisher. Open an editor and enter this:
| | 'use strict'; |
| | const zmq = require('zeromq'); |
| | |
| | // Create subscriber endpoint. |
| | const subscriber = zmq.socket('sub'); |
| | |
| | // Subscribe to all messages. |
| | subscriber.subscribe(''); |
| | |
| | // Handle messages from the publisher. |
| | subscriber.on('message', data => { |
| |   const message = JSON.parse(data); |
| |   const date = new Date(message.timestamp); |
| |   console.log(`File "${message.file}" changed at ${date}`); |
| | }); |
| | |
| | // Connect to publisher. |
| | subscriber.connect('tcp://localhost:60400'); |
Save this file as zmq-watcher-sub.js. It uses zmq.socket('sub') to make a subscriber endpoint.
Calling subscriber.subscribe('') tells ØMQ that we want to receive all messages. If you want only certain messages, you can provide a string that acts as a prefix filter: only messages that begin with that string are delivered. You must call subscribe at some point in your code, because you won’t receive any messages until you do.
The subscriber object inherits from EventEmitter. It emits a message event whenever it receives a message from a publisher, so we use subscriber.on to listen for these events.
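To illustrate what a prefix filter means, the following sketch mimics the matching rule in plain Node.js. The function matchesSubscription is a hypothetical stand-in written for this explanation; it is not ØMQ’s own code and not part of the zeromq module. It only shows the rule itself: a message is delivered when its bytes begin with the subscribed prefix, and the empty string is a prefix of every message.

```javascript
'use strict';

// Hypothetical stand-in for the check applied to each incoming message:
// deliver it only if its bytes begin with the subscribed prefix.
function matchesSubscription(prefix, message) {
  const p = Buffer.from(prefix);
  const m = Buffer.from(message);
  // An empty prefix has length 0, so it matches every message.
  return m.length >= p.length && m.subarray(0, p.length).equals(p);
}

const changed = '{"type":"changed","file":"target.txt"}';

console.log(matchesSubscription('', changed));                  // true
console.log(matchesSubscription('{"type":"changed"', changed)); // true
console.log(matchesSubscription('{"type":"removed"', changed)); // false
```

Because the comparison is made on raw bytes, a filter for JSON messages like ours would have to mirror the exact serialized form, including the order of the keys.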
Lastly, we use subscriber.connect to establish the connection.
Let’s see how these pieces fit together. With the PUB program still running in one terminal, fire up zmq-watcher-sub in a second one:
| | $ node zmq-watcher-sub.js |
Then, in a third terminal, touch the target file:
| | $ touch target.txt |
In the subscriber terminal, you should see output something like this:
| | File "target.txt" changed at Tue Mar 01 2016 05:25:39 GMT-0500 (EST) |
So far, things look pretty great. The publisher and subscriber programs are able to successfully communicate over the PUB/SUB socket pair.
But it gets even better. Keep those services running; next we’ll cover how ØMQ handles network interruptions.
Let’s see what happens when one of the endpoints gets disconnected unexpectedly. Try killing the publisher in its terminal via Ctrl-C.
Afterward, switch over to the subscriber terminal. You may notice that something strange happened—nothing. The subscriber keeps waiting for messages even though the publisher is down, as though nothing happened.
Start up the publisher again in the first terminal, then touch the target file. The subscriber should log a File changed message to the console. It’s as though they were connected the whole time.
From ØMQ’s perspective, it doesn’t matter which endpoint starts up first. It automatically establishes and reestablishes the connection when either endpoint comes online. These characteristics add up to a robust platform that gives you stability without a lot of work on your part.
In our previous examples, both the PUB and SUB endpoints were made from zmq.socket. This means they both have the power to either bind or connect. Our code had the publisher bind a TCP socket (as the server) and the subscriber connect (as the client), but ØMQ doesn’t force you to do it this way. We could have flipped it around and had the subscriber bind a socket to which the publisher connects.
When you design a networked application, you’ll typically have the more permanent parts of your architecture bind and have the transient parts connect to them. With ØMQ, you get to decide which parts of your system will come and go, and which messaging pattern best suits your needs. You don’t have to make both decisions at the same time, and it’s easy to change your mind later. ØMQ provides flexible, durable pipes for constructing distributed applications.
Next we’ll look at a different messaging pattern: request/reply (REQ/REP). Then we’ll tie this in with Node.js’s clustering support to manage a pool of worker processes.