Skip to content

Commit 92fb6a4

Browse files
committed
Polish exception and errorcode implementation
1 parent 1d0c274 commit 92fb6a4

21 files changed

Lines changed: 247 additions & 323 deletions

File tree

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
package io.openmessaging.samples.consumer;
22

3+
import io.openmessaging.Message;
34
import io.openmessaging.MessagingAccessPoint;
45
import io.openmessaging.OMS;
5-
import io.openmessaging.common.Result;
66
import io.openmessaging.consumer.Consumer;
7-
import io.openmessaging.consumer.ReceiveResult;
87
import io.openmessaging.manager.ResourceManager;
98

109
public class PullConsumerApp {
@@ -15,33 +14,23 @@ public static void main(String[] args) {
1514

1615
//Fetch a ResourceManager to create Queue resource.
1716
ResourceManager resourceManager = messagingAccessPoint.resourceManager();
18-
Result createQueueResult = resourceManager.createQueue("NS://HELLO_QUEUE");
19-
if (createQueueResult.isSuccess()) {
20-
//Start a PullConsumer to receive messages from the specific queue.
21-
final Consumer consumer = messagingAccessPoint.createConsumer();
22-
consumer.startup();
17+
resourceManager.createQueue("NS://HELLO_QUEUE");
2318

24-
//Register a shutdown hook to close the opened endpoints.
25-
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
26-
@Override
27-
public void run() {
28-
consumer.shutdown();
29-
}
30-
}));
19+
//Start a PullConsumer to receive messages from the specific queue.
20+
final Consumer consumer = messagingAccessPoint.createConsumer();
21+
consumer.start();
3122

32-
Result bindQueueResult = consumer.bindQueue("NS://HELLO_QUEUE");
33-
if (bindQueueResult.isSuccess()) {
34-
ReceiveResult receiveResult = consumer.receive(1000);
35-
if (receiveResult.isSuccess()) {
36-
System.out.println("Received message: " + receiveResult.message());
37-
//Acknowledge the consumed message
38-
consumer.ack(receiveResult.message().headers().getMessageId());
39-
}
40-
} else {
41-
System.out.println("Error: " + bindQueueResult.getError().getErrorCode() + " error message: " + bindQueueResult.getError().getErrorMessage());
23+
//Register a shutdown hook to close the opened endpoints.
24+
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
25+
@Override
26+
public void run() {
27+
consumer.stop();
4228
}
43-
} else {
44-
System.out.println("Error: " + createQueueResult.getError().getErrorCode() + " error message: " + createQueueResult.getError().getErrorMessage());
45-
}
29+
}));
30+
consumer.bindQueue("NS://HELLO_QUEUE");
31+
Message message = consumer.receive(1000);
32+
System.out.println("Received message: " + message);
33+
//Acknowledge the consumed message
34+
consumer.ack(message.headers().getMessageId());
4635
}
4736
}

openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import io.openmessaging.Message;
2121
import io.openmessaging.MessagingAccessPoint;
2222
import io.openmessaging.OMS;
23-
import io.openmessaging.common.Result;
2423
import io.openmessaging.consumer.Consumer;
2524
import io.openmessaging.consumer.MessageListener;
2625
import io.openmessaging.manager.ResourceManager;
@@ -35,36 +34,32 @@ public static void main(String[] args) {
3534
ResourceManager resourceManager = messagingAccessPoint.resourceManager();
3635
resourceManager.createNamespace("NS://XXXX");
3736
final Consumer consumer = messagingAccessPoint.createConsumer();
38-
consumer.startup();
37+
consumer.start();
3938

4039
//Register a shutdown hook to close the opened endpoints.
4140
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
4241
@Override
4342
public void run() {
44-
consumer.shutdown();
43+
consumer.stop();
4544
}
4645
}));
4746

4847
//Consume messages from a simple queue.
4948
String simpleQueue = "NS://HELLO_QUEUE";
50-
Result result = resourceManager.createQueue(simpleQueue);
49+
resourceManager.createQueue(simpleQueue);
5150
//This queue doesn't has a source queue, so only the message delivered to the queue directly can
5251
//be consumed by this consumer.
53-
if (result.isSuccess()){
54-
consumer.bindQueue(simpleQueue, new MessageListener() {
55-
@Override
56-
public void onReceived(Message message, Context context) {
57-
System.out.println("Received one message: " + message);
58-
context.ack();
59-
}
52+
consumer.bindQueue(simpleQueue, new MessageListener() {
53+
@Override
54+
public void onReceived(Message message, Context context) {
55+
System.out.println("Received one message: " + message);
56+
context.ack();
57+
}
6058

61-
});
62-
}else {
63-
System.out.println("error: " + result.getError().getErrorCode() + " error message: " + result );
64-
}
59+
});
6560

6661
consumer.unbindQueue(simpleQueue);
6762

