Skip to content

Commit c1117be

Browse files
authored
Merge pull request #57 from duhenglucky/PubSubModel
Change queue model to pub-sub model
2 parents 9987e91 + c1fbb64 commit c1117be

83 files changed

Lines changed: 1989 additions & 2823 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

openmessaging-admin/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<parent>
33
<groupId>io.openmessaging</groupId>
44
<artifactId>parent</artifactId>
5-
<version>1.0.0-beta-SNAPSHOT</version>
5+
<version>1.2.0-SNAPSHOT</version>
66
</parent>
77

88
<modelVersion>4.0.0</modelVersion>

openmessaging-api-samples/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22
<parent>
33
<groupId>io.openmessaging</groupId>
44
<artifactId>parent</artifactId>
5-
<version>1.0.0-beta-SNAPSHOT</version>
5+
<version>1.2.0-SNAPSHOT</version>
66
</parent>
77

88
<modelVersion>4.0.0</modelVersion>
99
<packaging>jar</packaging>
1010
<artifactId>openmessaging-api-samples</artifactId>
11-
<version>1.0.0-beta-SNAPSHOT</version>
11+
<version>1.2.0-SNAPSHOT</version>
1212
<name>openmessaging-api-samples ${project.version}</name>
1313

1414
<dependencies>
@@ -21,7 +21,7 @@
2121
<dependency>
2222
<groupId>${project.groupId}</groupId>
2323
<artifactId>openmessaging-api</artifactId>
24-
<version>1.0.0-beta-SNAPSHOT</version>
24+
<version>1.2.0-SNAPSHOT</version>
2525
</dependency>
2626
<dependency>
2727
<groupId>org.slf4j</groupId>

openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,37 +17,42 @@
1717

1818
package io.openmessaging.samples.consumer;
1919

20-
import io.openmessaging.MessagingAccessPoint;
21-
import io.openmessaging.OMS;
22-
import io.openmessaging.consumer.PullConsumer;
23-
import io.openmessaging.message.Message;
24-
import java.util.Arrays;
20+
import io.openmessaging.api.Message;
21+
import io.openmessaging.api.MessagingAccessPoint;
22+
import io.openmessaging.api.OMS;
23+
import io.openmessaging.api.PullConsumer;
24+
import io.openmessaging.api.TopicPartition;
25+
import java.time.Duration;
26+
import java.util.List;
27+
import java.util.Properties;
28+
import java.util.Set;
2529

2630
public class PullConsumerApp {
2731
public static void main(String[] args) {
2832
//Load and start the vendor implementation from a specific OMS driver URL.
2933
final MessagingAccessPoint messagingAccessPoint =
3034
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
31-
35+
Properties properties = new Properties();
3236
//Start a PullConsumer to receive messages from the specific queue.
33-
final PullConsumer consumer = messagingAccessPoint.createPullConsumer();
37+
final PullConsumer consumer = messagingAccessPoint.createPullConsumer(properties);
3438

3539
//Register a shutdown hook to close the opened endpoints.
3640
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
3741
@Override
3842
public void run() {
39-
consumer.stop();
43+
consumer.shutdown();
4044
}
4145
}));
4246

43-
consumer.bindQueue(Arrays.asList("NS://HELLO_QUEUE"));
47+
Set<TopicPartition> topicPartitions = consumer.topicPartitions("NS://TOPIC");
48+
consumer.assign(topicPartitions);
4449
consumer.start();
4550

46-
Message message = consumer.receive(1000);
51+
List<Message> message = consumer.poll(Duration.ofMillis(1000));
4752
System.out.println("Received message: " + message);
4853
//Acknowledge the consumed message
49-
consumer.ack(message.getMessageReceipt());
50-
consumer.stop();
54+
consumer.commitSync();
55+
consumer.shutdown();
5156

5257
}
5358
}

openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,51 +17,45 @@
1717

1818
package io.openmessaging.samples.consumer;
1919

20-
import io.openmessaging.MessagingAccessPoint;
21-
import io.openmessaging.OMS;
22-
import io.openmessaging.consumer.Consumer;
23-
import io.openmessaging.consumer.MessageListener;
24-
import io.openmessaging.consumer.PushConsumer;
25-
import io.openmessaging.manager.ResourceManager;
26-
import io.openmessaging.message.Message;
27-
import java.util.Arrays;
20+
import io.openmessaging.api.Action;
21+
import io.openmessaging.api.ConsumeContext;
22+
import io.openmessaging.api.Consumer;
23+
import io.openmessaging.api.Message;
24+
import io.openmessaging.api.MessageListener;
25+
import io.openmessaging.api.MessagingAccessPoint;
26+
import io.openmessaging.api.OMS;
27+
import java.util.Properties;
2828

