Skip to content

Commit e34afb9

Browse files
committed
And OMS exceptions for java
1 parent 9402d97 commit e34afb9

44 files changed

Lines changed: 329 additions & 805 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@
33
import io.openmessaging.MessagingAccessPoint;
44
import io.openmessaging.OMS;
55
import io.openmessaging.common.Result;
6-
import io.openmessaging.consumer.BindResult;
76
import io.openmessaging.consumer.Consumer;
87
import io.openmessaging.consumer.ReceiveResult;
9-
import io.openmessaging.manager.QueueConfig;
108
import io.openmessaging.manager.ResourceManager;
119

1210
public class PullConsumerApp {
@@ -17,15 +15,7 @@ public static void main(String[] args) {
1715

1816
//Fetch a ResourceManager to create Queue resource.
1917
ResourceManager resourceManager = messagingAccessPoint.resourceManager();
20-
Result createQueueResult = resourceManager.createQueue("NS://HELLO_QUEUE", new QueueConfig() {
21-
@Override public void setFifo(boolean isFifo) {
22-
23-
}
24-
25-
@Override public boolean isFifo() {
26-
return false;
27-
}
28-
});
18+
Result createQueueResult = resourceManager.createQueue("NS://HELLO_QUEUE");
2919
if (createQueueResult.isSuccess()) {
3020
//Start a PullConsumer to receive messages from the specific queue.
3121
final Consumer consumer = messagingAccessPoint.createConsumer();
@@ -39,7 +29,7 @@ public void run() {
3929
}
4030
}));
4131