68-
consumer.shutdown();
63+
consumer.stop();
6964
}
7065
}

openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public static void main(String[] args) {
3535
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
3636

3737
final Producer producer = messagingAccessPoint.createProducer();
38-
producer.startup();
38+
producer.start();
3939
ProducerInterceptor interceptor = new ProducerInterceptor() {
4040
@Override
4141
public void preSend(Message message, Context attributes) {
@@ -51,27 +51,21 @@ public void postSend(Message message, Context attributes) {
5151
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
5252
@Override
5353
public void run() {
54-
producer.shutdown();
54+
producer.stop();
5555
}
5656
}));
5757

5858
//Sends a message to the specified destination synchronously.
5959
Message message = producer.createMessage(
6060
"NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
6161
SendResult sendResult = producer.send(message);
62-
63-
if (sendResult.isSuccess()) {
64-
System.out.println("Send sync message OK, message id is: " + sendResult.messageId());
65-
} else {
66-
System.out.println("Error: " + sendResult.getError().getErrorCode() + " error message: " + sendResult.getError().getErrorMessage());
67-
}
62+
System.out.println("SendResult: " + sendResult);
6863

6964
//Sends a message to the specified destination async.
7065
Future<SendResult> sendResultFuture = producer.sendAsync(message);
7166
sendResult = sendResultFuture.get(1000);
72-
if (sendResult.isSuccess()) {
73-
System.out.println("Send message async: " + sendResult);
74-
}
67+
System.out.println("SendResult: " + sendResult);
68+
7569
//Sends a message to the specified destination in one way mode.
7670
producer.sendOneway(message);
7771

@@ -83,6 +77,6 @@ public void run() {
8377
}
8478
producer.send(messages);
8579
producer.removeInterceptor(interceptor);
86-
producer.shutdown();
80+
producer.stop();
8781
}
8882
}

openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@ public static void main(String[] args) {
3535

3636
}
3737
});
38-
producer.startup();
38+
producer.start();
3939

4040
//Register a shutdown hook to close the opened endpoints.
4141
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
4242
@Override
4343
public void run() {
44-
producer.shutdown();
44+
producer.stop();
4545
}
4646
}));
4747

@@ -52,7 +52,7 @@ public void run() {
5252
TransactionalResult result = producer.prepare(message);
5353
executeLocalTransaction(result);
5454
result.commit();
55-
producer.shutdown();
55+
producer.stop();
5656
System.out.println("Send transaction message OK, message id is: " + result.messageId());
5757
}
5858

openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import io.openmessaging.producer.Producer;
2727

