diff --git a/.github/workflows/issue-advisor.yml b/.github/workflows/issue-advisor.yml new file mode 100644 index 00000000000..49dc7a6ca70 --- /dev/null +++ b/.github/workflows/issue-advisor.yml @@ -0,0 +1,189 @@ +name: Issue Advisor + +permissions: + issues: write + contents: read + +on: + issues: + types: [opened] + +jobs: + analyze-issue: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Get issue information + id: issue-info + uses: actions/github-script@v6 + with: + script: | + const issue = context.payload.issue; + return { + title: issue.title, + body: issue.body || '', + labels: issue.labels.map(l => l.name).join(', '), + author: issue.user.login + }; + + - name: Analyze issue with AI + id: ai-analysis + uses: actions/github-script@v6 + env: + ISSUE_TITLE: ${{ steps.issue-info.outputs.result.title }} + ISSUE_BODY: ${{ steps.issue-info.outputs.result.body }} + ISSUE_LABELS: ${{ steps.issue-info.outputs.result.labels }} + with: + script: | + const issueTitle = process.env.ISSUE_TITLE; + const issueBody = process.env.ISSUE_BODY; + const issueLabels = process.env.ISSUE_LABELS; + + // 分析 issue 类型 + let issueType = 'general'; + let priority = 'medium'; + let recommendations = []; + + // 判断 issue 类型 + if (issueLabels.includes('bug') || issueTitle.toLowerCase().includes('[bug]')) { + issueType = 'bug'; + recommendations.push('请确保提供了完整的重现步骤和环境信息'); + recommendations.push('检查是否有相关的错误日志或堆栈跟踪'); + recommendations.push('建议添加最小可复现示例(Minimal Reproducible Example)'); + } else if (issueLabels.includes('enhancement') || issueTitle.toLowerCase().includes('feature')) { + issueType = 'feature'; + recommendations.push('建议详细描述功能需求和使用场景'); + recommendations.push('可以提供设计方案或 API 设计草图'); + recommendations.push('说明该功能对用户的价值和预期收益'); + } else if (issueLabels.includes('question')) { + issueType = 'question'; + recommendations.push('建议先查阅官方文档:https://rocketmq.apache.org/docs/'); + recommendations.push('可以在 GitHub Discussions 中讨论:https://github.com/apache/rocketmq/discussions'); + recommendations.push('搜索已有的 Issues 看是否有类似问题'); + } else if (issueTitle.toLowerCase().includes('doc') || issueLabels.includes('documentation')) { + issueType = 'documentation'; + recommendations.push('请指明具体的文档页面或章节'); + recommendations.push('说明当前文档的问题和改进建议'); + recommendations.push('如果可能,欢迎直接提交文档 PR'); + } + + // 分析优先级 + const urgentKeywords = ['crash', 'critical', 'data loss', 'security', 'urgent']; + const highKeywords = ['error', 'exception', 'fail', 'broken', 'not working']; + const combinedText = (issueTitle + ' ' + issueBody).toLowerCase(); + + if (urgentKeywords.some(keyword => combinedText.includes(keyword))) { + priority = 'high'; + } else if (highKeywords.some(keyword => combinedText.includes(keyword))) { + priority = 'medium-high'; + } + + // 检查 issue 质量 + let qualityIssues = []; + if (!issueBody || issueBody.length < 50) { + qualityIssues.push('Issue 描述过于简短,建议提供更详细的信息'); + } + if (!issueBody.includes('version') && !issueBody.includes('版本')) { + qualityIssues.push('建议提供 RocketMQ 版本信息'); + } + + return { + type: issueType, + priority: priority, + recommendations: recommendations, + qualityIssues: qualityIssues + }; + + - name: Post AI analysis comment + uses: actions/github-script@v6 + with: + script: | + const analysis = ${{ steps.ai-analysis.outputs.result }}; + + // 构建评论内容 + let comment = '## 🤖 Issue Advisor 分析报告\n\n'; + + // Issue 类型 + const typeEmojis = { + bug: '🐛', + feature: '✨', + question: '❓', + documentation: '📚', + general: '📝' + }; + comment += `**Issue 类型**: ${typeEmojis[analysis.type] || '📝'} ${analysis.type}\n`; + + // 优先级 + const priorityEmojis = { + high: '🔴', + 'medium-high': '🟡', + medium: '🟢', + low: '⚪️' + }; + comment += `**建议优先级**: ${priorityEmojis[analysis.priority] || '🟢'} ${analysis.priority}\n\n`; + + // 处理建议 + if (analysis.recommendations && analysis.recommendations.length > 0) { + comment += '### 📋 处理建议\n\n'; + analysis.recommendations.forEach((rec, index) => { + comment += `${index + 1}. ${rec}\n`; + }); + comment += '\n'; + } + + // 质量问题 + if (analysis.qualityIssues && analysis.qualityIssues.length > 0) { + comment += '### ⚠️ Issue 质量建议\n\n'; + analysis.qualityIssues.forEach((issue, index) => { + comment += `${index + 1}. ${issue}\n`; + }); + comment += '\n'; + } + + // 通用建议 + comment += '### 💡 通用建议\n\n'; + comment += '- 确保使用最新版本的 RocketMQ\n'; + comment += '- 提供完整的配置文件(隐藏敏感信息)\n'; + comment += '- 如果是性能问题,请提供性能测试数据\n'; + comment += '- 欢迎提交 Pull Request 来解决问题\n\n'; + + + comment += '---\n'; + comment += '*本分析由 AI 自动生成,仅供参考。维护者会尽快审查此 Issue。*'; + + // 发布评论 + await github.rest.issues.createComment({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: context.issue.number, + body: comment + }); + + core.info('Successfully posted AI analysis comment'); + + - name: Add labels based on analysis + uses: actions/github-script@v6 + with: + script: | + const labels = []; + + + // 添加 AI 分析标签 + labels.push('ai-analyzed'); + + if (labels.length > 0) { + try { + await github.rest.issues.addLabels({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: context.issue.number, + labels: labels + }); + core.info(`Added labels: ${labels.join(', ')}`); + } catch (error) { + core.warning(`Failed to add labels: ${error.message}`); + } + } + diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authorization/chain/AclAuthorizationHandler.java b/auth/src/main/java/org/apache/rocketmq/auth/authorization/chain/AclAuthorizationHandler.java index 72b39a3c318..088415a7cdd 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authorization/chain/AclAuthorizationHandler.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authorization/chain/AclAuthorizationHandler.java @@ -154,7 +154,11 @@ private int comparePolicyEntries(PolicyEntry o1, PolicyEntry o2) { // the decision deny has higher priority Decision d1 = o1.getDecision(); Decision d2 = o2.getDecision(); - return d1 == Decision.DENY ? 1 : d2 == Decision.DENY ? -1 : 0; + + if (d1 != d2) { + return d1 == Decision.DENY ? -1 : 1; + } + return 0; } private static void throwException(DefaultAuthorizationContext context, String detail) { diff --git a/auth/src/test/java/org/apache/rocketmq/auth/authorization/chain/AclAuthorizationHandlerTest.java b/auth/src/test/java/org/apache/rocketmq/auth/authorization/chain/AclAuthorizationHandlerTest.java new file mode 100644 index 00000000000..30a2518c7f7 --- /dev/null +++ b/auth/src/test/java/org/apache/rocketmq/auth/authorization/chain/AclAuthorizationHandlerTest.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.auth.authorization.chain; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.rocketmq.auth.authentication.factory.AuthenticationFactory; +import org.apache.rocketmq.auth.authentication.manager.AuthenticationMetadataManager; +import org.apache.rocketmq.auth.authentication.model.Subject; +import org.apache.rocketmq.auth.authentication.model.User; +import org.apache.rocketmq.auth.authorization.context.DefaultAuthorizationContext; +import org.apache.rocketmq.auth.authorization.enums.Decision; +import org.apache.rocketmq.auth.authorization.enums.PolicyType; +import org.apache.rocketmq.auth.authorization.exception.AuthorizationException; +import org.apache.rocketmq.auth.authorization.factory.AuthorizationFactory; +import org.apache.rocketmq.auth.authorization.manager.AuthorizationMetadataManager; +import org.apache.rocketmq.auth.authorization.model.Acl; +import org.apache.rocketmq.auth.authorization.model.Policy; +import org.apache.rocketmq.auth.authorization.model.PolicyEntry; +import org.apache.rocketmq.auth.authorization.model.Resource; +import org.apache.rocketmq.auth.config.AuthConfig; +import org.apache.rocketmq.auth.helper.AuthTestHelper; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.action.Action; +import org.apache.rocketmq.common.chain.HandlerChain; +import org.apache.rocketmq.common.resource.ResourcePattern; +import org.apache.rocketmq.common.resource.ResourceType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Mockito.mock; + +public class AclAuthorizationHandlerTest { + + private AuthConfig authConfig; + private AuthenticationMetadataManager authenticationMetadataManager; + private AuthorizationMetadataManager authorizationMetadataManager; + private AclAuthorizationHandler handler; + private HandlerChain> nextChain; + + @Before + public void setUp() { + if (MixAll.isMac()) { + return; + } + this.authConfig = AuthTestHelper.createDefaultConfig(); + this.authenticationMetadataManager = AuthenticationFactory.getMetadataManager(this.authConfig); + this.authorizationMetadataManager = AuthorizationFactory.getMetadataManager(this.authConfig); + this.handler = new AclAuthorizationHandler(this.authConfig); + this.nextChain = mock(HandlerChain.class); + clearAllAcls(); + clearAllUsers(); + } + + @After + public void tearDown() { + if (MixAll.isMac()) { + return; + } + clearAllAcls(); + clearAllUsers(); + this.authenticationMetadataManager.shutdown(); + this.authorizationMetadataManager.shutdown(); + } + + @Test + public void testNoAclThrows() { + if (MixAll.isMac()) { + return; + } + // Create a user with no ACL entries. + User user = User.of("noacl", "pwd"); + authenticationMetadataManager.createUser(user).join(); + + DefaultAuthorizationContext ctx = buildContext(user, Resource.ofTopic("t1"), Action.SUB, "127.0.0.1"); + + AuthorizationException authorizationException = Assert.assertThrows(AuthorizationException.class, () -> { + try { + handler.handle(ctx, nextChain).join(); + } catch (Exception e) { + AuthTestHelper.handleException(e); + } + }); + Assert.assertEquals("User:noacl has no permission to access Topic:t1 from 127.0.0.1, no matched policies.", + authorizationException.getMessage()); + } + + @Test + public void testNoMatchedPolicyThrows() { + if (MixAll.isMac()) { + return; + } + User user = User.of("no_match_acl", "pwd"); + authenticationMetadataManager.createUser(user).join(); + + Acl acl = AuthTestHelper.buildAcl("User:no_match_acl", "Topic:abc", Action.SUB.getName(), null, Decision.ALLOW); + authorizationMetadataManager.createAcl(acl).join(); + + // Ensure an ACL has been created. + List acls = authorizationMetadataManager.listAcl(null, null).join(); + Assert.assertEquals(1, acls.size()); + + // The requested resource does not match any ACL entry. + DefaultAuthorizationContext ctx = buildContext(user, Resource.ofTopic("t1"), Action.SUB, "127.0.0.1"); + + AuthorizationException authorizationException = Assert.assertThrows(AuthorizationException.class, () -> { + try { + handler.handle(ctx, nextChain).join(); + } catch (Exception e) { + AuthTestHelper.handleException(e); + } + }); + Assert.assertEquals("User:no_match_acl has no permission to access Topic:t1 from 127.0.0.1, no matched policies.", + authorizationException.getMessage()); + } + + @Test + public void testDecisionDenyThrows() { + if (MixAll.isMac()) { + return; + } + User user = User.of("deny", "pwd"); + authenticationMetadataManager.createUser(user).join(); + + // The ACL entry matches, but the decision is DENY. + Acl acl = AuthTestHelper.buildAcl("User:deny", "Topic:t1", Action.SUB.getName(), null, Decision.DENY); + authorizationMetadataManager.createAcl(acl).join(); + + List acls = authorizationMetadataManager.listAcl(null, null).join(); + Assert.assertEquals(1, acls.size()); + + DefaultAuthorizationContext ctx = buildContext(user, Resource.ofTopic("t1"), Action.SUB, "127.0.0.1"); + + AuthorizationException authorizationException = Assert.assertThrows(AuthorizationException.class, () -> { + try { + handler.handle(ctx, nextChain).join(); + } catch (Exception e) { + AuthTestHelper.handleException(e); + } + }); + Assert.assertEquals("User:deny has no permission to access Topic:t1 from 127.0.0.1, the decision is deny.", + authorizationException.getMessage()); + } + + @Test + public void testAllowDoesNotThrow() { + if (MixAll.isMac()) { + return; + } + User user = User.of("allow", "pwd"); + authenticationMetadataManager.createUser(user).join(); + + // The ACL matches and the decision is ALLOW. + Acl acl = AuthTestHelper.buildAcl("User:allow", "Topic:t1", Action.SUB.getName(), null, Decision.ALLOW); + authorizationMetadataManager.createAcl(acl).join(); + + DefaultAuthorizationContext ctx = buildContext(user, Resource.ofTopic("t1"), Action.SUB, "127.0.0.1"); + + handler.handle(ctx, nextChain).join(); + } + + @Test + public void testDenyBeatsAllow() { + if (MixAll.isMac()) { + return; + } + User user = User.of("user", "pwd"); + authenticationMetadataManager.createUser(user).join(); + + // Set up policy entries with both ALLOW and DENY for the same resource. + Resource resource = Resource.of(ResourceType.TOPIC, "t1", ResourcePattern.LITERAL); + PolicyEntry allowLiteral = PolicyEntry.of(resource, Collections.singletonList(Action.SUB), null, Decision.ALLOW); + PolicyEntry denyLiteral = PolicyEntry.of(resource, Collections.singletonList(Action.SUB), null, Decision.DENY); + + // Include both entries in the policy to verify precedence. + Policy policy = Policy.of(PolicyType.CUSTOM, new ArrayList<>(Arrays.asList(allowLiteral, denyLiteral, allowLiteral))); + authorizationMetadataManager.createAcl(Acl.of(user, policy)).join(); + + DefaultAuthorizationContext ctx = buildContext(user, resource, Action.SUB, "127.0.0.1"); + + AuthorizationException authorizationException = Assert.assertThrows(AuthorizationException.class, () -> { + try { + handler.handle(ctx, nextChain).join(); + } catch (Throwable e) { + AuthTestHelper.handleException(e); + } + }); + // DENY should take precedence. + Assert.assertEquals("User:user has no permission to access Topic:t1 from 127.0.0.1, the decision is deny.", authorizationException.getMessage()); + } + + @Test + public void testPrefixedLongerDenyBeatsPrefixedShorterAllow() { + if (MixAll.isMac()) { + return; + } + User user = User.of("user", "pwd"); + authenticationMetadataManager.createUser(user).join(); + + // The longer PREFIXED DENY policy entry should take precedence over the shorter PREFIXED ALLOW policy entry. + PolicyEntry denyLonger = PolicyEntry.of( + Resource.of(ResourceType.TOPIC, "t1-abc", ResourcePattern.PREFIXED), + Collections.singletonList(Action.SUB), null, Decision.DENY); + PolicyEntry allowShorter = PolicyEntry.of( + Resource.of(ResourceType.TOPIC, "t1-", ResourcePattern.PREFIXED), + Collections.singletonList(Action.SUB), null, Decision.ALLOW); + + Policy policy = Policy.of(PolicyType.CUSTOM, new ArrayList<>(Arrays.asList(allowShorter, denyLonger))); + authorizationMetadataManager.createAcl(Acl.of(user, policy)).join(); + + DefaultAuthorizationContext ctx = buildContext(user, Resource.ofTopic("t1-abcd"), Action.SUB, "127.0.0.1"); + AuthorizationException authorizationException = Assert.assertThrows(AuthorizationException.class, () -> { + try { + handler.handle(ctx, nextChain).join(); + } catch (Throwable e) { + AuthTestHelper.handleException(e); + } + }); + Assert.assertEquals("User:user has no permission to access Topic:t1-abcd from 127.0.0.1, the decision is deny.", authorizationException.getMessage()); + } + + @Test + public void testLiteralAllowBeatsPrefixedDeny() { + if (MixAll.isMac()) { + return; + } + User user = User.of("user", "pwd"); + authenticationMetadataManager.createUser(user).join(); + + // The LITERAL ALLOW policy entry should take precedence over the PREFIXED DENY policy entry. + PolicyEntry allowLiteral = PolicyEntry.of( + Resource.of(ResourceType.TOPIC, "t1", ResourcePattern.LITERAL), + Collections.singletonList(Action.SUB), null, Decision.ALLOW); + PolicyEntry denyPrefixed = PolicyEntry.of( + Resource.of(ResourceType.TOPIC, "t", ResourcePattern.PREFIXED), + Collections.singletonList(Action.SUB), null, Decision.DENY); + + Policy policy = Policy.of(PolicyType.CUSTOM, new ArrayList<>(Arrays.asList(denyPrefixed, allowLiteral))); + authorizationMetadataManager.createAcl(Acl.of(user, policy)).join(); + + DefaultAuthorizationContext ctx = buildContext(user, Resource.ofTopic("t1"), Action.SUB, "127.0.0.1"); + handler.handle(ctx, nextChain).join(); + } + + @Test + public void testTopicTypeAllowBeatsAnyTypeDeny() { + if (MixAll.isMac()) { + return; + } + User user = User.of("user", "pwd"); + authenticationMetadataManager.createUser(user).join(); + + // The ALLOW policy entry with resource type TOPIC should take precedence over the DENY policy entry with resource type ANY. + PolicyEntry denyAnyType = PolicyEntry.of( + Resource.of(ResourceType.ANY, "t1", ResourcePattern.LITERAL), + Collections.singletonList(Action.SUB), null, Decision.DENY); + PolicyEntry allowTopicType = PolicyEntry.of( + Resource.of(ResourceType.TOPIC, "t1", ResourcePattern.LITERAL), + Collections.singletonList(Action.SUB), null, Decision.ALLOW); + + Policy policy = Policy.of(PolicyType.CUSTOM, new ArrayList<>(Arrays.asList(allowTopicType, denyAnyType))); + authorizationMetadataManager.createAcl(Acl.of(user, policy)).join(); + + DefaultAuthorizationContext ctx = buildContext(user, Resource.ofTopic("t1"), Action.SUB, "127.0.0.1"); + handler.handle(ctx, nextChain).join(); + } + + @Test + public void testPrefixedPatternAllowBeatsAnyPatternDeny() { + if (MixAll.isMac()) { + return; + } + User user = User.of("user", "pwd"); + authenticationMetadataManager.createUser(user).join(); + + // The PREFIXED pattern ALLOW policy entry should take precedence over the ANY pattern DENY policy entry. + PolicyEntry denyAny = PolicyEntry.of( + Resource.of(ResourceType.TOPIC, null, ResourcePattern.ANY), + Collections.singletonList(Action.SUB), null, Decision.DENY); + PolicyEntry allowPrefixed = PolicyEntry.of( + Resource.of(ResourceType.TOPIC, "t1", ResourcePattern.PREFIXED), + Collections.singletonList(Action.SUB), null, Decision.ALLOW); + + Policy policy = Policy.of(PolicyType.CUSTOM, new ArrayList<>(Arrays.asList(allowPrefixed, denyAny))); + authorizationMetadataManager.createAcl(Acl.of(user, policy)).join(); + + DefaultAuthorizationContext ctx = buildContext(user, Resource.ofTopic("t1"), Action.SUB, "127.0.0.1"); + handler.handle(ctx, nextChain).join(); + } + + @Test + public void testLiteralPatternDenyBeatsAnyPatternAllow() { + if (MixAll.isMac()) { + return; + } + User user = User.of("user", "pwd"); + authenticationMetadataManager.createUser(user).join(); + + // The LITERAL pattern DENY policy entry should take precedence over the ANY pattern ALLOW policy entry. + PolicyEntry allowAny = PolicyEntry.of( + Resource.of(ResourceType.TOPIC, null, ResourcePattern.ANY), + Collections.singletonList(Action.SUB), null, Decision.ALLOW); + PolicyEntry denyLiteral = PolicyEntry.of( + Resource.of(ResourceType.TOPIC, "t1", ResourcePattern.LITERAL), + Collections.singletonList(Action.SUB), null, Decision.DENY); + + + Policy policy = Policy.of(PolicyType.CUSTOM, new ArrayList<>(Arrays.asList(allowAny, denyLiteral))); + authorizationMetadataManager.createAcl(Acl.of(user, policy)).join(); + + DefaultAuthorizationContext ctx = buildContext(user, Resource.ofTopic("t1"), Action.SUB, "127.0.0.1"); + AuthorizationException authorizationException = Assert.assertThrows(AuthorizationException.class, () -> { + try { + handler.handle(ctx, nextChain).join(); + } catch (Throwable e) { + AuthTestHelper.handleException(e); + } + }); + Assert.assertEquals("User:user has no permission to access Topic:t1 from 127.0.0.1, the decision is deny.", authorizationException.getMessage()); + } + + private DefaultAuthorizationContext buildContext(Subject subject, Resource resource, Action action, String sourceIp) { + return DefaultAuthorizationContext.of(subject, resource, action, sourceIp); + } + + private void clearAllUsers() { + List users = this.authenticationMetadataManager.listUser(null).join(); + if (CollectionUtils.isEmpty(users)) { + return; + } + users.forEach(user -> this.authenticationMetadataManager.deleteUser(user.getUsername()).join()); + } + + private void clearAllAcls() { + List acls = this.authorizationMetadataManager.listAcl(null, null).join(); + if (CollectionUtils.isEmpty(acls)) { + return; + } + acls.forEach(acl -> this.authorizationMetadataManager.deleteAcl(acl.getSubject(), null, null).join()); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java index bec75fe2fb6..8ba9834702e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java @@ -546,7 +546,7 @@ private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String m } } - public class HandlePutResultTask implements Runnable { + class HandlePutResultTask implements Runnable { private final int delayLevel; public HandlePutResultTask(int delayLevel) { @@ -558,6 +558,12 @@ public void run() { LinkedBlockingQueue pendingQueue = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel); + // Check if the queue exists for the given level + if (pendingQueue == null) { + log.warn("No pending queue found for delay level: {}", this.delayLevel); + return; + } + PutResultProcess putResultProcess; while ((putResultProcess = pendingQueue.peek()) != null) { try { @@ -599,7 +605,7 @@ private void scheduleNextTask() { } } - public class PutResultProcess { + class PutResultProcess { private String topic; private long offset; private long physicOffset; @@ -824,7 +830,7 @@ public String toString() { } } - public enum ProcessStatus { + enum ProcessStatus { /** * In process, the processing result has not yet been returned. */ diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerCheckpoint.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerCheckpoint.java index 2cbe7e3bcf0..0c67c0d1347 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerCheckpoint.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerCheckpoint.java @@ -181,7 +181,7 @@ public void setMasterTimerQueueOffset(final long masterTimerQueueOffset) { this.masterTimerQueueOffset = masterTimerQueueOffset; } - public void updateDateVersion(long stateVersion) { + public void updateDataVersion(long stateVersion) { dataVersion.nextVersion(stateVersion); } diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index 8b995fbd709..9ff2544db17 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -1900,7 +1900,7 @@ public void prepareTimerCheckPoint() { if (shouldRunningDequeue) { timerCheckpoint.setMasterTimerQueueOffset(commitQueueOffset); if (commitReadTimeMs != lastCommitReadTimeMs || commitQueueOffset != lastCommitQueueOffset) { - timerCheckpoint.updateDateVersion(messageStore.getStateMachineVersion()); + timerCheckpoint.updateDataVersion(messageStore.getStateMachineVersion()); lastCommitReadTimeMs = commitReadTimeMs; lastCommitQueueOffset = commitQueueOffset; }