Skip to content

Commit 5f9889d

Browse files
authored
Merge pull request #71 from duhenglucky/schema_with_builder
[DISCUSS]feat(schema) add schema support for OMS
2 parents 0ce5a93 + 8045676 commit 5f9889d

29 files changed

Lines changed: 923 additions & 11 deletions
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.openmessaging.samples;
18+
public class MessageSample {
19+
private String name;
20+
21+
public MessageSample(String name) {
22+
this.name = name;
23+
}
24+
25+
public String getName() {
26+
return name;
27+
}
28+
29+
public void setName(String name) {
30+
this.name = name;
31+
}
32+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package io.openmessaging.samples.consumer;
19+
20+
import io.openmessaging.api.GenericMessage;
21+
import io.openmessaging.api.MessagingAccessPoint;
22+
import io.openmessaging.api.OMS;
23+
import io.openmessaging.api.OMSBuiltinKeys;
24+
import io.openmessaging.api.PullConsumer;
25+
import io.openmessaging.api.TopicPartition;
26+
import io.openmessaging.samples.MessageSample;
27+
import java.util.List;
28+
import java.util.Properties;
29+
import java.util.Set;
30+
31+
public class GenericPullConsumerApp {
32+
public static void main(String[] args) {
33+
//Load and start the vendor implementation from a specific OMS driver URL.
34+
final MessagingAccessPoint messagingAccessPoint =
35+
OMS.builder()
36+
.endpoint("http://mq-instance-xxx-1234567890-test:8080")
37+
.region("Shanghai")
38+
.driver("rocketmq")
39+
.build();
40+
41+
Properties properties = new Properties();
42+
properties.setProperty(OMSBuiltinKeys.DESERIALIZER, "io.openmessaging.openmeta.impl.Deserializer");
43+
properties.setProperty(OMSBuiltinKeys.OPEN_META_URL, "http://localhost:1234");
44+
45+
//Start a PullConsumer to receive messages from the specific queue.
46+
final PullConsumer consumer = messagingAccessPoint.createPullConsumer(properties);
47+
48+
//Register a shutdown hook to close the opened endpoints.
49+
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
50+
@Override
51+
public void run() {
52+
consumer.shutdown();
53+
}
54+
}));
55+
56+
Set<TopicPartition> topicPartitions = consumer.topicPartitions("NS://TOPIC");
57+
consumer.assign(topicPartitions);
58+
consumer.start();
59+
60+
List<GenericMessage<MessageSample>> messages = consumer.pollGenericMessages(1000);
61+
for (GenericMessage<MessageSample> message : messages) {
62+
System.out.println("Received message: " + message.getValue());
63+
}
64+
//Acknowledge the consumed message
65+
consumer.commitSync();
66+
consumer.shutdown();
67+
68+
}
69+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.openmessaging.samples.consumer;
18+
19+
import io.openmessaging.api.Action;
20+
import io.openmessaging.api.Consumer;
21+
import io.openmessaging.api.GenericMessage;
22+
import io.openmessaging.api.GenericMessageListener;
23+
import io.openmessaging.api.MessageConsumeContext;
24+
import io.openmessaging.api.MessagingAccessPoint;
25+
import io.openmessaging.api.OMS;
26+
import io.openmessaging.api.OMSBuiltinKeys;
27+
import io.openmessaging.samples.MessageSample;
28+
import java.util.Properties;
29+
30+
public class GenericPushConsumerApp {
31+
public static void main(String[] args) {
32+
//Load and start the vendor implementation from a specific OMS driver URL.
33+
final MessagingAccessPoint messagingAccessPoint =
34+
OMS.builder()
35+
.region("Shanghai")
36+
.endpoint("127.0.0.1:9876")
37+
.driver("rocketmq")
38+
.withCredentials(new Properties())
39+
.build();
40+
41+
Properties properties = new Properties();
42+
properties.setProperty(OMSBuiltinKeys.DESERIALIZER, "io.openmessaging.openmeta.impl.Deserializer");
43+
properties.setProperty(OMSBuiltinKeys.OPEN_META_URL, "http://localhost:1234");
44+
45+
final Consumer consumer = messagingAccessPoint.createConsumer(properties);
46+
consumer.start();
47+
48+
//Register a shutdown hook to close the opened endpoints.
49+
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
50+
@Override
51+
public void run() {
52+
consumer.shutdown();
53+
}
54+
}));
55+
56+
//Consume messages from a simple queue.
57+
String topic = "NS://HELLO_TOPIC";
58+
59+
consumer.subscribe(topic, "*", new GenericMessageListener<MessageSample>() {
60+
@Override
61+
public Action consume(GenericMessage<MessageSample> message, MessageConsumeContext context) {
62+
MessageSample messageSample = message.getValue();
63+
System.out.println("Received message: " + messageSample);
64+
return Action.CommitMessage;
65+
}
66+
});
67+
68+
consumer.shutdown();
69+
}
70+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.openmessaging.samples.producer;
18+
19+
import io.openmessaging.api.Message;
20+
import io.openmessaging.api.MessageBuilder;
21+
import io.openmessaging.api.OMSBuiltinKeys;
22+
import io.openmessaging.api.serialization.Serializer;
23+
import java.lang.reflect.Constructor;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.Properties;
27+
28+
public class MessageBuilderImpl<T> implements MessageBuilder<T> {
29+
30+
private T body;
31+
32+
private String keys;
33+
34+
private String tags;
35+
36+
private String shardingKey;
37+
38+
private String topic;
39+
40+
private final Map<String, String> userProperties = new HashMap<>();
41+
42+
private final Properties properties;
43+
44+
private Serializer<T> serializer;
45+
46+
public MessageBuilderImpl(Properties properties) throws Exception {
47+
this.properties = properties;
48+
Class<?> clazz = Class.forName(properties.getProperty(OMSBuiltinKeys.ENDPOINT));
49+
Constructor<T> constructor = (Constructor<T>) clazz.getDeclaredConstructor();
50+
this.serializer = (Serializer<T>) constructor.newInstance();
51+
}
52+
53+
@Override public MessageBuilder withTopic(String topic) {
54+
this.topic = topic;
55+
return this;
56+
}
57+
58+
@Override public MessageBuilder withValue(T t) {
59+
this.body = t;
60+
return this;
61+
}
62+
63+
@Override public MessageBuilder withKey(String keys) {
64+
this.keys = keys;
65+
return this;
66+
}
67+
68+
@Override public MessageBuilder withTags(String tags) {
69+
this.tags = tags;
70+
return this;
71+
}
72+
73+
@Override public MessageBuilder withProperty(String key, String value) {
74+
this.properties.put(key, value);
75+
return this;
76+
}
77+
78+
@Override public MessageBuilder withShardingKey(String shardingKey) {
79+
this.shardingKey = shardingKey;
80+
return this;
81+
}
82+
83+
@Override public String getTopic() {
84+
return this.topic;
85+
}
86+
87+
@Override public Message build() {
88+
Message message = new Message();
89+
message.setKey(this.keys);
90+
message.setBody(this.serializer.serialize(this.topic, body));
91+
return message;
92+
}
93+
}

openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@
2020
import io.openmessaging.api.Message;
2121
import io.openmessaging.api.MessagingAccessPoint;
2222
import io.openmessaging.api.OMS;
23+
import io.openmessaging.api.OMSBuiltinKeys;
2324
import io.openmessaging.api.OnExceptionContext;
2425
import io.openmessaging.api.Producer;
2526
import io.openmessaging.api.SendCallback;
2627
import io.openmessaging.api.SendResult;
28+
import io.openmessaging.samples.MessageSample;
2729
import java.util.Properties;
2830

2931
public class ProducerApp {
32+
3033
public static void main(String[] args) {
3134
final MessagingAccessPoint messagingAccessPoint =
3235
OMS.builder()
@@ -36,7 +39,11 @@ public static void main(String[] args) {
3639
.withCredentials(new Properties())
3740
.build();
3841

39-
final Producer producer = messagingAccessPoint.createProducer(new Properties());
42+
Properties properties = new Properties();
43+
properties.setProperty(OMSBuiltinKeys.SERIALIZER, "io.openmessaging.openmeta.impl.Serializer");
44+
properties.setProperty(OMSBuiltinKeys.OPEN_META_URL, "http://localhost:1234");
45+
46+
final Producer producer = messagingAccessPoint.createProducer(properties);
4047
producer.start();
4148

4249
//Register a shutdown hook to close the opened endpoints.
@@ -47,9 +54,13 @@ public void run() {
4754
}
4855
}));
4956

