
The ZeroMQ Guide — 30 Minute Edition

The zguide is a ~500 page book. This is the 30-minute version — half a CS lecture — that teaches you the same material, for the modern brain with a 12-second attention span. No philosophical digressions, no committee design talk. Just how ØMQ works, the patterns, and how to use them from Ruby with OMQ.

Working examples live in examples/zguide/ — each is a standalone Minitest file you can run directly.


What ZeroMQ is (and isn't)

ZeroMQ is a messaging library, not a server. No daemon, no broker, no config files, no JVM. You link it into your process the way you'd link libpthread or libcurl. It gives you sockets that carry discrete messages (not byte streams) over tcp, ipc, inproc (inter-thread), or pgm (multicast).

Think of it as Berkeley sockets that understand message framing, do async I/O in a background thread, reconnect automatically, and route messages by pattern — REQ/REP, PUB/SUB, PUSH/PULL, and so on.

The "zero" in ZeroMQ means zero broker, zero latency (as close as possible), zero administration. You don't install a service; you gem install omq and start writing code.

What it replaces

Instead of…                      ZeroMQ gives you…
Raw TCP + hand-rolled framing    Length-delimited message frames
RabbitMQ / Kafka / NATS          In-process library, no broker to operate
gRPC / HTTP                      Async, pattern-based, polyglot sockets
D-Bus / COM / CORBA              Lightweight IPC without a bus daemon
POSIX MQs                        Cross-network, cross-language, multi-pattern
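
To see what "hand-rolled framing" means in practice, here is the boilerplate ZeroMQ's message frames make unnecessary: a length-prefix codec you'd otherwise write yourself on top of a TCP byte stream. This is an illustration only, not the actual ZMTP wire format.

```ruby
# Length-prefix each message so the receiver can split a byte stream
# back into discrete messages.
def frame(msg)
  [msg.bytesize].pack('N') + msg     # 4-byte big-endian length, then body
end

def deframe(buf)
  msgs = []
  while buf.bytesize >= 4
    len = buf[0, 4].unpack1('N')
    break if buf.bytesize < 4 + len  # partial message; wait for more bytes
    msgs << buf[4, len]
    buf = buf[(4 + len)..]
  end
  msgs
end

stream = frame('hello') + frame('world')   # may arrive as one TCP read
deframe(stream)                            # => ["hello", "world"]
```

ZeroMQ does this (and partial-read reassembly) for you on every socket.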

Is ZeroMQ dead?

The project is quiet. The last libzmq release was 4.3.5 (2023), CZMQ 4.2.1 (2020), and the zguide hasn't seen significant updates since ~2014. The mailing list is low-traffic. There's no commercial entity driving development.

But "dead" and "done" are different things. The library is rock-solid, battle-tested over 15+ years, and does exactly what it claims. The wire protocol (ZMTP 3.1) is stable and well-specified. The patterns in the zguide are timeless distributed systems fundamentals — they don't expire. libzmq runs in production at scale across industries, and the API hasn't needed to change because it got the abstractions right.

Think of it like SQLite, or zlib, or libevent — mature infrastructure that just works. You don't need a release every quarter to trust it.

OMQ implements the same wire protocol (ZMTP 3.1) in pure Ruby, so it interoperates with any ZeroMQ peer — libzmq, CZMQ, pyzmq, etc.

Why not just use a broker?

Brokers are great when you actually need them — durable queues, dead-letter handling, multi-tenant isolation. But for many use cases — internal service communication, pipeline processing, real-time data distribution, inter-thread coordination — a broker is overhead you're paying for but not using.

ØMQ is brokerless by default. Processes connect directly to each other — no central server to deploy, monitor, or lose. This decentralized approach means no single point of failure and half the network hops. When you do need a broker, ØMQ lets you build one with ROUTER/DEALER sockets (see Majordomo below) — but it's your code, your rules, not a black box.

It's the difference between installing PostgreSQL when all you need is SQLite, or deploying Kubernetes when a systemd unit file would do.


What makes ØMQ sockets special

ØMQ sockets aren't BSD sockets with extra steps. They're a different animal:

  • You never touch the network directly. Sends queue into an in-process buffer; OMQ handles the wire protocol for you. You don't write any networking code.
  • Connect/bind order doesn't matter. You can connect before the remote side has bound. ØMQ queues messages and retries in the background. Startup order is decoupled.
  • Automatic reconnection. If a TCP connection drops, ØMQ reconnects transparently. Messages sent during the window are queued. You don't write retry loops or handle ECONNREFUSED.
  • Messages, not bytes. TCP is a byte stream. ØMQ is message-oriented — send a message, receive that exact message. No framing code, no reassembly buffers.
  • Transport-agnostic. Switch from tcp:// to ipc:// to inproc:// by changing the endpoint string. Same API, same patterns. inproc:// gives sub-µs latency; tcp:// gives cross-machine reach.
  • Built-in back-pressure. Every socket has configurable high-water marks (default: 1000 messages). Full queue? PUB drops, PUSH blocks. No silent unbounded memory growth.

The four core patterns

ØMQ's API is organized around patterns — predefined roles that sockets play. Each socket type enforces a messaging discipline so you can't accidentally send when you should receive.

1. REQ/REP — Request-Reply

  client (REQ)  ──req──▶  server (REP)
                ◀──rep──

Strict lockstep: send, recv, send, recv. Violating the sequence returns an error. The socket automatically prepends/strips an empty delimiter frame (the envelope) so the REP knows where to route the reply.

This is the simplest pattern and maps directly to RPC. The lockstep enforcement means you can't accidentally pipeline requests or forget to reply. The downside: if the server crashes after receiving but before replying, the REQ socket is stuck in "expecting recv" state. The only fix is to destroy it and create a new one (Lazy Pirate pattern).

require 'omq'
require 'async'

Async do |task|
  # --- server ---
  rep = OMQ::REP.bind('tcp://*:5555')
  task.async do
    loop do
      msg = rep.receive            # => ["Hello"]
      rep << "World"
    end
  end

  # --- client ---
  req = OMQ::REQ.connect('tcp://localhost:5555')
  req << "Hello"
  reply = req.receive              # => ["World"]
end

Scaling it up: stick a ROUTER/DEALER proxy in the middle and fan out to N workers without changing client code.

→ see examples/zguide/01_req_rep.rb

2. PUB/SUB — Publish-Subscribe

  publisher (PUB)  ──msg──▶  subscriber 1 (SUB)
                   ──msg──▶  subscriber 2 (SUB)
                   ──msg──▶  subscriber N (SUB)

PUB sends to all connected SUBs. SUB filters by topic prefix — filtering happens on the publisher side (since ZeroMQ 3.x), so unwanted messages don't even cross the network. In OMQ, SUB.new starts with no subscriptions — pass subscribe: to filter by prefix, or call #subscribe later.
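
The filtering rule itself is just a byte-prefix match on the start of the message. A sketch of the rule, not OMQ's internal implementation:

```ruby
# A message is delivered if any active subscription is a prefix of it.
def delivered?(subscriptions, msg)
  subscriptions.any? { |topic| msg.start_with?(topic) }
end

delivered?(['weather.nyc'], 'weather.nyc 74F')   # => true
delivered?(['weather.nyc'], 'weather.sfo 61F')   # => false
delivered?([''], 'anything at all')              # => true ('' matches everything)
```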

Late-joining subscribers miss earlier messages — this is by design, like tuning into a radio station. If you need catch-up, layer a snapshot mechanism on top (see Clone pattern below).

PUB never blocks; if a subscriber is slow, messages are dropped once the high-water mark is hit. Adding more subscribers doesn't slow down the publisher.

One subtle point: PUB/SUB has a startup race. When a SUB connects, the subscription must propagate to the publisher. Messages published during this brief window (µs for inproc, ms for tcp) are lost. If your first message matters, synchronize with a REQ/REP handshake first.

# --- publisher ---
pub = OMQ::PUB.bind('tcp://*:5556')
loop do
  pub << "weather.nyc #{rand(60..100)}F"
  pub << "weather.sfo #{rand(50..80)}F"
  sleep 1
end

# --- subscriber ---
sub = OMQ::SUB.connect('tcp://localhost:5556', subscribe: 'weather.nyc')
loop do
  msg = sub.receive             # => ["weather.nyc 74F"]
  puts msg.first
end

# --- subscribe to everything ---
sub = OMQ::SUB.connect('tcp://localhost:5556', subscribe: '')

# --- subscribe later ---
sub = OMQ::SUB.new('tcp://localhost:5556')
sub.subscribe('weather.sfo')

→ see examples/zguide/02_pub_sub.rb

3. PUSH/PULL — Pipeline

  ventilator (PUSH)  ──task──▶  worker 1 (PULL)
                     ──task──▶  worker 2 (PULL)
                     ──task──▶  worker N (PULL)
                                     │
                                   result
                                     ▼
                                sink (PULL)

Fan-out/fan-in. PUSH round-robins messages across connected PULLs. No replies, no envelopes — just a unidirectional pipeline. Think of it as xargs -P for network messages.

The round-robin is fair but not load-aware: fast and slow workers get the same number of tasks. For load-aware distribution, use ROUTER/DEALER with a "send me work when I'm ready" protocol.

Unlike PUB/SUB, PUSH/PULL provides back-pressure. If all workers are busy and their queues are full, PUSH blocks (or times out).

Async do |task|
  # --- ventilator ---
  push = OMQ::PUSH.new
  push.bind('tcp://*:5557')
  100.times { |i| push << "task #{i}" }

  # --- worker (run N of these) ---
  task.async do
    pull = OMQ::PULL.connect('tcp://localhost:5557')
    sink = OMQ::PUSH.connect('tcp://localhost:5558')
    loop do
      t = pull.receive.first
      result = process(t)     # `process` is your app-specific work

      sink << result
    end
  end

  # --- sink ---
  task.async do
    pull = OMQ::PULL.bind('tcp://*:5558')
    loop { puts pull.receive.first }
  end
end

→ see examples/zguide/03_pipeline.rb

4. PAIR — Exclusive Pair

One-to-one, bidirectional, no routing. Designed for coordinating two tasks within a process via inproc. Not meant for network use — no reconnection, no identity handling. A full-duplex pipe.

Async do
  a = OMQ::PAIR.bind('inproc://pipe')
  b = OMQ::PAIR.connect('inproc://pipe')

  a << 'ping'
  b.receive      # => ["ping"]
  b << 'pong'
  a.receive      # => ["pong"]
end

The advanced socket types

Beyond the four basic patterns, ØMQ provides "raw" socket types for manual control over routing. These are the building blocks for brokers, proxies, and custom topologies.

ROUTER

A ROUTER socket tracks every connection with an identity — a binary string that uniquely identifies each peer (auto-generated or set via socket.identity = 'name'). On receive, ROUTER prepends the sender's identity as the first frame. On send, you provide the target identity as the first frame — ROUTER strips it and delivers to that peer.

Messages to unknown identities are silently dropped by default. ROUTER is the workhorse of every broker pattern.

router = OMQ::ROUTER.bind('tcp://*:5559')

# Receive: [client_identity, "", "Hello"]
msg = router.receive
identity = msg[0]

# Reply to that specific client
router << [identity, "", "World"]

# Or use the convenience method:
router.send_to(identity, "World")

DEALER

An async REQ — round-robins outgoing messages across connections and fair-queues incoming messages, but without the send/recv lockstep. You can fire off 10 requests without waiting for replies. The trade-off: you manage envelopes yourself.

dealer = OMQ::DEALER.new
dealer.identity = 'worker-1'
dealer.connect('tcp://localhost:5559')

# No lockstep — send multiple requests without waiting
dealer << ["", "request 1"]
dealer << ["", "request 2"]

# Receive replies as they come
reply = dealer.receive

ROUTER + DEALER = async broker

  clients (REQ)  ──▶  ROUTER │ proxy │ DEALER  ──▶  workers (REP)

ROUTER facing clients, DEALER facing workers. Messages flow through, DEALER round-robins to workers. Clients and workers are completely decoupled — add workers by connecting more processes, remove by killing them. In Ruby with Async, two tasks forward in each direction:

Async do |task|
  frontend = OMQ::ROUTER.bind('tcp://*:5559')
  backend  = OMQ::DEALER.bind('tcp://*:5560')

  task.async do
    loop { backend << frontend.receive }
  end

  loop { frontend << backend.receive }
end

→ see the broker test in examples/zguide/01_req_rep.rb

The envelope through a ROUTER chain

Understanding frame flow through ROUTER is essential for building brokers:

REQ client sends:     ["Hello"]
                      ↓  (REQ prepends empty delimiter)
On the wire:          ["", "Hello"]
                      ↓  (ROUTER prepends client identity)
ROUTER receives:      ["client-A", "", "Hello"]
                      ↓  (broker forwards to DEALER)
DEALER sends:         ["client-A", "", "Hello"]
                      ↓  (REP strips envelope, delivers payload)
REP receives:         ["Hello"]

REP sends reply:      ["World"]
                      ↓  (REP re-wraps saved envelope)
On the wire:          ["client-A", "", "World"]
                      ↓  (DEALER fair-queues to ROUTER)
ROUTER receives:      ["client-A", "", "World"]
                      ↓  (ROUTER routes by first frame)
REQ receives:         ["World"]

The empty delimiter separates routing (everything before it) from payload (everything after). With two ROUTER hops, each adds an identity frame: ["hop2-id", "hop1-id", "", "payload"]. Each ROUTER on the return path peels off one identity.
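
The frame bookkeeping above can be reproduced with plain arrays. This is an illustration of the envelope algebra, not OMQ code:

```ruby
# Request path: each layer prepends a frame.
msg = ['Hello']                  # REQ application payload
msg = [''] + msg                 # REQ adds the empty delimiter
msg = ['client-A'] + msg         # first ROUTER prepends the peer identity
msg = ['hop2-id'] + msg          # a second ROUTER hop prepends another

# REP side: split on the empty delimiter.
i = msg.index('')
envelope = msg[0...i]            # => ["hop2-id", "client-A"]
payload  = msg[(i + 1)..]        # => ["Hello"]

# Return path: re-wrap the reply; each ROUTER pops one identity to route.
reply = envelope + [''] + ['World']
reply.shift                      # hop-2 ROUTER routes by "hop2-id", strips it
reply.shift                      # hop-1 ROUTER routes by "client-A", strips it
reply                            # => ["", "World"] -- REQ strips "", delivers ["World"]
```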

XPUB / XSUB

Like PUB/SUB but subscription events are exposed as data frames (\x01topic to subscribe, \x00topic to unsubscribe). This lets you build subscription-forwarding proxies — essential for multi-hop pub-sub topologies.

Async do |task|
  # XSUB connects to upstream publishers
  xsub = OMQ::XSUB.connect('tcp://upstream:5556')

  # XPUB binds for downstream subscribers
  xpub = OMQ::XPUB.bind('tcp://*:5560')

  # Forward subscriptions upstream, data downstream
  task.async do
    loop do
      event = xpub.receive         # => ["\x01weather"]
      xsub << event.first
    end
  end

  loop do
    msg = xsub.receive
    xpub << msg
  end
end

→ see examples/zguide/02_pub_sub.rb


Messages and framing

ØMQ messages are not byte streams. A message is one or more frames, each an opaque blob of bytes with a length. Unlike TCP (where sending "hello" then "world" might arrive as "helloworld"), ØMQ delivers each message whole and separate.

  • Messages are atomic: receive all frames or none
  • Strings are length-prefixed on the wire, not null-terminated
  • OMQ: #receive returns Array<String>, #<< accepts String or Array<String>

Multipart messages

socket << %w[routing-key header payload]
msg = socket.receive   # => ["routing-key", "header", "payload"]

All frames are sent and received atomically. Intermediate nodes forward all frames without inspecting them. This is how envelopes work — address frames at the front, payload at the back, empty delimiter between them.

Binary data

All received data is ASCII-8BIT (binary). ØMQ frames are raw bytes — no encoding is preserved. Send UTF-8, receive ASCII-8BIT.

push << "\x00\x01\x02\xff".b
msg = pull.receive
msg.first.encoding    # => Encoding::ASCII_8BIT
msg.first.bytes       # => [0, 1, 2, 255]

The envelope

REQ/REP use envelopes to track return addresses through proxy chains:

REQ sends:          ["Hello"]
On the wire:        ["", "Hello"]          ← REQ added empty delimiter
ROUTER receives:    ["client-id", "", "Hello"]  ← ROUTER added identity

Each ROUTER in the chain prepends another identity frame. REP strips and saves the envelope, hands you the payload, then re-wraps your reply.

If you use DEALER or ROUTER directly, you manage envelopes yourself — prepend the empty delimiter when sending through DEALER to REP, and handle identity frames with ROUTER. Getting this wrong is the #1 source of "my broker doesn't work" bugs.


Transports

Transport        Syntax                  Notes
tcp              tcp://host:port         Cross-machine. Bread and butter.
ipc              ipc:///tmp/feed.sock    Unix domain socket. Fast, local.
ipc (abstract)   ipc://@name             Linux abstract namespace. No file.
inproc           inproc://name           Inter-fiber/thread. Sub-µs. Fastest.

Async do
  # TCP with auto-selected port
  server = OMQ::REP.new
  server.bind('tcp://127.0.0.1:*')
  port = server.last_tcp_port           # the chosen port

  # IPC with abstract namespace (Linux only)
  rep = OMQ::REP.bind('ipc://@myapp.rpc')
  req = OMQ::REQ.connect('ipc://@myapp.rpc')

  # inproc (in-process, same reactor)
  pull = OMQ::PULL.bind('inproc://pipeline')
  push = OMQ::PUSH.connect('inproc://pipeline')
  push << 'hello'
end

TCP host shorthands

OMQ::Transport::TCP normalizes a few host shorthands before binding or connecting. The behavior is deliberately symmetric between bind and connect so the same endpoint string works on both sides.

URL                     Bind                                 Connect
tcp://*:PORT            dual-stack wildcard (0.0.0.0 + ::)   loopback host
tcp://:PORT             loopback host                        loopback host
tcp://localhost:PORT    loopback host                        loopback host
tcp://127.0.0.1:PORT    pass-through (IPv4 only)             pass-through
tcp://[::1]:PORT        pass-through (IPv6 only)             pass-through
tcp://0.0.0.0:PORT      pass-through (IPv4 wildcard only)    pass-through
tcp://[::]:PORT         pass-through (IPv6 wildcard only)    pass-through
tcp://host.name:PORT    resolver + bind to each address      resolver + Happy Eyeballs

Loopback host is ::1 when the machine has at least one non-loopback, non-link-local IPv6 address, otherwise 127.0.0.1. Detected once via Socket.getifaddrs and memoized for the process. See OMQ::Transport::TCP.loopback_host.
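
The selection rule can be sketched against a static address list (the real implementation inspects Socket.getifaddrs; this is an illustration only):

```ruby
require 'ipaddr'

# Prefer ::1 only when a usable (non-loopback, non-link-local) IPv6
# address exists; otherwise fall back to 127.0.0.1.
def loopback_host(addresses)
  usable_v6 = addresses.any? do |a|
    ip = IPAddr.new(a)
    ip.ipv6? && !ip.loopback? && !ip.link_local?
  end
  usable_v6 ? '::1' : '127.0.0.1'
end

loopback_host(['127.0.0.1', 'fe80::1'])       # => "127.0.0.1" (link-local only)
loopback_host(['127.0.0.1', '2001:db8::5'])   # => "::1"
```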

Dual-stack wildcard (tcp://*:PORT) binds both 0.0.0.0 and :: on the same port, with IPV6_V6ONLY set so the two wildcards don't collide on Linux. Implemented on top of Socket.tcp_server_sockets, which also coordinates ephemeral port allocation across the two families.

Explicit addresses (0.0.0.0, ::, 127.0.0.1, ::1) pass through unchanged — use them when you want to opt out of dual-stack and bind exactly one family.
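
The dual-stack behavior described above can be observed directly with the stdlib call it is built on:

```ruby
require 'socket'

# Socket.tcp_server_sockets returns one listener per address family,
# coordinated onto the same port (here an ephemeral one).
sockets = Socket.tcp_server_sockets(0)
ports   = sockets.map { |s| s.local_address.ip_port }.uniq
sockets.each(&:close)
ports.size   # => 1 -- the IPv4 and IPv6 listeners share a single port
```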


Bind vs. connect

  • bind = "I'm the stable endpoint, I'll be here a while"
  • connect = "I'll find you at this address"

Rule of thumb: the node with a stable, well-known address binds. Everything else connects. A socket can bind and connect to different endpoints simultaneously.

OMQ types have sensible defaults (REP/ROUTER/PUB/XPUB/PULL bind; REQ/DEALER/SUB/XSUB/PUSH/PAIR connect), but you can override:

# Class methods (preferred)
rep = OMQ::REP.bind('tcp://*:5555')
req = OMQ::REQ.connect('tcp://localhost:5555')

# Override with @/> prefixes
rep = OMQ::REP.new('>tcp://broker:5555')  # force connect
req = OMQ::REQ.new('@tcp://*:5555')        # force bind

# Split creation from binding (for pre-connect options)
sock = OMQ::DEALER.new
sock.identity = 'worker-1'
sock.bind('tcp://*:5555')
sock.connect('tcp://other-host:5556')

Socket options

Options are accessed directly on the socket. Most must be set before connecting (identity, HWM). Timeouts and linger can change at any time.

sock = OMQ::REQ.new
sock.send_timeout = 1          # 1 second
sock.recv_timeout = 1          # 1 second
sock.identity = 'worker-1'    # ROUTER-visible identity

# nil = no timeout / wait indefinitely:
sock.send_timeout = nil

Key options:

Option               Default   Meaning
send_timeout         nil       Send timeout in seconds. nil = block forever
recv_timeout         nil       Receive timeout in seconds. nil = block forever
linger               0         Wait on close. nil = forever, 0 = drop
identity             auto      Socket identity for ROUTER addressing
send_hwm             1000      Send high-water mark (messages)
recv_hwm             1000      Receive high-water mark (messages)
reconnect_interval   0.1       Reconnect interval in seconds. nil = disabled
max_message_size     nil       Max inbound message size. nil = unlimited

Timeouts raise IO::TimeoutError — standard Ruby IO exceptions. The read_timeout / write_timeout aliases match Ruby's IO interface.


High-water marks

Every socket has a send HWM and receive HWM (default: 1000 messages). HWM is per-connection: a PUSH connected to 3 PULLs has 3×1000 buffer. When full:

  • PUB, ROUTER: drops messages (can't block — would stall all peers)
  • PUSH, REQ, DEALER: blocks the sender (until timeout)

sock.send_hwm = 10_000
sock.recv_hwm = 10_000
sock.set_unbounded              # HWM = 0 (unlimited)
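
The drop-vs-block distinction can be sketched with a plain bounded queue. Illustration only, not OMQ's queueing code:

```ruby
class BoundedQueue
  def initialize(hwm)
    @hwm   = hwm
    @queue = []
  end

  # PUB/ROUTER style: never block -- drop when the HWM is reached.
  def offer(msg)
    return false if @queue.size >= @hwm
    @queue << msg
    true
  end

  def size
    @queue.size
  end
end

q = BoundedQueue.new(2)
q.offer('a')   # => true
q.offer('b')   # => true
q.offer('c')   # => false -- dropped; a PUSH socket would block here instead
```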

Threading and concurrency

OMQ is built on Async fibers. All sockets run inside an Async { } block and cooperate via the fiber scheduler — no threads needed for concurrent I/O. Use task.async { } to run concurrent operations:

Async do |task|
  # Concurrent tasks share the reactor
  task.async do
    rep = OMQ::REP.bind('inproc://backend')
    loop do
      msg = rep.receive
      rep << "echo:#{msg.first}"
    end
  end

  req = OMQ::REQ.connect('inproc://backend')
  req << 'hello'
  puts req.receive.first  # => "echo:hello"
end

For multi-worker patterns, spawn one task per worker:

Async do |task|
  pull = OMQ::PULL.bind('inproc://work')
  4.times do |id|
    task.async do
      push = OMQ::PUSH.connect('inproc://results')
      loop do
        msg = pull.receive
        push << "worker-#{id}:#{msg.first}"
      end
    end
  end
end

→ see examples/zguide/01_req_rep.rb for a full broker example → see examples/zguide/03_pipeline.rb for multi-worker pipeline


Reliability patterns

ØMQ gives you no delivery guarantees out of the box. Messages can be lost on connection drops, HWM overflow, or process crashes.

This is by design, not a limitation. Centralized brokers that claim "exactly once delivery" are misleading — a message can sit in a TCP buffer or broker queue and look "delivered," but if the receiving process crashes before it reads that buffer, the message is gone. Only the application itself can confirm that a message was actually processed. This is the end-to-end argument: reliability belongs at the endpoints, not in the transport.
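
One practical consequence of the end-to-end argument: make the service idempotent, so client retries are safe. A sketch with an in-memory dedup table; the "id|payload" request format is hypothetical:

```ruby
# Dedup on a client-chosen request id: the work runs once, and a
# retried request replays the stored reply instead of re-running it.
seen = {}

handle = lambda do |request|
  id, payload = request.split('|', 2)
  seen[id] ||= "processed:#{payload}"   # ||= runs the work only once
end

first = handle.call('42|charge card')
again = handle.call('42|charge card')   # client retry after a timeout
first == again                          # => true -- no double-processing
```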

ØMQ gives you the building blocks — timeouts, retries, heartbeats, idempotent services — and the zguide shows how to compose them from simple to sophisticated:

Lazy Pirate (client-side retry)

REQ client sets a receive timeout. No reply? Close the socket, open a new one, retry. Give up after N attempts. You must recreate the socket because REQ's lockstep state machine is stuck after a missed recv.

MAX_RETRIES = 3

def lazy_pirate_request(endpoint, request)
  MAX_RETRIES.times do |attempt|
    req = OMQ::REQ.connect(endpoint)
    req.recv_timeout = 2.5

    req << request
    begin
      reply = req.receive.first
      req.close
      return reply
    rescue IO::TimeoutError
      puts "attempt #{attempt + 1} timed out"
      req.close   # REQ is stuck mid-lockstep; recreate on the next attempt
    end
  end
  raise 'Server appears to be offline'
end

→ see examples/zguide/04_lazy_pirate.rb

Simple Pirate (add a broker)

ROUTER-ROUTER queue between clients and workers. Workers send "READY" when available, queue routes to ready workers (LRU).

Paranoid Pirate (add heartbeating)

Workers and queue exchange periodic heartbeats. Queue evicts silent workers. Workers reconnect if queue goes silent. Rule: heartbeat at interval T, declare dead after T×3 silence.
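
The T×3 rule reduces to a tiny expiry tracker. A sketch of the bookkeeping only, not the Paranoid Pirate protocol itself:

```ruby
INTERVAL = 1.0   # heartbeat interval T, in seconds
LIVENESS = 3     # declare dead after T x 3 of silence

class PeerLiveness
  def initialize(now)
    beat(now)
  end

  # Any heartbeat (or message) from the peer resets the deadline.
  def beat(now)
    @expires_at = now + INTERVAL * LIVENESS
  end

  def dead?(now)
    now > @expires_at
  end
end

peer = PeerLiveness.new(0.0)
peer.dead?(2.9)   # => false -- still inside the grace window
peer.dead?(3.1)   # => true  -- three intervals of silence
```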

Heartbeat pattern

Simple liveness detector using PUB/SUB:

HEARTBEAT_IVL = 1.0
DEAD_AFTER    = 3

Async do |task|
  pub = OMQ::PUB.bind('tcp://*:5555')
  task.async do
    loop do
      pub << 'HEARTBEAT'
      sleep HEARTBEAT_IVL
    end
  end

  # Subscriber side
  sub = OMQ::SUB.connect('tcp://localhost:5555', subscribe: 'HEARTBEAT')
  sub.recv_timeout = HEARTBEAT_IVL * 1.5
  misses = 0

  loop do
    begin
      sub.receive
      misses = 0
    rescue IO::TimeoutError
      misses += 1
      if misses >= DEAD_AFTER
        puts 'DEAD — taking action'
        misses = 0
      end
    end
  end
end

→ see examples/zguide/05_heartbeat.rb

Majordomo (service-oriented broker)

Workers register by service name. Clients request by name. Broker routes to the right worker pool. A service mesh in ~500 lines with no YAML.

Titanic (disconnected reliability)

Broker persists requests to disk before forwarding. Store-and-forward for when uptime matters more than latency.

Binary Star (high availability)

Primary-backup pair with PUB/SUB heartbeats. Failover on primary death. Fencing rule: backup only takes over if it was also receiving client requests — proving the primary is truly gone, not a network partition.

Freelance (brokerless reliability)

Client talks directly to multiple servers. Three models: sequential failover, shotgun (blast all, take first reply), or tracked (maintain connection state).


Pub-sub reliability

PUB/SUB is fire-and-forget, but you can layer reliability on top:

Late joiner problem

New subscribers miss earlier messages. Solutions:

  • Last Value Cache (LVC): proxy caches latest value per topic, replays to new subscribers.
  • Snapshot: subscriber gets current state over REQ/REP, then switches to live PUB/SUB.
# LVC proxy sketch -- two paths inside the proxy loop:
cache = {}

# Data path: cache the latest message per topic, forward downstream
msg = xsub.receive.first
topic = msg.split(' ', 2).first
cache[topic] = msg
xpub << msg

# Subscription path: on a new subscription, replay the cached value
event = xpub.receive.first
if event.start_with?("\x01")
  topic = event[1..]
  xpub << cache[topic] if cache.key?(topic)
end

→ see examples/zguide/06_last_value_cache.rb

Slow subscriber problem

  • Suicidal Snail: subscriber checks lag, kills itself if too far behind. Let supervisor restart.
  • Credit-based flow control: subscriber sends credits (like TCP window), publisher only sends when credits available.
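
Credit-based flow control can be sketched with plain counters (illustration only): the subscriber grants credits, and the publisher sends only while it holds credit, so a slow consumer throttles the stream instead of overflowing a queue.

```ruby
credits = 0
sent    = []

grant   = ->(n) { credits += n }   # subscriber side: "I can take n more"
publish = lambda do |msg|          # publisher side: send only on credit
  if credits > 0
    sent << msg
    credits -= 1
  end
end

grant.call(2)
publish.call('m1')
publish.call('m2')
publish.call('m3')   # no credit left -- message held back
sent                 # => ["m1", "m2"]
```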

Clone pattern

The full-stack approach: server maintains a sequenced key-value store, publishes updates via PUB, serves snapshots via REQ/REP. Clients subscribe first (so they don't miss updates during snapshot), then get the snapshot, then apply buffered live updates — skipping any already in the snapshot using the sequence number.

# Clone client sketch
sub = OMQ::SUB.connect(pub_endpoint)
sub.recv_timeout = 1

# Subscribe first, snapshot second
req = OMQ::REQ.connect(snapshot_endpoint)
req << 'SNAPSHOT'
snapshot = req.receive.first
snapshot_seq = apply_snapshot(snapshot)

# Apply live updates, skip any with seq <= snapshot_seq
loop do
  msg = sub.receive.first
  seq, key, value = msg.split('|', 3)
  apply_update(key, value) if seq.to_i > snapshot_seq
end

→ see examples/zguide/07_clone.rb


OMQ API at a glance

Socket lifecycle

# Class methods (preferred)
pub = OMQ::PUB.bind('tcp://*:5556')
sub = OMQ::SUB.connect('tcp://localhost:5556')

# Constructor
pub = OMQ::PUB.new('tcp://*:5556')

# Split creation from binding (for pre-connect options)
pub = OMQ::PUB.new
pub.send_hwm = 50_000
pub.bind('tcp://*:5556')
pub.bind('ipc://@my-pub')       # multiple endpoints

pub.last_endpoint                # => "tcp://0.0.0.0:5556"
pub.last_tcp_port                # => 5556
pub.close                        # or let GC handle it

Sending and receiving

socket << 'hello'                         # single-frame
socket.send('hello')                      # same (shadows Object#send)
socket << ['frame1', 'frame2', 'frame3']  # multipart

msg = socket.receive   # => ["frame1", "frame2", "frame3"]
msg.first              # first frame

socket.__send__(:some_method)  # for Object#send metaprogramming

Socket types and their mixins

Type     Includes             Default action   Notes
REQ      Readable, Writable   connect          Strict send/recv lockstep
REP      Readable, Writable   bind             Strict recv/send lockstep
DEALER   Readable, Writable   connect          Async REQ, no lockstep
ROUTER   Readable, Writable   bind             Identity-addressed routing
PUB      Writable             bind             Send-only
SUB      Readable             connect          Receive-only, topic filtered
XPUB     Readable, Writable   bind             PUB + subscription events
XSUB     Readable, Writable   connect          SUB + wire-protocol control
PUSH     Writable             connect          Send-only, round-robin
PULL     Readable             bind             Receive-only, fair-queued
PAIR     Readable, Writable   connect          1:1 bidirectional

PUB can't receive and SUB can't send — the type system prevents misuse at the Ruby level.


Common mistakes

1. Wrong REQ/REP ordering

REQ requires send-then-receive. Calling #receive on a fresh REQ hangs forever. If the server crashes mid-request, the REQ is stuck — destroy it and create a new one (Lazy Pirate).

2. Not setting timeouts

#receive without recv_timeout blocks forever. Always set timeouts in production and handle IO::TimeoutError.

3. PUB/SUB startup race

Subscription propagation takes time (µs for inproc, ms for tcp). First messages can be lost. Synchronize with REQ/REP or add a small sleep.

4. Sending to an unconnected ROUTER

Messages to unknown identities vanish silently. Set router_mandatory = true for errors instead.

5. Wrong envelope format

DEALER → REP requires the empty delimiter that REQ adds automatically. Without it, REP can't parse the message.

# WRONG
dealer << "hello"

# RIGHT
dealer << ["", "hello"]

Design philosophy

Unix philosophy, applied to messaging

  • Small, composable pieces connected by simple protocols
  • Each socket type does one thing well
  • No monolithic broker — compose your own topology
  • Crash and restart cleanly

Cheap vs. Nasty

  • Cheap (control plane): human-readable, synchronous, low-volume. JSON, REQ/REP. Easy to debug.
  • Nasty (data plane): binary, async, high-volume. PUB/SUB or PUSH/PULL. Fast to parse.

Most code is control plane; optimize only the hot path.


Topology patterns cheatsheet

1:1 RPC                    REQ ──▶ REP
1:N fan-out                PUB ──▶ SUB, SUB, SUB
N:1 fan-in                 PUSH, PUSH ──▶ PULL
N:M brokered               REQ ──▶ ROUTER │ broker │ DEALER ──▶ REP
N:M pub-sub proxy          PUB ──▶ XSUB │ proxy │ XPUB ──▶ SUB
Pipeline                   PUSH ──▶ PULL ──▶ PUSH ──▶ PULL
Async client-server        DEALER ──▶ ROUTER
Peer-to-peer               ROUTER ──▶ ROUTER  (hard mode)

Practical advice from the zguide

  1. Start with the simplest pattern that works. REQ/REP is fine until it isn't.
  2. linger defaults to 0 — sockets close instantly, dropping unsent messages. Raise it (linger: kwarg or sock.linger=) only when you need pending sends flushed on close.
  3. PUB/SUB filtering is prefix-based. "" = everything.
  4. Connect from the ephemeral side. Stable address binds.
  5. Heartbeat everything in production.
  6. Let ØMQ reconnect for you. Close-and-reopen only for REQ after a timeout (Lazy Pirate).
  7. Use ROUTER for addressed routing, DEALER for async round-robin.
  8. Multipart messages are atomic. Use them for envelopes, not streaming.
  9. Set timeouts. recv_timeout saves you from frozen processes.
  10. Use abstract namespace IPC on Linux (ipc://@name) — no leftover socket files.

Working examples

File                     Pattern                               Key sockets
01_req_rep.rb            Basic echo + multi-worker broker      REQ, REP, ROUTER, DEALER
02_pub_sub.rb            Topic filter, XPUB/XSUB proxy         PUB, SUB, XPUB, XSUB
03_pipeline.rb           Fan-out/fan-in ventilator             PUSH, PULL
04_lazy_pirate.rb        Client timeout + retry                REQ, REP
05_heartbeat.rb          Liveness detection                    PUB, SUB
06_last_value_cache.rb   Caching proxy + snapshot              PUB, SUB, REQ, REP
07_clone.rb              Reliable state sync                   PUB, SUB, REQ, REP
08_majordomo.rb          Service-oriented broker               ROUTER, DEALER, REQ
09_titanic.rb            Disk-based store-and-forward          REQ, REP, PUSH, PULL
10_binary_star.rb        Active/passive failover               REQ, REP, PUB, SUB
11_freelance.rb          Brokerless multi-server reliability   REQ, REP, DEALER

Run any example: ruby examples/zguide/03_pipeline.rb


Further reading