|
16 | 16 | */ |
17 | 17 | package org.apache.rocketmq.broker.offset; |
18 | 18 |
|
| 19 | +import com.google.common.collect.Maps; |
19 | 20 | import java.util.HashMap; |
20 | 21 | import java.util.HashSet; |
21 | 22 | import java.util.Iterator; |
@@ -417,27 +418,14 @@ public void assignResetOffset(String topic, String group, int queueId, long offs |
417 | 418 | } |
418 | 419 |
|
419 | 420 | String key = topic + TOPIC_GROUP_SEPARATOR + group; |
420 | | - ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key); |
421 | | - if (null == map) { |
422 | | - map = new ConcurrentHashMap<Integer, Long>(); |
423 | | - ConcurrentMap<Integer, Long> previous = resetOffsetTable.putIfAbsent(key, map); |
424 | | - if (null != previous) { |
425 | | - map = previous; |
426 | | - } |
427 | | - } |
428 | | - |
429 | | - map.put(queueId, offset); |
430 | | - LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, resetOffset={}", |
431 | | - topic, group, queueId, offset); |
| 421 | + resetOffsetTable.computeIfAbsent(key, k -> Maps.newConcurrentMap()).put(queueId, offset); |
| 422 | + LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, resetOffset={}", topic, group, queueId, offset); |
432 | 423 |
|
433 | 424 | // Two things are important here: |
434 | 425 | // 1, currentOffsetMap might be null if there is no previous records; |
435 | 426 | // 2, Our overriding here may get overridden by the client instantly in concurrent cases; But it still makes |
436 | 427 | // sense in cases like clients are offline. |
437 | | - ConcurrentMap<Integer, Long> currentOffsetMap = offsetTable.get(key); |
438 | | - if (null != currentOffsetMap) { |
439 | | - currentOffsetMap.put(queueId, offset); |
440 | | - } |
| 428 | + offsetTable.computeIfAbsent(key, k -> Maps.newConcurrentMap()).put(queueId, offset); |
441 | 429 | } |
442 | 430 |
|
443 | 431 | public boolean hasOffsetReset(String topic, String group, int queueId) { |
|
0 commit comments