50-
Message message = new Message("NS://Topic", "TagA", "Hello MQ".getBytes());
57+
MessageSample messageSample = new MessageSample("Bob");
58+
59+
Message message = producer.messageBuilder().withTopic("NS://topicA")
60+
.withValue(messageSample).withKey("messageKey").withTags("TagA").build();
5161

5262
SendResult sendResult = producer.send(message);
63+
5364
System.out.println("SendResult: " + sendResult);
5465

5566
//Sends a message to the specified destination async.
@@ -61,7 +72,7 @@ public void onSuccess(SendResult sendResult) {
6172

6273
@Override
6374
public void onException(OnExceptionContext context) {
64-
75+
context.getException().printStackTrace();
6576
}
6677
});
6778

openmessaging-api/src/main/java/io/openmessaging/api/ConsumeContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@
1616
*/
1717
package io.openmessaging.api;
1818

19-
public class ConsumeContext {
19+
public class ConsumeContext implements MessageConsumeContext{
2020

2121
}

openmessaging-api/src/main/java/io/openmessaging/api/Consumer.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,30 @@ public interface Consumer extends Admin {
4848
*/
4949
void subscribe(final String topic, final MessageSelector selector, final MessageListener listener);
5050

51+
/**
52+
* Subscribe message in order.
53+
*
54+
* @param topic message topic.
55+
* @param subExpression Subscribe to the filter expression string, which the broker filters based on this
56+
* expression. <br> eg: "tag1 || tag2 || tag3"<br>, if subExpression is equal to null or *, it means subscribe all
57+
* messages.
58+
* @param listener The message callback listener, the consumer receives the message and then passes it to the
59+
* message callback listener for consumption.
60+
*/
61+
void subscribe(final String topic, final String subExpression, final GenericMessageListener listener);
62+
63+
/**
64+
* Subscribe to messages, which can be filtered using SQL expressions.
65+
*
66+
* @param topic
67+
* @param selector Subscribe to the message selector (can be empty, indicating no filtering), the ONS server filters
68+
* according to the expression in this selector. Currently supports two expression syntax: {@link
69+
* ExpressionType#TAG}, {@link ExpressionType#SQL92} Among them, the effect of TAG filtering is consistent with the
70+
* above interface.
71+
* @param listener Message callback listener
72+
*/
73+
void subscribe(final String topic, final MessageSelector selector, final GenericMessageListener listener);
74+
5175
/**
5276
* Unsubscribe message
5377
*
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.openmessaging.api;
18+
19+
/**
20+
* An abstract class used for get the object instead of bytes.
21+
*
22+
* @param <T> Object deserialized from the message body.
23+
* @version OMS 2.0.0
24+
* @since OMS 2.0.0
25+
*/
26+
public abstract class GenericMessage<T> extends Message {
27+
28+
/**
29+
* @return Object deserialized from the message body.
30+
*/
31+
public abstract T getValue();
32+
}

0 commit comments

Comments
 (0)