42-
BindResult bindQueueResult = consumer.bindQueue("NS://HELLO_QUEUE");
32+
Result bindQueueResult = consumer.bindQueue("NS://HELLO_QUEUE");
4333
if (bindQueueResult.isSuccess()) {
4434
ReceiveResult receiveResult = consumer.receive(1000);
4535
if (receiveResult.isSuccess()) {

openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,10 @@
2323
import io.openmessaging.common.Result;
2424
import io.openmessaging.consumer.Consumer;
2525
import io.openmessaging.consumer.MessageListener;
26-
import io.openmessaging.exception.OMSResourceNotExistException;
2726
import io.openmessaging.manager.ResourceManager;
2827

2928
public class PushConsumerApp {
30-
public static void main(String[] args) throws OMSResourceNotExistException {
29+
public static void main(String[] args) {
3130
//Load and start the vendor implementation from a specific OMS driver URL.
3231
final MessagingAccessPoint messagingAccessPoint =
3332
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:10911/us-east");

openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/StreamingConsumerApp.java

Lines changed: 0 additions & 53 deletions
This file was deleted.

openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,17 @@
1717

1818
package io.openmessaging.samples.producer;
1919

20+
import io.openmessaging.Future;
2021
import io.openmessaging.Message;
2122
import io.openmessaging.MessagingAccessPoint;
2223
import io.openmessaging.OMS;
23-
import io.openmessaging.manager.ResourceManager;
24+
import io.openmessaging.interceptor.Context;
25+
import io.openmessaging.interceptor.ProducerInterceptor;
2426
import io.openmessaging.producer.Producer;
2527
import io.openmessaging.producer.SendResult;
2628
import java.nio.charset.Charset;
29+
import java.util.ArrayList;
30+
import java.util.List;
2731

2832
public class ProducerApp {
2933
public static void main(String[] args) {
@@ -32,7 +36,16 @@ public static void main(String[] args) {
3236

3337
final Producer producer = messagingAccessPoint.createProducer();
3438
producer.startup();
35-
final ResourceManager manager = messagingAccessPoint.resourceManager();
39+
ProducerInterceptor interceptor = new ProducerInterceptor() {
40+
@Override
41+
public void preSend(Message message, Context attributes) {
42+
}
43+
44+
@Override
45+
public void postSend(Message message, Context attributes) {
46+
}
47+
};
48+
producer.addInterceptor(interceptor);
3649

3750
//Register a shutdown hook to close the opened endpoints.
3851
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@@ -43,18 +56,33 @@ public void run() {
4356
}));
4457

4558
//Sends a message to the specified destination synchronously.
46-
{
47-
Message message = producer.createMessage(
48-
"NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
49-
SendResult sendResult = producer.send(message);
59+
Message message = producer.createMessage(
60+
"NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
61+
SendResult sendResult = producer.send(message);
5062

63+
if (sendResult.isSuccess()) {
64+
System.out.println("Send sync message OK, message id is: " + sendResult.messageId());
65+
} else {
66+
System.out.println("Error: " + sendResult.getError().getErrorCode() + " error message: " + sendResult.getError().getErrorMessage());
67+
}
5168

52-
if (sendResult.isSuccess()) {
53-
System.out.println("Send sync message OK, message id is: " + sendResult.messageId());
54-
} else {
55-
System.out.println("Error: " + sendResult.getError().getErrorCode() + " error message: " + sendResult.getError().getErrorMessage());
56-
}
69+
//Sends a message to the specified destination async.
70+
Future<SendResult> sendResultFuture = producer.sendAsync(message);
71+
sendResult = sendResultFuture.get(1000);
72+
if (sendResult.isSuccess()) {
73+
System.out.println("Send message async: " + sendResult);
5774
}
75+
//Sends a message to the specified destination in one way mode.
76+
producer.sendOneway(message);
5877

78+
//Sends messages to the specified destination in batch mode.
79+
List<Message> messages = new ArrayList<Message>(10);
80+
for (int i = 0; i < 10; i++) {
81+
Message msg = producer.createMessage("NS://HELLO_QUEUE", ("Hello" + i).getBytes());
82+
messages.add(msg);
83+
}
84+
producer.send(messages);
85+
producer.removeInterceptor(interceptor);
86+
producer.shutdown();
5987
}
6088
}

openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import io.openmessaging.Message;
2121
import io.openmessaging.MessagingAccessPoint;
2222
import io.openmessaging.OMS;
23-
import io.openmessaging.producer.CheckListener;
2423
import io.openmessaging.producer.Producer;
24+
import io.openmessaging.producer.TransactionStateCheckListener;
2525
import io.openmessaging.producer.TransactionalResult;
2626
import java.nio.charset.Charset;
2727

@@ -30,7 +30,11 @@ public static void main(String[] args) {
3030
final MessagingAccessPoint messagingAccessPoint =
3131
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
3232

33-
final Producer producer = messagingAccessPoint.createProducer();
33+
final Producer producer = messagingAccessPoint.createProducer(new TransactionStateCheckListener() {
34+
@Override public void check(Message message, TransactionalContext context) {
35+
36+
}
37+
});
3438
producer.startup();
3539

3640
//Register a shutdown hook to close the opened endpoints.
@@ -45,14 +49,10 @@ public void run() {
4549
"NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
4650

4751
//Sends a transaction message to the specified destination synchronously.
48-
producer.register(new CheckListener() {
49-
@Override public void check(Message message, TransactionalContext context) {
50-
context.commit();
51-
}
52-
});
5352
TransactionalResult result = producer.prepare(message);
5453
executeLocalTransaction(result);
5554
result.commit();
55+
producer.shutdown();
5656
System.out.println("Send transaction message OK, message id is: " + result.messageId());
5757
}
5858

openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@
2222
import io.openmessaging.OMS;
2323
import io.openmessaging.consumer.Consumer;
2424
import io.openmessaging.consumer.MessageListener;
25-
import io.openmessaging.exception.OMSResourceNotExistException;
2625
import io.openmessaging.manager.ResourceManager;
2726
import io.openmessaging.producer.Producer;
2827

2928
public class RoutingApp {
30-
public static void main(String[] args) throws OMSResourceNotExistException {
29+
public static void main(String[] args) {
3130
//Load and start the vendor implementation from a specific OMS driver URL.
3231
final MessagingAccessPoint messagingAccessPoint =
3332
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");

openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package io.openmessaging;
1919

20+
import io.openmessaging.exception.OMSMessageFormatException;
21+
2022
/**
2123
* A factory interface for creating {@code Message} objects.
2224
*
@@ -31,6 +33,8 @@ public interface MessageFactory {
3133
* @param queueName the target queue to send
3234
* @param body the body data for a message
3335
* @return the created {@code Message} object
36+
* @throws OMSMessageFormatException when body exceed the maximum length or others.
37+
*
3438
*/
3539
Message createMessage(String queueName, byte[] body);
3640
}

openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java

Lines changed: 10 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,12 @@
1919

2020
import io.openmessaging.common.Result;
2121
import io.openmessaging.consumer.Consumer;
22-
import io.openmessaging.consumer.ConsumerConfig;
2322
import io.openmessaging.consumer.MessageListener;
24-
import io.openmessaging.consumer.StreamingConsumer;
2523
import io.openmessaging.exception.OMSRuntimeException;
24+
import io.openmessaging.exception.OMSSecurityException;
2625
import io.openmessaging.manager.ResourceManager;
2726
import io.openmessaging.producer.Producer;
28-
import io.openmessaging.producer.ProducerConfig;
27+
import io.openmessaging.producer.TransactionStateCheckListener;
2928

3029
/**
3130
* An instance of {@code MessagingAccessPoint} may be obtained from {@link OMS}, which is capable of creating {@code
@@ -75,18 +74,21 @@ public interface MessagingAccessPoint extends Result {
7574
* @return the created {@code Producer}
7675
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
7776
* error
77+
* @throws OMSSecurityException if have no authority to create a producer.
7878
*/
7979
Producer createProducer();
8080

8181
/**
82-
* Creates a new {@code Producer} for the specified {@code MessagingAccessPoint} with some preset attributes.
82+
* Creates a new transactional {@code Producer} for the specified {@code MessagingAccessPoint}, the producer is able
83+
* to respond to requests from the server to check the status of the transaction.
8384
*
84-
* @param producerConfig the preset producer config
85+
* @param transactionStateCheckListener transactional check listener {@link TransactionStateCheckListener}
8586
* @return the created {@code Producer}
8687
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
8788
* error
89+
* @throws OMSSecurityException if have no authority to create a producer.
8890
*/
89-
Producer createProducer(ProducerConfig producerConfig);
91+
Producer createProducer(TransactionStateCheckListener transactionStateCheckListener);
9092

9193
/**
9294
* Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint}. The returned {@code Consumer}
@@ -95,46 +97,17 @@ public interface MessagingAccessPoint extends Result {
9597
* @return the created {@code PushConsumer}
9698
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
9799
* error
100+
* @throws OMSSecurityException if have no authority to create a consumer.
98101
*/
99102
Consumer createConsumer();
100103

101-
/**
102-
* Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint}. The returned {@code Consumer}
103-
* isn't bind to any queue, uses {@link Consumer#bindQueue(String, MessageListener)} to bind queues.
104-
*
105-
* @param consumerConfig the preset config
106-
* @return the created {@code PushConsumer}
107-
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
108-
* error
109-
*/
110-
Consumer createConsumer(ConsumerConfig consumerConfig);
111-
112-
/**
113-
* Creates a new {@code StreamingConsumer} for the specified {@code MessagingAccessPoint}.
114-
*
115-
* @return the created {@code Stream}
116-
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
117-
* error
118-
*/
119-
StreamingConsumer createStreamingConsumer();
120-
121-
/**
122-
* Creates a new {@code StreamingConsumer} for the specified {@code MessagingAccessPoint} with some preset
123-
* attributes.
124-
*
125-
* @param consumerConfig the preset client config
126-
* @return the created consumer
127-
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
128-
* error
129-
*/
130-
StreamingConsumer createStreamingConsumer(ConsumerConfig consumerConfig);
131-
132104
/**
133105
* Gets a lightweight {@code ResourceManager} instance from the specified {@code MessagingAccessPoint}.
134106
*
135107
* @return the resource manger
136108
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
137109
* error
110+
* @throws OMSSecurityException if have no authority to obtain a resource manager.
138111
*/
139112
ResourceManager resourceManager();
140113
}

openmessaging-api/src/main/java/io/openmessaging/OMSBuiltinKeys.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,6 @@ public interface OMSBuiltinKeys {
4646
*/
4747
String REGION = "REGION";
4848

49-
/**
50-
* The {@code PRODUCER_ID} key represents the the unique producer id of a producer instance.
51-
*/
52-
String PRODUCER_ID = "PRODUCER_ID";
53-
54-
/**
55-
* The {@code CONSUMER_ID} key represents the the unique consumer id of a consumer instance.
56-
*/
57-
String CONSUMER_ID = "CONSUMER_ID";
58-
5949
/**
6050
* The {@code OPERATION_TIMEOUT} key defines the timeout of almost all the method calls in OMS.
6151
*/

openmessaging-api/src/main/java/io/openmessaging/ServiceLifecycle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,6 @@ public interface ServiceLifecycle {
4949
* Used for get service current state, for execution of some operations is dependent on the current service state.
5050
* @return This service current state {@link ServiceLifeState}
5151
*/
52-
ServiceLifeState getServiceLifeState();
52+
ServiceLifeState currentState();
5353

5454
}

0 commit comments

Comments
 (0)