2828
public class RoutingApp {
29-
public static void main(String[] args) {
29+
public static void main(String[] args) {
3030
//Load and start the vendor implementation from a specific OMS driver URL.
3131
final MessagingAccessPoint messagingAccessPoint =
3232
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
@@ -42,11 +42,11 @@ public static void main(String[] args) {
4242
resourceManager.createQueue(sourceQueue);
4343

4444
resourceManager.routing(sourceQueue, destinationQueue);
45-
resourceManager.filter(destinationQueue,"name = 'kaka'");
45+
resourceManager.filter(destinationQueue, "name = 'kaka'");
4646

4747
//Send messages to the source queue ahead of the routing
4848
final Producer producer = messagingAccessPoint.createProducer();
49-
producer.startup();
49+
producer.start();
5050

5151
Message message = producer.createMessage(sourceQueue, "RED_COLOR".getBytes());
5252
message.properties().put("color", "green").put("shape", "round");
@@ -55,7 +55,7 @@ public static void main(String[] args) {
5555

5656
//Consume messages from the queue behind the routing.
5757
final Consumer consumer = messagingAccessPoint.createConsumer();
58-
consumer.startup();
58+
consumer.start();
5959

6060
consumer.bindQueue(destinationQueue, new MessageListener() {
6161
@Override
@@ -72,9 +72,10 @@ public void onReceived(Message message, Context context) {
7272
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
7373
@Override
7474
public void run() {
75-
producer.shutdown();
76-
consumer.shutdown();
75+
producer.stop();
76+
consumer.stop();
7777
}
7878
}));
79+
7980
}
8081
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package io.openmessaging;
2+
3+
import io.openmessaging.exception.OMSRuntimeException;
4+
5+
/**
6+
* This class defined error code existed in OpenMessaging:
7+
* <p>
8+
* 110x: The request was received, continuing process
9+
* </p>
10+
* <p>
11+
* 120x: The request was successfully received, understood, and accepted
12+
* </p>
13+
* <p>
14+
* 130x: Further action needs to be taken in order to complete the request
15+
* </p>
16+
* <p>
17+
* 140x: The request contains bad syntax or cannot be fulfilled
18+
* </p>
19+
* <p>
20+
* 150x: The server failed to fulfill an apparently valid request
21+
* </p>
22+
*/
23+
public enum Error {
24+
ERROR_1101(1101, "Unsupported Version"),
25+
26+
ERROR_1200(1200, "Success"),
27+
28+
ERROR_1400(1400, "Bad Request"),
29+
30+
ERROR_1401(1401, "Unauthorized"),
31+
32+
ERROR_1402(1402, "Message body Required"),
33+
34+
ERROR_1403(1403, "Forbidden"),
35+
36+
ERROR_1404(1404, "Destination Not Found"),
37+
38+
ERROR_1405(1405, "Namespace Not Fount"),
39+
40+
ERROR_1406(1406, "Destination Already Exists"),
41+
42+
ERROR_1407(1407, "Namespace Already Exists"),
43+
44+
ERROR_1408(1408, "ConsumerId Already Exists"),
45+
46+
ERROR_1409(1409, "ProducerId Already Exists"),
47+
48+
ERROR_1410(1410, "Request Timeout"),
49+
50+
ERROR_1411(1411, "Message Attributes Too Large"),
51+
52+
ERROR_1412(1412, "Message Header Too Large"),
53+
54+
ERROR_1413(1413, "Message Body Too Large"),
55+
56+
ERROR_1414(1414, "No New Message Found"),
57+
58+
ERROR_1415(1415, "Max Topics Reached"),
59+
60+
ERROR_1416(1416, "Max Queues Reached"),
61+
62+
ERROR_1417(1417, "Max Namespaces Reached"),
63+
64+
ERROR_1418(1418, "Bad Parameter"),
65+
66+
ERROR_1500(1500, "Server ERROR"),
67+
68+
ERROR_1501(1501, "Storage Service ERROR"),
69+
70+
ERROR_1502(1502, "Storage Service Busy"),
71+
72+
ERROR_1503(1503, "Service Not Available"),
73+
74+
ERROR_1504(1504, "Flush Disk Timeout"),
75+
76+
ERROR_10000(10000, "Can't construct a MessagingAccessPoint instance from the given OMS driver URL [%s]."),
77+
78+
ERROR_10001(10001, "The OMS driver URL [%s] is illegal."),
79+
80+
ERROR_10002(10002, "The implementation version [%s] is illegal."),
81+
82+
ERROR_10003(10003, "The implementation version [%s] isn't compatible with the specification version [%s].");
83+
84+
private int errorCode;
85+
86+
private String errorMessage;
87+
88+
private String detailErrorMessage;
89+
90+
private static final String refBase = "http://openmessaging.cloud/internal/code";
91+
92+
Error(int errorCode, String errorMessage) {
93+
this.errorCode = errorCode;
94+
95+
this.errorMessage = errorMessage;
96+
97+
this.detailErrorMessage = generateDetailErrorMessage(errorCode, errorMessage);
98+
}
99+
100+
public int getErrorCode() {
101+
return errorCode;
102+
}
103+
104+
public String getErrorMessage() {
105+
return errorMessage;
106+
}
107+
108+
public String getDetailErrorMessage() {
109+
return detailErrorMessage;
110+
}
111+
112+
public static OMSRuntimeException generateException(Error error, String... messageArgs) {
113+
return new OMSRuntimeException(error.getErrorCode(), String.format(error.getDetailErrorMessage(), (Object[]) messageArgs));
114+
}
115+
116+
public static OMSRuntimeException generateException(Error error) {
117+
return new OMSRuntimeException(error.getErrorCode(), error.getDetailErrorMessage());
118+
}
119+
120+
public static OMSRuntimeException generateException(int errorCode, String errorMessage) {
121+
return new OMSRuntimeException(errorCode, errorMessage);
122+
}
123+
124+
public static String generateDetailErrorMessage(int errorCode, String errorMessage) {
125+
return errorMessage + "\nFor more information, please visit the URL, " + refBase + "#" + errorCode;
126+
}
127+
}

openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,20 @@
2222
/**
2323
* A factory interface for creating {@code Message} objects.
2424
*
25+
* @version OMS 1.0.0
26+
* @since OMS 1.0.0
2527
*/
2628
public interface MessageFactory {
2729
/**
28-
* Creates a {@code Message} object. A {@code Message} object is used to send a message containing a
29-
* stream of uninterpreted bytes.
30+
* Creates a {@code Message} object. A {@code Message} object is used to send a message containing a stream of
31+
* uninterpreted bytes.
3032
* <p>
3133
* The returned {@code Message} object only can be sent to the specified queue.
3234
*
3335
* @param queueName the target queue to send
3436
* @param body the body data for a message
3537
* @return the created {@code Message} object
3638
* @throws OMSMessageFormatException when body exceed the maximum length or others.
37-
*
3839
*/
3940
Message createMessage(String queueName, byte[] body);
4041
}

0 commit comments

Comments
 (0)