
Commit 5d07b51

feat: messenger performance optimisations to improve latency (#108)
1 parent: c01bb1b

File tree: 10 files changed (+293, -90 lines)

.env.example (+8 -1)

@@ -69,4 +69,11 @@ BANK_STATEMAP_INSTALL_RETRY_WAIT_MS=2
 # Messenger environment variables
 TALOS_MESSENGER_KAFKA_GROUP_ID="talos-messenger-dev"
 TALOS_MESSENGER_ACTIONS_WHITELIST="publish:kafka"
-# ### Talos Messenger Env variables (end) #############################
+# ### Talos Messenger Env variables (end) #############################
+
+#
+# ### Configs used for topic creation (start) #############################
+# KAFKA_CREATE_TOPIC_PARTITIONS=1
+# KAFKA_CREATE_TOPIC_REPLICATION_COUNT=3
+KAFKA_CREATE_TOPIC_CONFIGS="retention.ms=3600000, message.timestamp.type=LogAppendTime"
+# ### Configs used for topic creation (end) #############################
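The new KAFKA_CREATE_TOPIC_CONFIGS variable carries comma-separated key=value topic configs, which the kafka_create_topic example below splits into a map before creating the topic. Below is a minimal standalone sketch of that parsing, assuming only the format shown above; the parse_topic_configs helper is illustrative and not part of this commit.

use std::collections::HashMap;

/// Illustrative helper: split a "k1=v1, k2=v2" style string into a map.
fn parse_topic_configs(raw: &str) -> HashMap<&str, &str> {
    let mut configs = HashMap::new();
    for entry in raw.trim().split(',') {
        // Entries without an '=', including the empty entry left by a trailing comma, are skipped.
        if let Some((key, value)) = entry.trim().split_once('=') {
            configs.insert(key.trim(), value.trim());
        }
    }
    configs
}

fn main() {
    let raw = "retention.ms=3600000, message.timestamp.type=LogAppendTime";
    let configs = parse_topic_configs(raw);
    assert_eq!(configs.get("retention.ms"), Some(&"3600000"));
    assert_eq!(configs.get("message.timestamp.type"), Some(&"LogAppendTime"));
    println!("{configs:?}");
}

Because unmatched entries are skipped, a trailing comma (as in the commented example inside the create-topic example below) is harmless.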

examples/certifier_kafka_pg/examples/kafka_create_topic.rs (+31 -3)

@@ -1,10 +1,38 @@
-use talos_certifier_adapters::kafka::kafka_deploy::{create_topic, KafkaDeployError, KafkaDeployStatus};
+use std::collections::HashMap;
+
+use talos_certifier_adapters::kafka::kafka_deploy::{create_topic, CreateTopicConfigs, KafkaDeployError, KafkaDeployStatus};
+use talos_common_utils::env_var_with_defaults;
+use talos_rdkafka_utils::kafka_config::KafkaConfig;
 
 #[tokio::main]
 async fn main() -> Result<(), KafkaDeployError> {
-    println!("deploying kafka...");
+    println!("Creating kafka topic...");
+
+    let kafka_config = KafkaConfig::from_env(None);
+
+    let replication_factor = env_var_with_defaults!("KAFKA_CREATE_TOPIC_REPLICATION_COUNT", Option::<i32>, 3);
+    let num_partitions = env_var_with_defaults!("KAFKA_CREATE_TOPIC_PARTITIONS", Option::<i32>, 1);
+
+    // eg: KAFKA_CREATE_TOPIC_CONFIGS="retention.ms=3600000,"
+    let config_option = env_var_with_defaults!("KAFKA_CREATE_TOPIC_CONFIGS", Option::<String>);
+
+    let mut config: HashMap<&str, &str> = HashMap::new();
+
+    let config_string = config_option.unwrap_or("".to_owned());
+    config_string.trim().split(',').for_each(|c| {
+        if let Some((k, v)) = c.trim().split_once('=') {
+            config.insert(k, v);
+        }
+    });
+
+    let topic_config = CreateTopicConfigs {
+        topic: kafka_config.topic.clone(),
+        config,
+        replication_factor,
+        num_partitions,
+    };
 
-    let status = create_topic().await?;
+    let status = create_topic(&kafka_config, topic_config).await?;
 
     match status {
         KafkaDeployStatus::TopicExists => {

examples/messenger_using_kafka/examples/messenger_using_kafka.rs (+1)

@@ -38,6 +38,7 @@ async fn main() {
         kafka_config,
         allowed_actions,
         channel_buffers: None,
+        commit_size: Some(2_000),
     };
 
     messenger_with_kafka(config).await.unwrap();

packages/talos_certifier_adapters/src/kafka/kafka_deploy.rs (+32 -15)

@@ -1,4 +1,4 @@
-use std::time::Duration;
+use std::{collections::HashMap, time::Duration};
 
 use rdkafka::{
     admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
@@ -7,7 +7,7 @@ use rdkafka::{
     error::KafkaError,
     types::RDKafkaErrorCode,
 };
-use talos_common_utils::env_var_with_defaults;
+
 use talos_rdkafka_utils::kafka_config::KafkaConfig;
 use thiserror::Error as ThisError;
 
@@ -24,26 +24,45 @@ pub enum KafkaDeployError {
     KafkaError(#[from] KafkaError),
 }
 
-pub async fn create_topic() -> Result<KafkaDeployStatus, KafkaDeployError> {
-    let kafka_config = KafkaConfig::from_env(None);
-    println!("kafka configs received from env... {kafka_config:#?}");
+#[derive(Debug, Clone)]
+pub struct CreateTopicConfigs<'a> {
+    /// topic to create.
+    pub topic: String,
+    /// Topic specific configs.
+    ///
+    /// see: https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html
+    pub config: HashMap<&'a str, &'a str>,
+    /// Replication count for partitions in topic. Defaults to 3.
+    pub replication_factor: Option<i32>,
+    /// Number of paritions for the topic. Defaults to 1.
+    pub num_partitions: Option<i32>,
+}
+
+const DEFAULT_REPLICATION_FACTOR: i32 = 3;
+const DEFAULT_NUM_PARTITIONS: i32 = 3;
+
+pub async fn create_topic(kafka_config: &KafkaConfig, topic_configs: CreateTopicConfigs<'_>) -> Result<KafkaDeployStatus, KafkaDeployError> {
+    println!("kafka brokers = {:?} and usename = {}", kafka_config.brokers, kafka_config.username);
+    println!("topic configs received from env = {topic_configs:#?}");
     let consumer: StreamConsumer = kafka_config.build_consumer_config().create()?;
 
-    let kafka_certification_topic = kafka_config.topic.to_string();
-    let timeout = Duration::from_secs(1);
+    let timeout = Duration::from_secs(5);
     let metadata = consumer
-        .fetch_metadata(Some(&kafka_certification_topic), timeout)
+        .fetch_metadata(Some(&topic_configs.topic), timeout)
         .expect("Fetching topic metadata failed");
 
     if !metadata.topics().is_empty() && !metadata.topics()[0].partitions().is_empty() {
         Ok(KafkaDeployStatus::TopicExists)
     } else {
         println!("Topic does not exist, creating...");
+
+        let config: Vec<(&str, &str)> = topic_configs.config.into_iter().collect();
+
         let topic = NewTopic {
-            name: &kafka_certification_topic,
-            num_partitions: env_var_with_defaults!("KAFKA_CREATE_TOPIC_PARTITIONS", i32, 1),
-            replication: TopicReplication::Fixed(1),
-            config: vec![("message.timestamp.type", "LogAppendTime")],
+            name: &topic_configs.topic,
+            num_partitions: topic_configs.num_partitions.unwrap_or(DEFAULT_NUM_PARTITIONS),
+            replication: TopicReplication::Fixed(topic_configs.replication_factor.unwrap_or(DEFAULT_REPLICATION_FACTOR)),
+            config,
         };
 
         let opts = AdminOptions::new().operation_timeout(Some(timeout));
@@ -52,9 +71,7 @@ pub async fn create_topic() -> Result<KafkaDeployStatus, KafkaDeployError> {
 
         let results = admin.create_topics(&[topic], &opts).await?;
 
-        results[0]
-            .as_ref()
-            .map_err(|e| KafkaDeployError::TopicCreation(kafka_certification_topic, e.1))?;
+        results[0].as_ref().map_err(|e| KafkaDeployError::TopicCreation(topic_configs.topic, e.1))?;
 
         Ok(KafkaDeployStatus::TopicCreated)
     }
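With this change, create_topic no longer reads its own environment; callers pass the broker KafkaConfig plus a CreateTopicConfigs describing the topic. Below is a sketch of a caller that leans on the built-in defaults (replication_factor and num_partitions left as None); it assumes a tokio runtime as in the repo's example, and note that DEFAULT_NUM_PARTITIONS is 3 in this commit even though the field's doc comment says it defaults to 1.

use std::collections::HashMap;

use talos_certifier_adapters::kafka::kafka_deploy::{create_topic, CreateTopicConfigs, KafkaDeployError, KafkaDeployStatus};
use talos_rdkafka_utils::kafka_config::KafkaConfig;

#[tokio::main]
async fn main() -> Result<(), KafkaDeployError> {
    // Broker connection details come from the environment, as in the example above.
    let kafka_config = KafkaConfig::from_env(None);

    let topic_config = CreateTopicConfigs {
        topic: kafka_config.topic.clone(),
        config: HashMap::new(),   // no topic-level config overrides
        replication_factor: None, // falls back to DEFAULT_REPLICATION_FACTOR (3)
        num_partitions: None,     // falls back to DEFAULT_NUM_PARTITIONS (3)
    };

    let status = create_topic(&kafka_config, topic_config).await?;
    if matches!(status, KafkaDeployStatus::TopicExists) {
        println!("Topic already exists, nothing to do");
    } else {
        println!("Topic created");
    }
    Ok(())
}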

packages/talos_messenger_actions/src/kafka/context.rs (+1)

@@ -29,6 +29,7 @@ impl ProducerContext for MessengerProducerContext {
         match result {
             Ok(msg) => {
                 info!("Message {:?} {:?}", msg.key(), msg.offset());
+
                 // Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the thread receving has errored. In such a scenario, the publisher thread would also shutdown.
                 if let Err(error) = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success(version, "kafka".to_string()))) {
                     error!("[Messenger Producer Context] Error sending feedback for version={version} with error={error:?}");

packages/talos_messenger_actions/src/kafka/service.rs (+6 -1)

@@ -3,11 +3,13 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use futures_util::future::join_all;
 use log::{debug, error, info};
+use time::{format_description::well_known::Rfc3339, OffsetDateTime};
 use tokio::sync::mpsc;
 
 use talos_messenger_core::{
     core::{MessengerChannelFeedback, MessengerCommitActions, MessengerPublisher, MessengerSystemService},
     errors::MessengerServiceResult,
+    suffix::MessengerStateTransitionTimestamps,
     utlis::get_actions_deserialised,
 };
 
@@ -49,7 +51,10 @@ where
 
         let publish_vec = actions.into_iter().map(|action| {
             let publisher = self.publisher.clone();
-            let headers = headers_cloned.clone();
+            let mut headers = headers_cloned.clone();
+            let timestamp = OffsetDateTime::now_utc().format(&Rfc3339).ok().unwrap();
+
+            headers.insert(MessengerStateTransitionTimestamps::EndOnCommitActions.to_string(), timestamp);
             async move {
                 publisher.send(version, action, headers, total_len ).await;
             }
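The publish path now stamps each action's headers with an RFC 3339 UTC timestamp under the MessengerStateTransitionTimestamps::EndOnCommitActions key, presumably so the time spent handling on_commit actions can be traced downstream, in line with the commit's latency focus. The sketch below isolates just the timestamping; it assumes the headers are a HashMap<String, String> and uses a placeholder string in place of the enum key's Display output.

use std::collections::HashMap;

use time::{format_description::well_known::Rfc3339, OffsetDateTime};

fn main() {
    let mut headers: HashMap<String, String> = HashMap::new();

    // RFC 3339 UTC timestamp, e.g. "2024-05-06T01:02:03.456789Z".
    let timestamp = OffsetDateTime::now_utc()
        .format(&Rfc3339)
        .expect("Rfc3339 formatting of now_utc() should not fail");

    // Placeholder key; the real code uses MessengerStateTransitionTimestamps::EndOnCommitActions.to_string().
    headers.insert("endOnCommitActions".to_string(), timestamp);
    println!("{headers:?}");
}

Formatting OffsetDateTime::now_utc() with Rfc3339 cannot realistically fail, which is why the diff's .ok().unwrap() (and the .expect here) is safe in practice.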

packages/talos_messenger_actions/src/messenger_with_kafka.rs (+7 -9)

@@ -7,7 +7,7 @@ use talos_certifier_adapters::KafkaConsumer;
 use talos_messenger_core::{
     core::{MessengerPublisher, PublishActionType},
     errors::MessengerServiceResult,
-    services::MessengerInboundService,
+    services::{MessengerInboundService, MessengerInboundServiceConfig},
     suffix::MessengerCandidate,
     talos_messenger_service::TalosMessengerService,
 };
@@ -93,6 +93,10 @@ pub struct Configuration {
     pub allowed_actions: HashMap<String, Vec<String>>,
     /// Channel buffer size for the internal channels between threads
     pub channel_buffers: Option<ChannelBuffers>,
+    /// Commit size to decide how often the certifier topic can be committed by the consumer.
+    /// The more often the commit is done has inverse impact on the latency.
+    /// Defaults to 5_000.
+    pub commit_size: Option<u32>,
 }
 
 pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResult {
@@ -112,14 +116,8 @@ pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResult {
 
     // START - Inbound service
     let suffix: Suffix<MessengerCandidate> = Suffix::with_config(config.suffix_config.unwrap_or_default());
-
-    let inbound_service = MessengerInboundService {
-        message_receiver: kafka_consumer,
-        tx_actions_channel,
-        rx_feedback_channel,
-        suffix,
-        allowed_actions: config.allowed_actions,
-    };
+    let inbound_service_config = MessengerInboundServiceConfig::new(config.allowed_actions, config.commit_size);
+    let inbound_service = MessengerInboundService::new(kafka_consumer, tx_actions_channel, rx_feedback_channel, suffix, inbound_service_config);
     // END - Inbound service
 
     // START - Publish service
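Together with the new commit_size field, the inbound service is now built from a MessengerInboundServiceConfig, and the consumer commits its offset only after every commit_size messages (5_000 by default per the doc comment) rather than on every message; committing less often lowers per-message overhead and latency at the cost of a larger re-read window after a restart. The following is a conceptual sketch of such a counter-based commit gate only, not the actual MessengerInboundService implementation.

// Conceptual sketch of a counter-based commit gate like the one configured via commit_size.
struct CommitGate {
    commit_size: u32,       // messages to process between commits (messenger default: 5_000)
    since_last_commit: u32, // messages processed since the last commit
}

impl CommitGate {
    fn new(commit_size: Option<u32>) -> Self {
        Self {
            commit_size: commit_size.unwrap_or(5_000),
            since_last_commit: 0,
        }
    }

    /// Returns true when the consumer offset should be committed.
    fn should_commit(&mut self) -> bool {
        self.since_last_commit += 1;
        if self.since_last_commit >= self.commit_size {
            self.since_last_commit = 0;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut gate = CommitGate::new(Some(3));
    let commits: Vec<bool> = (1..=7).map(|_| gate.should_commit()).collect();
    // With commit_size = 3, only every third message triggers a commit.
    assert_eq!(commits, vec![false, false, true, false, false, true, false]);
    println!("{commits:?}");
}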