2929
public class PushConsumerApp {
3030
public static void main(String[] args) {
3131
//Load and start the vendor implementation from a specific OMS driver URL.
3232
final MessagingAccessPoint messagingAccessPoint =
33-
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:10911/us-east");
33+
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876");
3434

35-
//Fetch a ResourceManager to create Queue resource.
36-
ResourceManager resourceManager = messagingAccessPoint.resourceManager();
37-
resourceManager.createNamespace("NS://XXXX");
38-
final PushConsumer consumer = messagingAccessPoint.createPushConsumer();
35+
Properties properties = new Properties();
36+
final Consumer consumer = messagingAccessPoint.createConsumer(properties);
3937
consumer.start();
4038

4139
//Register a shutdown hook to close the opened endpoints.
4240
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
4341
@Override
4442
public void run() {
45-
consumer.stop();
43+
consumer.shutdown();
4644
}
4745
}));
4846

4947
//Consume messages from a simple queue.
50-
String simpleQueue = "NS://HELLO_QUEUE";
51-
resourceManager.createQueue(simpleQueue);
52-
//This queue doesn't has a source queue, so only the message delivered to the queue directly can
53-
//be consumed by this consumer.
54-
consumer.bindQueue(Arrays.asList(simpleQueue), new MessageListener() {
48+
String topic = "NS://HELLO_TOPIC";
49+
50+
consumer.subscribe(topic, "*", new MessageListener(){
5551
@Override
56-
public void onReceived(Message message, Context context) {
57-
System.out.println("Received one message: " + message);
58-
context.ack();
59-
}
52+
public Action consume(Message message, ConsumeContext context) {
6053

54+
return Action.CommitMessage;
55+
}
6156
});
6257

63-
consumer.unbindQueue(Arrays.asList(simpleQueue));
6458

65-
consumer.stop();
59+
consumer.shutdown();
6660
}
6761
}

openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java

Lines changed: 25 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -17,71 +17,52 @@
1717

1818
package io.openmessaging.samples.producer;
1919

20-
import io.openmessaging.Future;
21-
import io.openmessaging.MessagingAccessPoint;
22-
import io.openmessaging.OMS;
23-
import io.openmessaging.interceptor.Context;
24-
import io.openmessaging.interceptor.ProducerInterceptor;
25-
import io.openmessaging.message.Message;
26-
import io.openmessaging.producer.Producer;
27-
import io.openmessaging.producer.SendResult;
28-
import java.nio.charset.Charset;
29-
import java.util.ArrayList;
30-
import java.util.List;
20+
import io.openmessaging.api.Message;
21+
import io.openmessaging.api.MessagingAccessPoint;
22+
import io.openmessaging.api.OMS;
23+
import io.openmessaging.api.OnExceptionContext;
24+
import io.openmessaging.api.Producer;
25+
import io.openmessaging.api.SendCallback;
26+
import io.openmessaging.api.SendResult;
27+
import java.util.Properties;
3128

3229
public class ProducerApp {
3330
public static void main(String[] args) {
3431
final MessagingAccessPoint messagingAccessPoint =
35-
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
32+
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org");
3633

37-
final Producer producer = messagingAccessPoint.createProducer();
38-
ProducerInterceptor interceptor = new ProducerInterceptor() {
39-
@Override
40-
public void preSend(Message message, Context attributes) {
41-
System.out.println("PreSend message: " + message);
42-
}
43-
44-
@Override
45-
public void postSend(Message message, Context attributes) {
46-
System.out.println("PostSend message: " + message);
47-
}
48-
};
49-
producer.addInterceptor(interceptor);
34+
final Producer producer = messagingAccessPoint.createProducer(new Properties());
5035
producer.start();
5136

5237
//Register a shutdown hook to close the opened endpoints.
5338
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
5439
@Override
5540
public void run() {
56-
producer.stop();
41+
producer.shutdown();
5742
}
5843
}));
5944

