Skip to content

Commit d556460

Browse files
3424672656hqbfzwang
andauthored
fix: clean pull offset in #removeOffset (#9490)
Co-authored-by: hqbfzwang <hqbfzwang@tencent.com>
1 parent 8abc0a1 commit d556460

1 file changed

Lines changed: 38 additions & 17 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -118,33 +118,54 @@ public void removeOffset(String group) {
118118
+ Short.BYTES /* group-len */ + groupBytes.length + 1 /* CTRL_1 */;
119119

120120
// [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][group-len, 2 bytes][group bytes][CTRL_1, 1 byte]
121-
ByteBuf beginKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
122-
beginKey.writeByte(TablePrefix.TABLE.getValue());
123-
beginKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
124-
beginKey.writeByte(RecordPrefix.DATA.getValue());
125-
beginKey.writeShort(groupBytes.length);
126-
beginKey.writeBytes(groupBytes);
127-
beginKey.writeByte(AbstractRocksDBStorage.CTRL_1);
121+
ByteBuf consumerOffsetBeginKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
122+
consumerOffsetBeginKey.writeByte(TablePrefix.TABLE.getValue());
123+
consumerOffsetBeginKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
124+
consumerOffsetBeginKey.writeByte(RecordPrefix.DATA.getValue());
125+
consumerOffsetBeginKey.writeShort(groupBytes.length);
126+
consumerOffsetBeginKey.writeBytes(groupBytes);
127+
consumerOffsetBeginKey.writeByte(AbstractRocksDBStorage.CTRL_1);
128+
129+
ByteBuf consumerOffsetEndKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
130+
consumerOffsetEndKey.writeByte(TablePrefix.TABLE.getValue());
131+
consumerOffsetEndKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
132+
consumerOffsetEndKey.writeByte(RecordPrefix.DATA.getValue());
133+
consumerOffsetEndKey.writeShort(groupBytes.length);
134+
consumerOffsetEndKey.writeBytes(groupBytes);
135+
consumerOffsetEndKey.writeByte(AbstractRocksDBStorage.CTRL_2);
128136

129-
ByteBuf endKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
130-
endKey.writeByte(TablePrefix.TABLE.getValue());
131-
endKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
132-
endKey.writeByte(RecordPrefix.DATA.getValue());
133-
endKey.writeShort(groupBytes.length);
134-
endKey.writeBytes(groupBytes);
135-
endKey.writeByte(AbstractRocksDBStorage.CTRL_2);
137+
// [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][group-len, 2 bytes][group bytes][CTRL_1, 1 byte]
138+
ByteBuf pullOffsetBeginKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
139+
pullOffsetBeginKey.writeByte(TablePrefix.TABLE.getValue());
140+
pullOffsetBeginKey.writeShort(TableId.PULL_OFFSET.getValue());
141+
pullOffsetBeginKey.writeByte(RecordPrefix.DATA.getValue());
142+
pullOffsetBeginKey.writeShort(groupBytes.length);
143+
pullOffsetBeginKey.writeBytes(groupBytes);
144+
pullOffsetBeginKey.writeByte(AbstractRocksDBStorage.CTRL_1);
145+
146+
ByteBuf pullOffsetEndKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
147+
pullOffsetEndKey.writeByte(TablePrefix.TABLE.getValue());
148+
pullOffsetEndKey.writeShort(TableId.PULL_OFFSET.getValue());
149+
pullOffsetEndKey.writeByte(RecordPrefix.DATA.getValue());
150+
pullOffsetEndKey.writeShort(groupBytes.length);
151+
pullOffsetEndKey.writeBytes(groupBytes);
152+
pullOffsetEndKey.writeByte(AbstractRocksDBStorage.CTRL_2);
136153
try (WriteBatch writeBatch = new WriteBatch()) {
137154
// TODO: we have to make a copy here as WriteBatch lacks ByteBuffer API here
138-
writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), ConfigHelper.readBytes(endKey));
155+
writeBatch.deleteRange(ConfigHelper.readBytes(consumerOffsetBeginKey), ConfigHelper.readBytes(consumerOffsetEndKey));
156+
writeBatch.deleteRange(ConfigHelper.readBytes(pullOffsetBeginKey), ConfigHelper.readBytes(pullOffsetEndKey));
139157
MessageStore messageStore = brokerController.getMessageStore();
140158
long stateMachineVersion = messageStore != null ? messageStore.getStateMachineVersion() : 0;
141159
ConfigHelper.stampDataVersion(writeBatch, TableId.CONSUMER_OFFSET, dataVersion, stateMachineVersion);
160+
ConfigHelper.stampDataVersion(writeBatch, TableId.PULL_OFFSET, dataVersion, stateMachineVersion);
142161
configStorage.write(writeBatch);
143162
} catch (RocksDBException e) {
144163
LOG.error("Failed to consumer offsets by group={}", group, e);
145164
} finally {
146-
beginKey.release();
147-
endKey.release();
165+
consumerOffsetBeginKey.release();
166+
consumerOffsetEndKey.release();
167+
pullOffsetBeginKey.release();
168+
pullOffsetEndKey.release();
148169
}
149170
}
150171

0 commit comments

Comments
 (0)