Skip to content

Commit 91fc02d

Browse files
committed
[ISSUE #51]Consumer should support bulk subscription queue and poll message by offset .
1 parent 0c809cb commit 91fc02d

1 file changed

Lines changed: 73 additions & 3 deletions

File tree

  • openmessaging-api/src/main/java/io/openmessaging/consumer

openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,20 @@ public interface Consumer extends ServiceLifecycle, Client {
9797
*/
9898
void bindQueue(String queueName);
9999

100+
/**
101+
* Bind the {@code Consumer} to a collection of queue in pull model, user can use {@link Consumer#receive(long)} to get
102+
* messages from a collection of queue.
103+
* <p>
104+
* {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new delivered message is
105+
* coming.
106+
*
107+
* @param queueNames a collection of queues.
108+
* @throws OMSSecurityException when have no authority to bind to this queue.
109+
* @throws OMSDestinationException when have no given destination in the server.
110+
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
111+
*/
112+
void bindQueue(List<String> queueNames);
113+
100114
/**
101115
* Bind the {@code Consumer} to a specified queue, with a {@code MessageListener}.
102116
* <p>
@@ -111,6 +125,20 @@ public interface Consumer extends ServiceLifecycle, Client {
111125
*/
112126
void bindQueue(String queueName, MessageListener listener);
113127

128+
/**
129+
* Bind the {@code Consumer} to a collection of queue, with a {@code MessageListener}.
130+
* <p>
131+
* {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new delivered message is
132+
* coming.
133+
*
134+
* @param queueNames a collection of queues.
135+
* @param listener a specified listener to receive new message.
136+
* @throws OMSSecurityException when have no authority to bind to this queue.
137+
* @throws OMSDestinationException when have no given destination in the server.
138+
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
139+
*/
140+
void bindQueues(List<String> queueNames, MessageListener listener);
141+
114142
/**
115143
* Bind the {@code Consumer} to a specified queue, with a {@code BatchMessageListener}.
116144
* <p>
@@ -125,6 +153,20 @@ public interface Consumer extends ServiceLifecycle, Client {
125153
*/
126154
void bindQueue(String queueName, BatchMessageListener listener);
127155

156+
/**
157+
* Bind the {@code Consumer} to a collection of queue, with a {@code BatchMessageListener}.
158+
* <p>
159+
* {@link BatchMessageListener#onReceived(List, BatchMessageListener.Context)} will be called when new delivered
160+
* messages is coming.
161+
*
162+
* @param queueNames a collection of queues.
163+
* @param listener a specified listener to receive new messages.
164+
* @throws OMSSecurityException when have no authority to bind to this queue.
165+
* @throws OMSDestinationException when have no given destination in the server.
166+
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
167+
*/
168+
void bindQueues(List<String> queueNames, BatchMessageListener listener);
169+
128170
/**
129171
* Unbind the {@code Consumer} from a specified queue.
130172
* <p>
@@ -134,6 +176,15 @@ public interface Consumer extends ServiceLifecycle, Client {
134176
*/
135177
void unbindQueue(String queueName);
136178

179+
/**
180+
* Unbind the {@code Consumer} from a collection of queues.
181+
* <p>
182+
* After the success call, this consumer won't receive new message from the specified queue any more.
183+
*
184+
* @param queueNames a collection of queues.
185+
*/
186+
void unbindQueues(List<String> queueNames);
187+
137188
/**
138189
* This method is used to find out whether the {@code Consumer} in bind queue.
139190
*
@@ -142,11 +193,11 @@ public interface Consumer extends ServiceLifecycle, Client {
142193
boolean isBindQueue();
143194

144195
/**
145-
* This method is used to find out the queue bind to {@code Consumer}.
196+
* This method is used to find out the collection of queues bind to {@code Consumer}.
146197
*
147-
* @return the queue this consumer is bind, or null if the consumer is not bind queue.
198+
* @return the queues this consumer is bind, or null if the consumer is not bind queue.
148199
*/
149-
String getBindQueue();
200+
String getBindQueues();
150201

151202
/**
152203
* Adds a {@code ConsumerInterceptor} instance to this consumer.
@@ -176,6 +227,25 @@ public interface Consumer extends ServiceLifecycle, Client {
176227
*/
177228
Message receive(long timeout);
178229

230+
/**
231+
* Receives the next message from the which bind queue,partition and offset of this consumer in pull model.
232+
* <p>
233+
* This call blocks indefinitely until a message is arrives, the timeout expires, or until this {@code PullConsumer}
234+
* is shut down.
235+
*
236+
* @param queueName receive message from which queueName in Message Queue.
237+
* @param partition receive message from which partition in Message Queue.
238+
* @param offset receive message from which offset in Message Queue.
239+
* @param timeout receive message will blocked at most <code>timeout</code> milliseconds.
240+
* @return the next message received from the bind queues, or null if the consumer is concurrently shut down.
241+
* @throws OMSSecurityException when have no authority to receive messages from this queue.
242+
* @throws OMSTimeOutException when the given timeout elapses before the send operation completes.
243+
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
244+
*/
245+
Message receive(String queueName, String partition, long offset, long timeout);
246+
247+
248+
179249
/**
180250
* Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic
181251
* should implement in the {@link MessageListener}.

0 commit comments

Comments
 (0)