diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h index b92ec6a9..1c906e8a 100644 --- a/include/pulsar/Message.h +++ b/include/pulsar/Message.h @@ -220,6 +220,14 @@ class PULSAR_PUBLIC Message { */ const std::string& getProducerName() const noexcept; + /** + * Get the source cluster from which the message was replicated. + * + * @return the optional pointer to the source cluster name if the message was replicated, the pointer is + * valid as the Message instance is alive + */ + std::optional getReplicatedFrom() const; + /** * @return the optional encryption context that is present when the message is encrypted, the pointer is * valid as the Message instance is alive diff --git a/include/pulsar/c/message.h b/include/pulsar/c/message.h index 8aceca52..af22639c 100644 --- a/include/pulsar/c/message.h +++ b/include/pulsar/c/message.h @@ -230,6 +230,15 @@ PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message, */ PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message); +/** + * Get the source cluster from which the message was replicated. + * + * The pointer points to internal storage owned by the message wrapper, so the caller should not free it. + * + * @return the source cluster name, or NULL if the message is not replicated + */ +PULSAR_PUBLIC const char *pulsar_message_get_replicated_from(pulsar_message_t *message); + /** * Check if the message has a null value. * diff --git a/lib/Message.cc b/lib/Message.cc index f4e6d695..8bde683f 100644 --- a/lib/Message.cc +++ b/lib/Message.cc @@ -239,6 +239,13 @@ const std::string& Message::getProducerName() const noexcept { return impl_->metadata.producer_name(); } +std::optional Message::getReplicatedFrom() const { + if (!impl_ || !impl_->metadata.has_replicated_from()) { + return std::nullopt; + } + return &impl_->metadata.replicated_from(); +} + std::optional Message::getEncryptionContext() const { if (!impl_ || !impl_->encryptionContext_.has_value()) { return std::nullopt; diff --git a/lib/c/c_Message.cc b/lib/c/c_Message.cc index 51afa8e3..6d36309b 100644 --- a/lib/c/c_Message.cc +++ b/lib/c/c_Message.cc @@ -151,4 +151,9 @@ const char *pulsar_message_get_producer_name(pulsar_message_t *message) { return message->message.getProducerName().c_str(); } +const char *pulsar_message_get_replicated_from(pulsar_message_t *message) { + const auto replicatedFrom = message->message.getReplicatedFrom(); + return replicatedFrom ? replicatedFrom.value()->c_str() : nullptr; +} + int pulsar_message_has_null_value(pulsar_message_t *message) { return message->message.hasNullValue(); } diff --git a/tests/MessageTest.cc b/tests/MessageTest.cc index 0ffcc417..7f1ae4d6 100644 --- a/tests/MessageTest.cc +++ b/tests/MessageTest.cc @@ -22,6 +22,7 @@ #include +#include "PulsarFriend.h" #include "lib/MessageImpl.h" using namespace pulsar; @@ -154,6 +155,17 @@ TEST(MessageTest, testGetTopicNameOnProducerMessage) { ASSERT_TRUE(msg.getTopicName().empty()); } +TEST(MessageTest, testReplicationMetadataAccessors) { + auto msg = MessageBuilder().setContent("test").build(); + ASSERT_FALSE(msg.getReplicatedFrom().has_value()); + + PulsarFriend::getMessageMetadata(msg).set_replicated_from("us-west1"); + + const auto replicatedFrom = msg.getReplicatedFrom(); + ASSERT_TRUE(replicatedFrom.has_value()); + ASSERT_EQ(*replicatedFrom.value(), "us-west1"); +} + TEST(MessageTest, testNullValueMessage) { { auto msg = MessageBuilder().setContent("test").build(); diff --git a/tests/c/c_MessageTest.cc b/tests/c/c_MessageTest.cc index a64a9901..7a2ee506 100644 --- a/tests/c/c_MessageTest.cc +++ b/tests/c/c_MessageTest.cc @@ -20,6 +20,8 @@ #include #include +#include "../PulsarFriend.h" + TEST(c_MessageTest, MessageCopy) { pulsar_message_t *from = pulsar_message_create(); pulsar_message_set_content(from, "hello", 5); @@ -32,3 +34,17 @@ TEST(c_MessageTest, MessageCopy) { pulsar_message_free(from); pulsar_message_free(to); } + +TEST(c_MessageTest, ReplicationMetadataAccessors) { + pulsar_message_t *message = pulsar_message_create(); + pulsar_message_set_content(message, "hello", 5); + message->message = message->builder.build(); + + ASSERT_EQ(nullptr, pulsar_message_get_replicated_from(message)); + + PulsarFriend::getMessageMetadata(message->message).set_replicated_from("us-west1"); + + ASSERT_STREQ("us-west1", pulsar_message_get_replicated_from(message)); + + pulsar_message_free(message); +}