60-
//Send a message to the specified destination synchronously.
61-
Message message = producer.createMessage(
62-
"NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
63-
message.header().setBornHost("127.0.0.1").setDurability((short) 0);
64-
message.extensionHeader().setPartition(1);
45+
Message message = new Message("NS://Topic", "TagA", "Hello MQ".getBytes());
46+
6547
SendResult sendResult = producer.send(message);
6648
System.out.println("SendResult: " + sendResult);
6749

6850
//Sends a message to the specified destination async.
69-
Future<SendResult> sendResultFuture = producer.sendAsync(message);
70-
sendResult = sendResultFuture.get(1000);
71-
System.out.println("SendResult: " + sendResult);
51+
producer.sendAsync(message, new SendCallback() {
52+
@Override
53+
public void onSuccess(SendResult sendResult) {
54+
System.out.println("SendResult: " + sendResult);
55+
}
56+
57+
@Override
58+
public void onException(OnExceptionContext context) {
59+
60+
}
61+
});
7262

7363
//Sends a message to the specified destination in one way mode.
7464
producer.sendOneway(message);
7565

76-
//Sends messages to the specified destination in batch mode.
77-
List<Message> messages = new ArrayList<Message>(10);
78-
for (int i = 0; i < 10; i++) {
79-
Message msg = producer.createMessage("NS://HELLO_QUEUE", ("Hello" + i).getBytes());
80-
messages.add(msg);
81-
}
82-
83-
producer.send(messages);
84-
producer.removeInterceptor(interceptor);
85-
producer.stop();
66+
producer.shutdown();
8667
}
8768
}

openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,25 @@
1717

1818
package io.openmessaging.samples.producer;
1919

20-
import io.openmessaging.message.Message;
21-
import io.openmessaging.MessagingAccessPoint;
22-
import io.openmessaging.OMS;
23-
import io.openmessaging.producer.Producer;
24-
import io.openmessaging.producer.TransactionStateCheckListener;
25-
import io.openmessaging.producer.TransactionalResult;
26-
import java.nio.charset.Charset;
20+
import io.openmessaging.api.Message;
21+
import io.openmessaging.api.MessagingAccessPoint;
22+
import io.openmessaging.api.OMS;
23+
import io.openmessaging.api.SendResult;
24+
import io.openmessaging.api.transaction.LocalTransactionChecker;
25+
import io.openmessaging.api.transaction.LocalTransactionExecuter;
26+
import io.openmessaging.api.transaction.TransactionProducer;
27+
import io.openmessaging.api.transaction.TransactionStatus;
28+
import java.util.Properties;
2729

2830
public class TransactionProducerApp {
2931
public static void main(String[] args) {
3032
final MessagingAccessPoint messagingAccessPoint =
3133
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
3234

33-
final Producer producer = messagingAccessPoint.createProducer(new TransactionStateCheckListener() {
34-
@Override public void check(Message message, TransactionalContext context) {
35-
35+
final TransactionProducer producer = messagingAccessPoint.createTransactionProducer(new Properties(), new LocalTransactionChecker() {
36+
@Override
37+
public TransactionStatus check(Message msg) {
38+
return TransactionStatus.CommitTransaction;
3639
}
3740
});
3841
producer.start();
@@ -41,23 +44,19 @@ public static void main(String[] args) {
4144
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
4245
@Override
4346
public void run() {
44-
producer.stop();
47+
producer.shutdown();
4548
}
4649
}));
4750

48-
Message message = producer.createMessage(
49-
"NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
51+
Message message = new Message("NS://Topic", "TagA", "Hello MQ".getBytes());
5052

5153
//Sends a transaction message to the specified destination synchronously.
52-
TransactionalResult result = producer.prepare(message);
53-
executeLocalTransaction(result);
54-
result.commit();
55-
producer.stop();
56-
System.out.println("Send transaction message OK, message id is: " + result.messageId());
54+
SendResult result = producer.send(message, new LocalTransactionExecuter() {
55+
@Override public TransactionStatus execute(Message message, Object arg) {
56+
return TransactionStatus.CommitTransaction;
57+
}
58+
}, null);
59+
System.out.println("Send transaction message OK, message id is: " + result.getMessageId());
5760
}
5861

59-
private static void executeLocalTransaction(TransactionalResult result) {
60-
System.out.println("transactionId: " + result.transactionId());
61-
System.out.println("execute local transaction");
62-
}
6362
}

0 commit comments

Comments
 (0)