Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 29 additions & 29 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public boolean load() {
scanFileAndSetReadMode(LibC.MADV_RANDOM);
}
this.mappedFileQueue.checkSelf();
log.info("load commit log " + (result ? "OK" : "Failed"));
log.info("load commit log {}", result ? "OK" : "Failed");
return result;
}

Expand Down Expand Up @@ -362,22 +362,22 @@ else if (dispatchRequest.isSuccess() && size == 0) {
index++;
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
log.info("recover last 3 physics file over, last mapped file {}", mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
log.info("recover next physics file, {}", mappedFile.getFileName());
}
}
// Intermediate file read error
else if (!dispatchRequest.isSuccess()) {
if (size > 0) {
log.warn("found a half message at {}, it will be truncated.", processOffset + mappedFileOffset);
}
log.info("recover physics file end, " + mappedFile.getFileName());
log.info("recover physics file end, {}", mappedFile.getFileName());
break;
}
}
Expand Down Expand Up @@ -448,7 +448,7 @@ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer,
case BLANK_MAGIC_CODE:
return new DispatchRequest(0, true /* success */);
default:
log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));
log.warn("found a illegal magic code 0x{}", Integer.toHexString(magicCode));
return new DispatchRequest(-1, false /* success */);
}

Expand Down Expand Up @@ -716,7 +716,7 @@ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExc
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
if (this.isMappedFileMatchedRecover(mappedFile, false)) {
log.info("recover from this mapped file " + mappedFile.getFileName());
log.info("recover from this mapped file {}", mappedFile.getFileName());
break;
}
}
Expand Down Expand Up @@ -761,14 +761,14 @@ else if (size == 0) {
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
log.info("recover physics file over, last mapped file {}", mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
log.info("recover next physics file, {}", mappedFile.getFileName());
}
}
} else {
Expand All @@ -777,7 +777,7 @@ else if (size == 0) {
log.warn("found a half message at {}, it will be truncated.", processOffset + mappedFileOffset);
}

log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
log.info("recover physics file end, {} pos={}", mappedFile.getFileName(), byteBuffer.position());
break;
}
}
Expand Down Expand Up @@ -1004,7 +1004,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBroke
}
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
log.error("create mapped file1 error, topic: {} clientAddr: {}", msg.getTopic(), msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
}
Expand All @@ -1021,7 +1021,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBroke
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
log.error("create mapped file2 error, topic: {} clientAddr: {}", msg.getTopic(), msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
}
Expand Down Expand Up @@ -1371,7 +1371,7 @@ public boolean appendData(long startOffset, byte[] data, int dataStart, int data
try {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
if (null == mappedFile) {
log.error("appendData getLastMappedFile error " + startOffset);
log.error("appendData getLastMappedFile error {}", startOffset);
return false;
}

Expand Down Expand Up @@ -1441,7 +1441,7 @@ public String getServiceName() {

@Override
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
CommitLog.log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

Expand Down Expand Up @@ -1469,16 +1469,16 @@ public void run() {
}
this.waitForRunning(interval);
} catch (Throwable e) {
CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
CommitLog.log.error("{} service has exception. ", this.getServiceName(), e);
}
}

boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.commit(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
CommitLog.log.info("{} service shutdown, retry {} times {}", this.getServiceName(), i + 1, result ? "OK" : "Not OK");
}
CommitLog.log.info(this.getServiceName() + " service end");
CommitLog.log.info("{} service end", this.getServiceName());
}
}

Expand All @@ -1488,7 +1488,7 @@ class FlushRealTimeService extends FlushCommitLogService {

@Override
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
CommitLog.log.info("{} service started", this.getServiceName());

while (!this.isStopped()) {
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
Expand Down Expand Up @@ -1532,7 +1532,7 @@ public void run() {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
CommitLog.log.warn("{} service has exception. ", this.getServiceName(), e);
this.printFlushProgress();
}
}
Expand All @@ -1541,12 +1541,12 @@ public void run() {
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
CommitLog.log.info("{} service shutdown, retry {} times {}", this.getServiceName(), i + 1, result ? "OK" : "Not OK");
}

this.printFlushProgress();

CommitLog.log.info(this.getServiceName() + " service end");
CommitLog.log.info("{} service end", this.getServiceName());
}

@Override
Expand Down Expand Up @@ -1673,14 +1673,14 @@ private void doCommit() {

@Override
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
CommitLog.log.info("{} service started", this.getServiceName());

while (!this.isStopped()) {
try {
this.waitForRunning(10);
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
CommitLog.log.warn("{} service has exception. ", this.getServiceName(), e);
}
}

Expand All @@ -1695,7 +1695,7 @@ public void run() {
this.swapRequests();
this.doCommit();

CommitLog.log.info(this.getServiceName() + " service end");
CommitLog.log.info("{} service end", this.getServiceName());
}

@Override
Expand Down Expand Up @@ -1781,14 +1781,14 @@ private void doCommit() {
}

public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
CommitLog.log.info("{} service started", this.getServiceName());

while (!this.isStopped()) {
try {
this.waitForRunning(1);
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
CommitLog.log.warn("{} service has exception. ", this.getServiceName(), e);
}
}

Expand All @@ -1806,7 +1806,7 @@ public void run() {

this.doCommit();

CommitLog.log.info(this.getServiceName() + " service end");
CommitLog.log.info("{} service end", this.getServiceName());
}

@Override
Expand Down Expand Up @@ -1880,7 +1880,7 @@ public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer,

// Exceeds the maximum message
if (msgLen > this.messageStoreConfig.getMaxMessageSize()) {
log.warn("message size exceeded, msg total size: " + msgLen + ", maxMessageSize: " + this.messageStoreConfig.getMaxMessageSize());
log.warn("message size exceeded, msg total size: {}, maxMessageSize: {}", msgLen, this.messageStoreConfig.getMaxMessageSize());
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}

Expand Down Expand Up @@ -2161,7 +2161,7 @@ public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMess
//flushOK=false;
}
if (flushStatus != PutMessageStatus.PUT_OK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString());
log.error("do groupcommit, wait for flush failed, topic: {} tags: {} client address: {}", messageExt.getTopic(), messageExt.getTags(), messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
Expand Down Expand Up @@ -2311,7 +2311,7 @@ public void run() {
long costTime = this.systemClock.now() - beginClockTimestamp;
log.info("[{}] scanFilesInPageCache-cost {} ms.", costTime > 30 * 1000 ? "NOTIFYME" : "OK", costTime);
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has e: {}", e);
log.warn("{} service has e: ", this.getServiceName() , e);
}
}
log.info("{} service end", this.getServiceName());
Expand Down
Loading