Skip to content

Commit 54f8dcf

Browse files
authored
Merge pull request #74 from dbl-x/feat_support_schema
support schema in consumer
2 parents 7d13105 + 0be9d4d commit 54f8dcf

10 files changed

Lines changed: 136 additions & 29 deletions

File tree

openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/GenericPushConsumerApp.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ public void run() {
5757
String topic = "NS://HELLO_TOPIC";
5858

5959
consumer.subscribe(topic, "*", new GenericMessageListener<MessageSample>() {
60+
61+
@Override
62+
public Class<MessageSample> payloadClass() {
63+
return MessageSample.class;
64+
}
65+
6066
@Override
6167
public Action consume(GenericMessage<MessageSample> message, MessageConsumeContext context) {
6268
MessageSample messageSample = message.getValue();

openmessaging-api/src/main/java/io/openmessaging/api/Consumer.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
* @version OMS 1.2.0
2323
* @since OMS 1.2.0
2424
*/
25-
public interface Consumer extends Admin {
25+
public interface Consumer extends ConsumerBase, Admin {
2626

2727
/**
2828
* Subscribe message in order.
@@ -39,7 +39,7 @@ public interface Consumer extends Admin {
3939
/**
4040
* Subscribe to messages, which can be filtered using SQL expressions.
4141
*
42-
* @param topic
42+
* @param topic message topic
4343
* @param selector Subscribe to the message selector (can be empty, indicating no filtering), the ONS server filters
4444
* according to the expression in this selector. Currently supports two expression syntax: {@link
4545
* ExpressionType#TAG}, {@link ExpressionType#SQL92} Among them, the effect of TAG filtering is consistent with the
@@ -58,24 +58,17 @@ public interface Consumer extends Admin {
5858
* @param listener The message callback listener, the consumer receives the message and then passes it to the
5959
* message callback listener for consumption.
6060
*/
61-
void subscribe(final String topic, final String subExpression, final GenericMessageListener listener);
61+
<T> void subscribe(final String topic, final String subExpression, final GenericMessageListener<T> listener);
6262

6363
/**
6464
* Subscribe to messages, which can be filtered using SQL expressions.
6565
*
66-
* @param topic
66+
* @param topic message topic
6767
* @param selector Subscribe to the message selector (can be empty, indicating no filtering), the ONS server filters
6868
* according to the expression in this selector. Currently supports two expression syntax: {@link
6969
* ExpressionType#TAG}, {@link ExpressionType#SQL92} Among them, the effect of TAG filtering is consistent with the
7070
* above interface.
7171
* @param listener Message callback listener
7272
*/
73-
void subscribe(final String topic, final MessageSelector selector, final GenericMessageListener listener);
74-
75-
/**
76-
* Unsubscribe message
77-
*
78-
* @param topic
79-
*/
80-
void unsubscribe(final String topic);
73+
<T> void subscribe(final String topic, final MessageSelector selector, final GenericMessageListener<T> listener);
8174
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.openmessaging.api;
18+
19+
/**
20+
* {@code ConsumerBase} is the basic of other {@code Consumer} interfaces
21+
* and is used to define some common methods.
22+
*
23+
* @version OMS 2.0.3
24+
* @since OMS 2.0.3
25+
*/
26+
public interface ConsumerBase {
27+
/**
28+
* Unsubscribe message of the specified topic
29+
*
30+
* @param topic message topic
31+
*/
32+
void unsubscribe(final String topic);
33+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.openmessaging.api;
18+
19+
/**
20+
* Base listener for other generic message listener.
21+
*
22+
* @version OMS 2.0.0
23+
* @since OMS 2.0.0
24+
*/
25+
public interface GenericListener<T> {
26+
27+
/**
28+
* Returns the class of the object that the user wants to deserialize the message into.
29+
*
30+
* @return the class of message payload that user expects
31+
*/
32+
Class<T> payloadClass();
33+
}

openmessaging-api/src/main/java/io/openmessaging/api/GenericMessageListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* @version OMS 2.0.0
2929
* @since OMS 2.0.0
3030
*/
31-
public interface GenericMessageListener<T> {
31+
public interface GenericMessageListener<T> extends GenericListener<T> {
3232

3333
/**
3434
* Consumer message interface, implemented by the application, unstable situations such as network jitter may lead

openmessaging-api/src/main/java/io/openmessaging/api/batch/BatchConsumer.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@
1818
package io.openmessaging.api.batch;
1919

2020
import io.openmessaging.api.Admin;
21+
import io.openmessaging.api.ConsumerBase;
2122

2223
/**
2324
* Batch message consumer, used to subscribe to messages in batch.
2425
*
2526
* @version OMS 1.2.0
2627
* @since OMS 1.2.0
2728
*/
28-
public interface BatchConsumer extends Admin {
29+
public interface BatchConsumer extends ConsumerBase, Admin {
2930

3031
/**
3132
* Subscribe message
@@ -47,12 +48,5 @@ public interface BatchConsumer extends Admin {
4748
* messages.
4849
* @param listener consume message callback listener.
4950
*/
50-
void subscribe(final String topic, final String subExpression, final GenericBatchMessageListener listener);
51-
52-
/**
53-
* Unsubscribe topic
54-
*
55-
* @param topic
56-
*/
57-
void unsubscribe(final String topic);
51+
<T> void subscribe(final String topic, final String subExpression, final GenericBatchMessageListener<T> listener);
5852
}

openmessaging-api/src/main/java/io/openmessaging/api/batch/GenericBatchMessageListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.openmessaging.api.batch;
1818

1919
import io.openmessaging.api.Action;
20+
import io.openmessaging.api.GenericListener;
2021
import io.openmessaging.api.GenericMessage;
2122
import io.openmessaging.api.MessageConsumeContext;
2223
import java.util.List;
@@ -27,7 +28,7 @@
2728
* @version OMS 2.0.0
2829
* @since OMS 2.0.0
2930
*/
30-
public interface GenericBatchMessageListener<T> {
31+
public interface GenericBatchMessageListener<T> extends GenericListener<T> {
3132
/**
3233
* When message arrived, this method will be invoked by order.
3334
*

openmessaging-api/src/main/java/io/openmessaging/api/order/GenericMessageOrderListener.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package io.openmessaging.api.order;
1818

19+
import io.openmessaging.api.GenericListener;
1920
import io.openmessaging.api.GenericMessage;
2021
import io.openmessaging.api.MessageConsumeContext;
2122

@@ -26,10 +27,10 @@
2627
* @version OMS 2.0.0
2728
* @since OMS 2.0.0
2829
*/
29-
public interface GenericMessageOrderListener<T> {
30+
public interface GenericMessageOrderListener<T> extends GenericListener<T> {
3031

3132
/**
32-
* When message arrived, this method will be invoked by order.
33+
* When message arrived, this method will be invoked in order.
3334
*
3435
* @param message received message
3536
* @param context

openmessaging-api/src/main/java/io/openmessaging/api/order/OrderConsumer.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package io.openmessaging.api.order;
1919

2020
import io.openmessaging.api.Admin;
21+
import io.openmessaging.api.ConsumerBase;
2122
import io.openmessaging.api.ExpressionType;
2223
import io.openmessaging.api.MessageSelector;
2324

@@ -27,7 +28,7 @@
2728
* @version OMS 1.2.0
2829
* @since OMS 1.2.0
2930
*/
30-
public interface OrderConsumer extends Admin {
31+
public interface OrderConsumer extends ConsumerBase, Admin {
3132

3233
/**
3334
* Subscribe message in order.
@@ -44,12 +45,36 @@ public interface OrderConsumer extends Admin {
4445
/**
4546
* Subscribe to messages, which can be filtered using SQL expressions.
4647
*
47-
* @param topic
48+
* @param topic message topic
4849
* @param selector Subscribe to the message selector (can be empty, indicating no filtering), the ONS server filters
4950
* according to the expression in this selector. Currently supports two expression syntax: {@link
5051
* ExpressionType#TAG}, {@link ExpressionType#SQL92} Among them, the effect of TAG filtering is consistent with the
5152
* above interface.
5253
* @param listener Message callback listener
5354
*/
5455
void subscribe(final String topic, final MessageSelector selector, final MessageOrderListener listener);
56+
57+
/**
58+
* Subscribe message in order.
59+
*
60+
* @param topic message topic.
61+
* @param subExpression Subscribe to the filter expression string, which the broker filters based on this
62+
* expression. <br> eg: "tag1 || tag2 || tag3"<br>, if subExpression is equal to null or *, it means subscribe all
63+
* messages.
64+
* @param listener The message callback listener, the consumer receives the message and then passes it to the
65+
* message callback listener for consumption.
66+
*/
67+
<T> void subscribe(final String topic, final String subExpression, final GenericMessageOrderListener<T> listener);
68+
69+
/**
70+
* Subscribe to messages, which can be filtered using SQL expressions.
71+
*
72+
* @param topic message topic
73+
* @param selector Subscribe to the message selector (can be empty, indicating no filtering), the ONS server filters
74+
* according to the expression in this selector. Currently supports two expression syntax: {@link
75+
* ExpressionType#TAG}, {@link ExpressionType#SQL92} Among them, the effect of TAG filtering is consistent with the
76+
* above interface.
77+
* @param listener Message callback listener
78+
*/
79+
<T> void subscribe(final String topic, final MessageSelector selector, final GenericMessageOrderListener<T> listener);
5580
}

openmessaging-api/src/main/java/io/openmessaging/api/schema/GenericRecord.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,39 @@
1919
import java.util.List;
2020

2121
/**
22-
* Used for traverse the data in the message body When do not know the specific Schema.
22+
* Used for traverse the data in the message body when do not know the specific Schema.
2323
*
2424
* @version OMS 2.0.0
2525
* @since OMS 2.0.0
2626
*/
2727
public interface GenericRecord {
2828

29+
/**
30+
* return schema id
31+
*
32+
* @return schema id
33+
*/
2934
String getSchemaId();
3035

36+
/**
37+
* return the list of fields associated with the record
38+
*
39+
* @return the list of fields associated with the record
40+
*/
3141
List<Field> getFields();
3242

43+
/**
44+
* retrieve the value of the provided field
45+
*
46+
* @param field the field to retrieve the value
47+
* @return the value object
48+
*/
3349
Object getField(Field field);
3450

35-
Object getField(String field);
51+
/**
52+
* retrieve the value of the provided fieldName
53+
* @param fieldName the field name to retrieve the value
54+
* @return the value object
55+
*/
56+
Object getField(String fieldName);
3657
}

0 commit comments

Comments
 (0)