diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 1060cd5c..dbb5c03a 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -488,9 +488,11 @@ optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid << ", messageId: " << messageId << ") of " << payload.readableBytes() << " bytes"); - Lock lock(chunkProcessMutex_); - + // For non-last chunks, increase available permits immediately since they don't occupy receiver queue. + if (chunkId != metadata.num_chunks_from_msg() - 1) { + increaseAvailablePermits(cnx); + } // Lazy task scheduling to expire incomplete chunk message bool expected = false; if (expireTimeOfIncompleteChunkedMessageMs_ > 0 && @@ -498,9 +500,58 @@ optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay triggerCheckExpiredChunkedTimer(); } + // Part 1: chunkId == 0, this is the first chunk of a chunked message. + // If a previous incomplete context with the same uuid exists, it means either: + // a) Message redeliver: the first chunk's messageId matches one of the cached chunk messageIds. + // In this case, the old context is simply removed and a new context is created to restart + // assembling from scratch. No ack is needed since the old chunks will be redelivered. + // b) Corrupted chunk message: the first chunk's messageId does NOT match any cached chunk messageId, + // meaning a new producer sent a message with the same uuid. In this case, ack the old cached + // chunks to avoid ack holes, then remove the old context and create a new one. + // After handling the old context, check maxPendingChunkedMessage limit and create a new context. auto it = chunkedMessageCache_.find(uuid); - - if (chunkId == 0 && it == chunkedMessageCache_.end()) { + if (chunkId == 0) { + // Handle ack hole when receiving duplicated first chunk. + // For example (message redeliver): + // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1 + // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2 + // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1 + // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2 + // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3 + // For example (corrupted chunk message): + // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1 + // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2 + // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3 + // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4 + // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5 + if (it != chunkedMessageCache_.end()) { + auto& existingCtx = it->second; + bool isCorruptedChunkMessage = true; + for (const MessageId& cachedMsgId : existingCtx.getChunkedMessageIds()) { + if (cachedMsgId.ledgerId() == messageId.ledgerId() && + cachedMsgId.entryId() == messageId.entryId()) { + isCorruptedChunkMessage = false; + break; + } + } + if (isCorruptedChunkMessage) { + for (const MessageId& cachedMsgId : existingCtx.getChunkedMessageIds()) { + LOG_INFO("Acking corrupted chunk message to avoid ack hole, uuid: " + << uuid << ", messageId: " << cachedMsgId); + acknowledgeAsync(cachedMsgId, [uuid, cachedMsgId](Result result) { + if (result != ResultOk) { + LOG_ERROR("Failed to acknowledge corrupted chunk, uuid: " + << uuid << ", messageId: " << cachedMsgId); + } + }); + } + } + LOG_WARN("Received a duplicated first chunk (uuid: " + << uuid << ", messageId: " << messageId + << "). Remove previous chunk context and restart assembling"); + chunkedMessageCache_.remove(uuid); + it = chunkedMessageCache_.end(); + } if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() >= maxPendingChunkedMessage_) { chunkedMessageCache_.removeOldestValues( chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1, @@ -512,47 +563,117 @@ optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay } it = chunkedMessageCache_.putIfAbsent( uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()}); + it->second.appendChunk(messageId, payload); + lock.unlock(); + return {}; } - auto& chunkedMsgCtx = it->second; - if (it == chunkedMessageCache_.end() || !chunkedMsgCtx.validateChunkId(chunkId)) { + // Part 2: chunkId != 0 but chunk context not found in cache. + // This happens when the first chunk was not received (e.g., consumer used seek() or started + // consuming from a specific message position that falls in the middle of a chunked message, + // or the context was evicted due to maxPendingChunkedMessage limit). + // In this case, the chunk message cannot be assembled, so just discard it. + if (it == chunkedMessageCache_.end()) { auto startMessageId = getStartMessageId(); if (!config_.isStartMessageIdInclusive() && startMessageId && startMessageId->ledgerId() == messageId.ledgerId() && startMessageId->entryId() == messageId.entryId()) { - // When the start message id is not inclusive, the last chunk of the previous chunked message will - // be delivered, which is expected and we only need to filter it out. - chunkedMessageCache_.remove(uuid); LOG_INFO("Filtered the chunked message before the start message id (uuid: " << uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")"); - } else if (it == chunkedMessageCache_.end()) { + } else { LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")"); - } else { - LOG_ERROR("Received a chunk whose chunk id is invalid (uuid: " - << uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")"); - chunkedMessageCache_.remove(uuid); + } + // If this is the last chunk, its permit was not returned at the entry of processMessageChunk, + // so we need to return it here to avoid permit leak. + if (chunkId == metadata.num_chunks_from_msg() - 1) { + increaseAvailablePermits(cnx); } lock.unlock(); - increaseAvailablePermits(cnx); - trackMessage(messageId); return {}; } + // Part 3: chunkId does not match the expected next chunk ID (out-of-order). + // Two sub-cases: + // a) chunkId <= lastChunkedMessageId: duplicated chunk caused by redeliver or corruption. + // Filter and ack the duplicated chunk if it's corrupted, then discard it. + // b) chunkId > lastChunkedMessageId + 1: gap detected, the chunked message is corrupted. + // Remove the context and ack the current chunk if it has expired. + auto& chunkedMsgCtx = it->second; + if (!chunkedMsgCtx.validateChunkId(chunkId)) { + const int lastChunkedMessageId = static_cast(chunkedMsgCtx.getChunkedMessageIds().size()) - 1; + // For example (duplicated chunk): + // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1 + // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2 + // Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3 + // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4 + // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5 + // Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6 + if (chunkId <= lastChunkedMessageId) { + bool isCorruptedChunk = true; + for (const MessageId& cachedMsgId : chunkedMsgCtx.getChunkedMessageIds()) { + if (cachedMsgId.ledgerId() == messageId.ledgerId() && + cachedMsgId.entryId() == messageId.entryId()) { + isCorruptedChunk = false; + break; + } + } + LOG_WARN("Received a duplicated chunk message (uuid: " + << uuid << " chunkId: " << chunkId << ", lastChunkedMessageId: " << lastChunkedMessageId + << ", messageId: " << messageId << ")"); + lock.unlock(); + if (isCorruptedChunk) { + LOG_INFO("Acking corrupted duplicated chunk to avoid ack hole, uuid: " + << uuid << ", messageId: " << messageId); + acknowledgeAsync(messageId, [uuid, messageId](Result result) { + if (result != ResultOk) { + LOG_WARN("Failed to acknowledge duplicated chunk, uuid: " << uuid << ", messageId: " + << messageId); + } + }); + } + return {}; + } + // chunkId > lastChunkedMessageId + 1, the chunked message is corrupted. + LOG_WARN("Received unexpected chunk (uuid: " << uuid << " chunkId: " << chunkId + << ", lastChunkedMessageId: " << lastChunkedMessageId + << ", messageId: " << messageId << ")"); + chunkedMessageCache_.remove(uuid); + // If this is the last chunk, its permit was not returned at the entry of processMessageChunk, + // so we need to return it here to avoid permit leak. + if (chunkId == metadata.num_chunks_from_msg() - 1) { + increaseAvailablePermits(cnx); + } + lock.unlock(); + if (expireTimeOfIncompleteChunkedMessageMs_ > 0 && + TimeUtils::currentTimeMillis() > + static_cast(metadata.publish_time()) + expireTimeOfIncompleteChunkedMessageMs_) { + LOG_INFO("Acking corrupted gap chunk to avoid ack hole, uuid: " << uuid + << ", messageId: " << messageId); + acknowledgeAsync(messageId, [uuid, messageId](Result result) { + if (result != ResultOk) { + LOG_WARN("Failed to acknowledge gap chunk, uuid: " << uuid + << ", messageId: " << messageId); + } + }); + } + return {}; + } + + // Part 4: chunkId matches the expected next chunk ID, append chunk payload. + // If all chunks have been received, assemble the full message, build a ChunkMessageId + // containing all individual chunk message IDs, and return the uncompressed payload. chunkedMsgCtx.appendChunk(messageId, payload); if (!chunkedMsgCtx.isCompleted()) { lock.unlock(); - increaseAvailablePermits(cnx); return {}; } - messageId = std::make_shared(chunkedMsgCtx.moveChunkedMessageIds())->build(); - LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx << ", sequenceId: " << metadata.sequence_id()); - auto wholePayload = chunkedMsgCtx.getBuffer(); chunkedMessageCache_.remove(uuid); + lock.unlock(); if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) { return wholePayload; } else { diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index f68dd3d8..2421d402 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -96,6 +96,19 @@ class MessageChunkingTest : public ::testing::TestWithParam { std::string MessageChunkingTest::largeMessage = createLargeMessage(); +// Helper function: send a single chunk message +static void sendSingleChunk(Producer& producer, const std::string& uuid, int chunkId, int totalChunks) { + std::string content = "chunk-" + uuid + "-" + std::to_string(chunkId) + "|"; + auto msg = MessageBuilder().setContent(content).build(); + auto& metadata = PulsarFriend::getMessageMetadata(msg); + metadata.set_num_chunks_from_msg(totalChunks); + metadata.set_chunk_id(chunkId); + metadata.set_uuid(uuid); + metadata.set_total_chunk_msg_size(100); + MessageId messageId; + ASSERT_EQ(ResultOk, producer.send(msg, messageId)); +} + TEST_F(MessageChunkingTest, testInvalidConfig) { Client client(lookupUrl); ProducerConfiguration conf; @@ -182,13 +195,7 @@ TEST_P(MessageChunkingTest, testExpireIncompleteChunkMessage) { Producer producer; createProducer(topic, producer); - auto msg = MessageBuilder().setContent("test-data").build(); - auto& metadata = PulsarFriend::getMessageMetadata(msg); - metadata.set_num_chunks_from_msg(2); - metadata.set_chunk_id(0); - metadata.set_total_chunk_msg_size(100); - - producer.send(msg); + sendSingleChunk(producer, "expire-test", 0, 2); auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer); @@ -220,32 +227,9 @@ TEST_P(MessageChunkingTest, testMaxPendingChunkMessages) { Producer producer; createProducer(topic, producer); - auto msg = MessageBuilder().setContent("chunk-0-0|").build(); - auto& metadata = PulsarFriend::getMessageMetadata(msg); - metadata.set_num_chunks_from_msg(2); - metadata.set_chunk_id(0); - metadata.set_uuid("0"); - metadata.set_total_chunk_msg_size(100); - - producer.send(msg); - - auto msg2 = MessageBuilder().setContent("chunk-1-0|").build(); - auto& metadata2 = PulsarFriend::getMessageMetadata(msg2); - metadata2.set_num_chunks_from_msg(2); - metadata2.set_uuid("1"); - metadata2.set_chunk_id(0); - metadata2.set_total_chunk_msg_size(100); - - producer.send(msg2); - - auto msg3 = MessageBuilder().setContent("chunk-1-1|").build(); - auto& metadata3 = PulsarFriend::getMessageMetadata(msg3); - metadata3.set_num_chunks_from_msg(2); - metadata3.set_uuid("1"); - metadata3.set_chunk_id(1); - metadata3.set_total_chunk_msg_size(100); - - producer.send(msg3); + sendSingleChunk(producer, "0", 0, 2); + sendSingleChunk(producer, "1", 0, 2); + sendSingleChunk(producer, "1", 1, 2); Message receivedMsg; ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 3000)); @@ -262,14 +246,7 @@ TEST_P(MessageChunkingTest, testMaxPendingChunkMessages) { consumer.acknowledge(receivedMsg2); consumer.redeliverUnacknowledgedMessages(); - auto msg4 = MessageBuilder().setContent("chunk-0-1|").build(); - auto& metadata4 = PulsarFriend::getMessageMetadata(msg4); - metadata4.set_num_chunks_from_msg(2); - metadata4.set_uuid("0"); - metadata4.set_chunk_id(1); - metadata4.set_total_chunk_msg_size(100); - - producer.send(msg4); + sendSingleChunk(producer, "0", 1, 2); // This ensures that the message chunk-0-0 was acknowledged successfully. So we cannot receive it anymore. Message receivedMsg3; @@ -356,6 +333,127 @@ TEST(ChunkMessageIdTest, testSetChunkMessageId) { ASSERT_EQ(firstChunkMsgId.partition(), 3); } +// Aligned with Java testResendChunkMessagesWithoutAckHole +TEST_P(MessageChunkingTest, testResendChunkMessagesWithoutAckHole) { + if (toString(GetParam()) != "None") { + return; + } + const std::string topic = + "MessageChunkingTest-testResendChunkMessagesWithoutAckHole-" + std::to_string(time(nullptr)); + Consumer consumer; + ConsumerConfiguration consumerConf; + consumerConf.setMaxPendingChunkedMessage(10); + consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true); + consumerConf.setBrokerConsumerStatsCacheTimeInMs(1000); + createConsumer(topic, consumer, consumerConf); + Producer producer; + createProducer(topic, producer); + + // Send chunk sequence: uuid="0" chunkId=0 -> uuid="0" chunkId=0 (resend) -> uuid="0" chunkId=1 + sendSingleChunk(producer, "0", 0, 2); + sendSingleChunk(producer, "0", 0, 2); // Resend the first chunk + sendSingleChunk(producer, "0", 1, 2); + + Message receivedMsg; + ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 5000)); + ASSERT_EQ(receivedMsg.getDataAsString(), "chunk-0-0|chunk-0-1|"); + consumer.acknowledge(receivedMsg); + + // Verify no ack hole: backlog should be 0 after ack + BrokerConsumerStats consumerStats; + waitUntil( + std::chrono::seconds(10), + [&] { + return consumer.getBrokerConsumerStats(consumerStats) == ResultOk && + consumerStats.getMsgBacklog() == 0; + }, + 1000); + ASSERT_EQ(consumerStats.getMsgBacklog(), 0); + + producer.close(); + consumer.close(); +} + +// Aligned with Java testResendChunkMessages +TEST_P(MessageChunkingTest, testResendChunkMessages) { + if (toString(GetParam()) != "None") { + return; + } + const std::string topic = "MessageChunkingTest-testResendChunkMessages-" + std::to_string(time(nullptr)); + Consumer consumer; + ConsumerConfiguration consumerConf; + consumerConf.setMaxPendingChunkedMessage(10); + consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true); + createConsumer(topic, consumer, consumerConf); + Producer producer; + createProducer(topic, producer); + + // Send interleaved chunk sequence with multiple uuid resends + sendSingleChunk(producer, "0", 0, 2); + sendSingleChunk(producer, "0", 0, 2); // Resend first chunk of uuid="0" + sendSingleChunk(producer, "1", 0, 3); // Interleave uuid="1" + sendSingleChunk(producer, "1", 1, 3); + sendSingleChunk(producer, "1", 0, 3); // Resend first chunk of uuid="1" + sendSingleChunk(producer, "0", 1, 2); // Complete uuid="0" + + Message receivedMsg; + ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 5000)); + ASSERT_EQ(receivedMsg.getDataAsString(), "chunk-0-0|chunk-0-1|"); + consumer.acknowledge(receivedMsg); + + // Continue sending to complete uuid="1" + sendSingleChunk(producer, "1", 1, 3); + sendSingleChunk(producer, "1", 2, 3); + + Message receivedMsg2; + ASSERT_EQ(ResultOk, consumer.receive(receivedMsg2, 5000)); + ASSERT_EQ(receivedMsg2.getDataAsString(), "chunk-1-0|chunk-1-1|chunk-1-2|"); + consumer.acknowledge(receivedMsg2); + + producer.close(); + consumer.close(); +} + +// Aligned with Go TestResendChunkWithAckHoleMessages +TEST_P(MessageChunkingTest, testResendChunkWithAckHoleMessages) { + if (toString(GetParam()) != "None") { + return; + } + const std::string topic = + "MessageChunkingTest-testResendChunkWithAckHoleMessages-" + std::to_string(time(nullptr)); + Consumer consumer; + ConsumerConfiguration consumerConf; + consumerConf.setMaxPendingChunkedMessage(10); + consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true); + createConsumer(topic, consumer, consumerConf); + Producer producer; + createProducer(topic, producer); + + // Scenario 1: middle chunk resend, verify message assembles correctly (duplicated chunks are filtered) + sendSingleChunk(producer, "0", 0, 4); + sendSingleChunk(producer, "0", 1, 4); + sendSingleChunk(producer, "0", 2, 4); + sendSingleChunk(producer, "0", 1, 4); // Resend chunkId=1 + sendSingleChunk(producer, "0", 2, 4); // Resend chunkId=2 + sendSingleChunk(producer, "0", 3, 4); // Complete + + Message receivedMsg; + ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 5000)); + ASSERT_EQ(receivedMsg.getDataAsString(), "chunk-0-0|chunk-0-1|chunk-0-2|chunk-0-3|"); + consumer.acknowledge(receivedMsg); + + // Scenario 2: chunk gap (chunkId jump), verify consumer cannot receive message (context is cleaned up) + sendSingleChunk(producer, "1", 0, 4); + sendSingleChunk(producer, "1", 1, 4); + sendSingleChunk(producer, "1", 4, 4); // Gap: skipped chunkId=2 and 3 + + Message receivedMsg2; + ASSERT_NE(ResultOk, consumer.receive(receivedMsg2, 3000)); + + producer.close(); + consumer.close(); +} + // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P INSTANTIATE_TEST_CASE_P(Pulsar, MessageChunkingTest, ::testing::Values(CompressionNone, CompressionLZ4, CompressionZLib, CompressionZSTD,