1616 */
1717package org .apache .rocketmq .broker .offset ;
1818
19+ import com .google .common .base .Strings ;
1920import com .google .common .collect .Maps ;
2021import java .util .HashMap ;
2122import java .util .HashSet ;
2627import java .util .concurrent .ConcurrentHashMap ;
2728import java .util .concurrent .ConcurrentMap ;
2829import java .util .concurrent .atomic .AtomicLong ;
29-
30- import com .google .common .base .Strings ;
31-
3230import java .util .function .Function ;
3331import org .apache .rocketmq .broker .BrokerController ;
3432import org .apache .rocketmq .broker .BrokerPathConfigHelper ;
4139import org .apache .rocketmq .remoting .protocol .DataVersion ;
4240import org .apache .rocketmq .remoting .protocol .RemotingSerializable ;
4341
42+ import static org .apache .rocketmq .common .MixAll .BROADCAST_KEY ;
43+
4444public class ConsumerOffsetManager extends ConfigManager {
4545 protected static final Logger LOG = LoggerFactory .getLogger (LoggerName .BROKER_LOGGER_NAME );
4646 public static final String TOPIC_GROUP_SEPARATOR = "@" ;
4747
4848 protected DataVersion dataVersion = new DataVersion ();
4949
50- protected ConcurrentMap <String /* topic@group */ , ConcurrentMap <Integer , Long >> offsetTable =
51- new ConcurrentHashMap <>(512 );
50+ protected ConcurrentMap <String /* topic@group */ , ConcurrentMap <Integer , Long >> offsetTable = new ConcurrentHashMap <>(512 );
5251
53- protected final ConcurrentMap <String , ConcurrentMap <Integer , Long >> resetOffsetTable =
54- new ConcurrentHashMap <>(512 );
52+ protected final ConcurrentMap <String , ConcurrentMap <Integer , Long >> resetOffsetTable = new ConcurrentHashMap <>(512 );
5553
56- private final ConcurrentMap <String /* topic@group */ , ConcurrentMap <Integer , Long >> pullOffsetTable =
57- new ConcurrentHashMap <>(512 );
54+ private final ConcurrentMap <String /* topic@group */ , ConcurrentMap <Integer , Long >> pullOffsetTable = new ConcurrentHashMap <>(512 );
5855
5956 protected transient BrokerController brokerController ;
6057
@@ -78,7 +75,7 @@ public void cleanOffset(String group) {
7875 String topicAtGroup = next .getKey ();
7976 if (topicAtGroup .contains (group )) {
8077 String [] arrays = topicAtGroup .split (TOPIC_GROUP_SEPARATOR );
81- if (arrays . length == 2 && group .equals (arrays [1 ])) {
78+ if (validateOffsetTableKey ( topicAtGroup ) && group .equals (arrays [1 ])) {
8279 it .remove ();
8380 removeConsumerOffset (topicAtGroup );
8481 LOG .warn ("Clean group's offset, {}, {}" , topicAtGroup , next .getValue ());
@@ -94,7 +91,7 @@ public void cleanOffsetByTopic(String topic) {
9491 String topicAtGroup = next .getKey ();
9592 if (topicAtGroup .contains (topic )) {
9693 String [] arrays = topicAtGroup .split (TOPIC_GROUP_SEPARATOR );
97- if (arrays . length == 2 && topic .equals (arrays [0 ])) {
94+ if (validateOffsetTableKey ( topicAtGroup ) && topic .equals (arrays [0 ])) {
9895 it .remove ();
9996 removeConsumerOffset (topicAtGroup );
10097 LOG .warn ("Clean topic's offset, {}, {}" , topicAtGroup , next .getValue ());
@@ -109,12 +106,11 @@ public void scanUnsubscribedTopic() {
109106 Entry <String , ConcurrentMap <Integer , Long >> next = it .next ();
110107 String topicAtGroup = next .getKey ();
111108 String [] arrays = topicAtGroup .split (TOPIC_GROUP_SEPARATOR );
112- if (arrays . length == 2 ) {
109+ if (validateOffsetTableKey ( topicAtGroup ) ) {
113110 String topic = arrays [0 ];
114111 String group = arrays [1 ];
115112
116- if (null == brokerController .getConsumerManager ().findSubscriptionData (group , topic )
117- && this .offsetBehindMuchThanData (topic , next .getValue ())) {
113+ if (null == brokerController .getConsumerManager ().findSubscriptionData (group , topic ) && this .offsetBehindMuchThanData (topic , next .getValue ())) {
118114 it .remove ();
119115 removeConsumerOffset (topicAtGroup );
120116 LOG .warn ("remove topic offset, {}" , topicAtGroup );
@@ -139,13 +135,12 @@ private boolean offsetBehindMuchThanData(final String topic, ConcurrentMap<Integ
139135
140136 public Set <String > whichTopicByConsumer (final String group ) {
141137 Set <String > topics = new HashSet <>();
142-
143138 Iterator <Entry <String , ConcurrentMap <Integer , Long >>> it = this .offsetTable .entrySet ().iterator ();
144139 while (it .hasNext ()) {
145140 Entry <String , ConcurrentMap <Integer , Long >> next = it .next ();
146141 String topicAtGroup = next .getKey ();
147142 String [] arrays = topicAtGroup .split (TOPIC_GROUP_SEPARATOR );
148- if (arrays . length == 2 ) {
143+ if (validateOffsetTableKey ( topicAtGroup ) ) {
149144 if (group .equals (arrays [1 ])) {
150145 topics .add (arrays [0 ]);
151146 }
@@ -163,7 +158,7 @@ public Set<String> whichGroupByTopic(final String topic) {
163158 Entry <String , ConcurrentMap <Integer , Long >> next = it .next ();
164159 String topicAtGroup = next .getKey ();
165160 String [] arrays = topicAtGroup .split (TOPIC_GROUP_SEPARATOR );
166- if (arrays . length == 2 ) {
161+ if (validateOffsetTableKey ( topicAtGroup ) ) {
167162 if (topic .equals (arrays [0 ])) {
168163 groups .add (arrays [1 ]);
169164 }
@@ -178,7 +173,7 @@ public Map<String, Set<String>> getGroupTopicMap() {
178173
179174 for (String key : this .offsetTable .keySet ()) {
180175 String [] arr = key .split (TOPIC_GROUP_SEPARATOR );
181- if (arr . length == 2 ) {
176+ if (validateOffsetTableKey ( key ) ) {
182177 String topic = arr [0 ];
183178 String group = arr [1 ];
184179
@@ -224,16 +219,16 @@ public void commitPullOffset(final String clientHost, final String group, final
224219 final long offset ) {
225220 // topic@group
226221 String key = topic + TOPIC_GROUP_SEPARATOR + group ;
227- ConcurrentMap <Integer , Long > map = this .pullOffsetTable .computeIfAbsent (
228- key , k -> new ConcurrentHashMap <>(32 ));
222+ ConcurrentMap <Integer , Long > map = this .pullOffsetTable .computeIfAbsent (key , k -> new ConcurrentHashMap <>(32 ));
229223 map .put (queueId , offset );
230224 }
231225
232226 /**
233227 * If the target queue has temporary reset offset, return the reset-offset.
234228 * Otherwise, return the current consume offset in the offset store.
235- * @param group Consumer group
236- * @param topic Topic
229+ *
230+ * @param group Consumer group
231+ * @param topic Topic
237232 * @param queueId Queue ID
238233 * @return current consume offset or reset offset if there were one.
239234 */
@@ -261,8 +256,9 @@ public long queryOffset(final String group, final String topic, final int queueI
261256
262257 /**
263258 * Query pull offset in pullOffsetTable
264- * @param group Consumer group
265- * @param topic Topic
259+ *
260+ * @param group Consumer group
261+ * @param topic Topic
266262 * @param queueId Queue ID
267263 * @return latest pull offset of consumer group
268264 */
@@ -330,7 +326,7 @@ public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final Str
330326 Iterator <String > it = topicGroups .iterator ();
331327 while (it .hasNext ()) {
332328 String topicAtGroup = it .next ();
333- if (group .equals (topicAtGroup .split (TOPIC_GROUP_SEPARATOR )[1 ])) {
329+ if (validateOffsetTableKey ( topicAtGroup ) && group .equals (topicAtGroup .split (TOPIC_GROUP_SEPARATOR )[1 ])) {
334330 it .remove ();
335331 removeConsumerOffset (topicAtGroup );
336332 }
@@ -341,7 +337,7 @@ public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final Str
341337 for (Map .Entry <String , ConcurrentMap <Integer , Long >> offSetEntry : this .offsetTable .entrySet ()) {
342338 String topicGroup = offSetEntry .getKey ();
343339 String [] topicGroupArr = topicGroup .split (TOPIC_GROUP_SEPARATOR );
344- if (topic .equals (topicGroupArr [0 ])) {
340+ if (validateOffsetTableKey ( topicGroup ) && topic .equals (topicGroupArr [0 ])) {
345341 for (Entry <Integer , Long > entry : offSetEntry .getValue ().entrySet ()) {
346342 long minOffset = this .brokerController .getMessageStore ().getMinOffsetInQueue (topic , entry .getKey ());
347343 if (entry .getValue () >= minOffset ) {
@@ -407,7 +403,7 @@ public void removeOffset(final String group) {
407403 String topicAtGroup = entry .getKey ();
408404 if (topicAtGroup .contains (group )) {
409405 String [] arrays = topicAtGroup .split (TOPIC_GROUP_SEPARATOR );
410- if (arrays . length == 2 && group .equals (arrays [1 ])) {
406+ if (validateOffsetTableKey ( topicAtGroup ) && group .equals (arrays [1 ])) {
411407 it .remove ();
412408 removeConsumerOffset (topicAtGroup );
413409 removed = true ;
@@ -421,14 +417,12 @@ public void removeOffset(final String group) {
421417 boolean clearReset = deleteFunction .apply (this .resetOffsetTable .entrySet ().iterator ());
422418 boolean clearPull = deleteFunction .apply (this .pullOffsetTable .entrySet ().iterator ());
423419
424- LOG .info ("Consumer offset manager clean group offset, groupName={}, " +
425- "offsetTable={}, resetOffsetTable={}, pullOffsetTable={}" , group , clearOffset , clearReset , clearPull );
420+ LOG .info ("Consumer offset manager clean group offset, groupName={}, " + "offsetTable={}, resetOffsetTable={}, pullOffsetTable={}" , group , clearOffset , clearReset , clearPull );
426421 }
427422
428423 public void assignResetOffset (String topic , String group , int queueId , long offset ) {
429424 if (Strings .isNullOrEmpty (topic ) || Strings .isNullOrEmpty (group ) || queueId < 0 || offset < 0 ) {
430- LOG .warn ("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}" ,
431- topic , group , queueId , offset );
425+ LOG .warn ("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}" , topic , group , queueId , offset );
432426 return ;
433427 }
434428
@@ -461,4 +455,9 @@ public Long queryThenEraseResetOffset(String topic, String group, Integer queueI
461455 return map .remove (queueId );
462456 }
463457 }
464- }
458+
459+ public boolean validateOffsetTableKey (String key ) {
460+ String [] arr = key .split (TOPIC_GROUP_SEPARATOR );
461+ return arr .length == 2 || (arr .length == 3 && BROADCAST_KEY .equals (arr [2 ]));
462+ }
463+ }
0 commit comments