ZeroMQ Subscribe-Publish Pattern

Set up ZeroMQ subscribe-publish pattern, useful for selectively receive certain kind of events.

ZeroMQ is a great message queue library that has been around for a while with well-documented APIs. One of the most useful setup is the subscribe-publish pattern allowing multiple processes or threads to pass messages to each other, receiving only subscribed messages without having to deal with all the published traffic.

Setup

Everything starts with a context:

void* ctx = zmq_ctx_new();

Hold onto this context, we'll need it later. In this setup we want to be able to subscribe from and publish to different peers without knowing ahead of time the socket addresses. A simple solution is creating a proxy thread with predetermined socket addressess sub_addr and pub_addr:

void* sub = zmq_socket(ctx, ZMQ_XSUB);
if( sub && zmq_bind(sub, sub_addr) == 0 ) {
  void* pub = zmq_socket(ctx, ZMQ_XPUB);
  if( pub && zmq_bind(pub, pub_addr) == 0 ) {
    zmq_proxy(sub, pub, 0);
    zmq_close(pub);
  }
  zmq_close(sub);
}

zmq_proxy() blocks and uses current thread to forward published messages to subscribers. To unblock it, we will call:

zmq_ctx_destroy(ctx);

Which will tear everything down, allowing you to join the proxy thread and clean up.

It is worth noting that, zmq_ctx_destroy() will block until all sockets are closed. Therefore if you have another thread that is using sockets created with the same context, you need to close them using zmq_close() first.

With this setup, the proxy thread can come up before or after the client connects. Obviously some published messages may be lost if proxy thread isn't running, but the client sockets may buffer and retry until sockets flush send/receive queues.

Connecting Clients

Clients will need their own sockets, ZMQ_SUB for a socket that receives data and ZMQ_PUB for sending. Each socket will need to connect to the public subscribe and publish addresses established earlier during setup. For example:

void* sub = zmq_socket(ctx, ZMQ_SUB);
if( sub && zmq_connect(sub, sub_addr) == 0 ) {
  ...
}

Once connected, client can subscribe to a byte signature:

uint32_t signature = 0x33554432;
zmq_setsockopt(sub, ZMQ_SUBSCRIBE, &signature, sizeof(signature);

Above code enables messages whose content matching value of signature to be available for reading from socket sub.

Envelope

ZeroMQ messages can be broken into parts, and the library guarantees that either all or none gets delivered. We can use the first part as envelope that contains the byte signature so messages can get forwarded. You may consider laying out messages like this:

32-bit signature
header data; info about payload
payload

References