Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ class PULSAR_PUBLIC Message {
*/
const std::string& getProducerName() const noexcept;

/**
* Get the source cluster from which the message was replicated.
*
* @return the optional pointer to the source cluster name if the message was replicated, the pointer is
* valid as the Message instance is alive
*/
std::optional<const std::string*> getReplicatedFrom() const;
Comment thread
BewareMyPower marked this conversation as resolved.
Comment thread
BewareMyPower marked this conversation as resolved.

/**
* @return the optional encryption context that is present when the message is encrypted, the pointer is
* valid as the Message instance is alive
Expand Down
9 changes: 9 additions & 0 deletions include/pulsar/c/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,15 @@ PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message,
*/
PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message);

/**
* Get the source cluster from which the message was replicated.
*
* The pointer points to internal storage owned by the message wrapper, so the caller should not free it.
*
* @return the source cluster name, or NULL if the message is not replicated
*/
PULSAR_PUBLIC const char *pulsar_message_get_replicated_from(pulsar_message_t *message);

/**
* Check if the message has a null value.
*
Expand Down
7 changes: 7 additions & 0 deletions lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ const std::string& Message::getProducerName() const noexcept {
return impl_->metadata.producer_name();
}

std::optional<const std::string*> Message::getReplicatedFrom() const {
if (!impl_ || !impl_->metadata.has_replicated_from()) {
return std::nullopt;
}
return &impl_->metadata.replicated_from();
}
Comment thread
BewareMyPower marked this conversation as resolved.
Comment thread
BewareMyPower marked this conversation as resolved.

std::optional<const EncryptionContext*> Message::getEncryptionContext() const {
if (!impl_ || !impl_->encryptionContext_.has_value()) {
return std::nullopt;
Expand Down
5 changes: 5 additions & 0 deletions lib/c/c_Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,9 @@ const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
return message->message.getProducerName().c_str();
}

const char *pulsar_message_get_replicated_from(pulsar_message_t *message) {
const auto replicatedFrom = message->message.getReplicatedFrom();
return replicatedFrom ? replicatedFrom.value()->c_str() : nullptr;
}
Comment thread
BewareMyPower marked this conversation as resolved.
Comment thread
BewareMyPower marked this conversation as resolved.

int pulsar_message_has_null_value(pulsar_message_t *message) { return message->message.hasNullValue(); }
12 changes: 12 additions & 0 deletions tests/MessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <string>

#include "PulsarFriend.h"
#include "lib/MessageImpl.h"

using namespace pulsar;
Expand Down Expand Up @@ -154,6 +155,17 @@ TEST(MessageTest, testGetTopicNameOnProducerMessage) {
ASSERT_TRUE(msg.getTopicName().empty());
}

TEST(MessageTest, testReplicationMetadataAccessors) {
auto msg = MessageBuilder().setContent("test").build();
ASSERT_FALSE(msg.getReplicatedFrom().has_value());

PulsarFriend::getMessageMetadata(msg).set_replicated_from("us-west1");

const auto replicatedFrom = msg.getReplicatedFrom();
ASSERT_TRUE(replicatedFrom.has_value());
ASSERT_EQ(*replicatedFrom.value(), "us-west1");
}

TEST(MessageTest, testNullValueMessage) {
{
auto msg = MessageBuilder().setContent("test").build();
Expand Down
16 changes: 16 additions & 0 deletions tests/c/c_MessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <lib/c/c_structs.h>
#include <pulsar/c/message.h>

#include "../PulsarFriend.h"

TEST(c_MessageTest, MessageCopy) {
pulsar_message_t *from = pulsar_message_create();
pulsar_message_set_content(from, "hello", 5);
Expand All @@ -32,3 +34,17 @@ TEST(c_MessageTest, MessageCopy) {
pulsar_message_free(from);
pulsar_message_free(to);
}

TEST(c_MessageTest, ReplicationMetadataAccessors) {
pulsar_message_t *message = pulsar_message_create();
pulsar_message_set_content(message, "hello", 5);
message->message = message->builder.build();

ASSERT_EQ(nullptr, pulsar_message_get_replicated_from(message));

PulsarFriend::getMessageMetadata(message->message).set_replicated_from("us-west1");

ASSERT_STREQ("us-west1", pulsar_message_get_replicated_from(message));

pulsar_message_free(message);
}
Loading