-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path08_majordomo.rb
More file actions
153 lines (128 loc) · 4.42 KB
/
08_majordomo.rb
File metadata and controls
153 lines (128 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/env ruby
# frozen_string_literal: true
$VERBOSE = nil
$LOAD_PATH.unshift(File.expand_path('../../lib', __dir__))
require 'minitest/autorun'
require 'minitest/spec'
require 'omq'
require 'async'
require 'console'
Console.logger = Console::Logger.new(Console::Output::Null.new)
# ZGuide Chapter 4 — Majordomo Pattern
# A service-oriented broker. Workers register by service name.
# Clients request a service by name. The broker routes to the right
# worker pool. Demonstrates service discovery and LRU routing.
#
# Protocol (simplified):
# Worker → Broker: ["READY", service_name]
# Client → Broker: [service_name, request_body]
# Broker → Worker: [client_identity, "", request_body]
# Worker → Broker: [client_identity, "", reply_body]
# Broker → Client: [reply_body]
# Majordomo spec: a broker built from two ROUTER sockets sits between one REQ
# client and three DEALER workers, all on inproc endpoints in a single Async
# reactor.
#
# Frame positions as indexed by the broker below:
#   backend  <- worker : [worker_id, "READY", service]
#   frontend <- client : [client_id, <frame 1, not read>, service, body]
#   backend  -> worker : [worker_id, client_id, "", body]
#   backend  <- worker : [worker_id, client_id, "", reply]
#   frontend -> client : [client_id, "", reply]
describe 'Majordomo' do
  it 'routes requests to workers by service name' do
    frontend_ep = 'inproc://zg08_frontend'
    backend_ep  = 'inproc://zg08_backend'

    # service name => number of workers started for it
    worker_pool = { 'echo' => 2, 'upper' => 1 }

    # [service, body] pairs the client sends, in order. Defined before the
    # broker task so the broker can size its routing loop from it.
    requests = [
      ['echo', 'hello'],
      ['echo', 'world'],
      ['upper', 'foo'],
      ['echo', 'test'],
      ['upper', 'bar'],
      ['upper', 'baz'],
    ]

    # The trailing `.wait` re-raises any error stored on the root task, so a
    # timeout or broker failure fails the spec instead of only being logged.
    Async do |task|
      # Broker: ROUTER (frontend for clients) + ROUTER (backend for workers)
      frontend = OMQ::ROUTER.bind(frontend_ep)
      backend  = OMQ::ROUTER.bind(backend_ep)
      frontend.recv_timeout = 2
      backend.recv_timeout  = 0.1

      # service_name => [worker_identity, ...]; the head of each list is the
      # next worker to be used, and workers are re-appended after replying.
      services = Hash.new { |h, k| h[k] = [] }

      broker = task.async do
        # Registration phase: expect one message per configured worker. The
        # backend receive timeout ends the phase early if fewer arrive.
        worker_pool.values.sum.times do
          msg       = backend.receive
          worker_id = msg[0]
          command   = msg[1]
          service   = msg[2]
          if command == 'READY'
            services[service] << worker_id
            puts " broker: worker #{worker_id.unpack1('H*')[0..7]} registered for '#{service}'"
          end
        rescue IO::TimeoutError
          break
        end

        # Routing phase: one client request in, one worker reply back out.
        requests.size.times do
          msg       = frontend.receive
          client_id = msg[0]
          service   = msg[2] # frame 1 is skipped (presumably the REQ envelope delimiter)
          body      = msg[3]

          # The default block guarantees an Array here, so no nil check on the
          # lookup is needed; `shift` yields nil when the list is empty.
          worker_id = services[service].shift
          unless worker_id
            # NOTE: no reply is sent on this path, so the client is left to
            # hit its own receive timeout.
            puts " broker: no worker for service '#{service}'"
            next
          end

          backend << [worker_id, client_id, '', body]
          puts " broker: routed '#{service}' request to worker"

          reply_msg       = backend.receive
          replying_worker = reply_msg[0]
          reply_cid       = reply_msg[1]
          reply_body      = reply_msg[3]

          # Put the worker back at the tail of its service list.
          services[service] << replying_worker
          frontend << [reply_cid, '', reply_body]
        end
      end

      # Workers: 2 "echo" workers, 1 "upper" worker
      worker_tasks = worker_pool.flat_map do |service, count|
        Array.new(count) do |id|
          task.async do
            sock = OMQ::DEALER.connect(backend_ep)
            sock.recv_timeout = 2
            sock << ['READY', service]

            # Serve requests until nothing arrives within the receive timeout.
            loop do
              msg       = sock.receive
              client_id = msg[0]
              body      = msg[2]
              reply = case service
                      when 'echo' then "echo:#{body}"
                      when 'upper' then body.upcase
                      end
              sock << [client_id, '', reply]
              puts " worker(#{service}-#{id}): #{body} -> #{reply}"
            rescue IO::TimeoutError
              break
            end
          ensure
            # `sock` is nil if connect itself raised; don't mask that error.
            sock&.close
          end
        end
      end

      # Short pause before the client starts (presumably to let the workers
      # connect and register first).
      sleep 0.03

      req = OMQ::REQ.connect(frontend_ep)
      req.recv_timeout = 2

      # Requests are sent one at a time; each reply is read before the next send.
      replies = requests.map do |service, body|
        req << [service, body]
        reply = req.receive.first
        puts " client: #{service}(#{body}) -> #{reply}"
        [service, reply]
      end

      broker.wait
      worker_tasks.each(&:wait)

      echo_replies  = replies.select { |s, _| s == 'echo' }.map(&:last)
      upper_replies = replies.select { |s, _| s == 'upper' }.map(&:last)
      assert(echo_replies.all? { |r| r.start_with?('echo:') }, 'echo workers should echo')
      assert(upper_replies.all? { |r| r == r.upcase }, 'upper workers should upcase')
      assert_equal requests.size, replies.size

      # Pin every reply to the exact transformation of its own request body.
      expected = requests.map do |service, body|
        [service, service == 'echo' ? "echo:#{body}" : body.upcase]
      end
      assert_equal expected, replies

      puts " summary: #{replies.size} requests routed across 2 services"
    ensure
      frontend&.close
      backend&.close
      req&.close
    end.wait
  end
end