
Format task payload as JSON, when appropriate #15

Open
drauschenbach wants to merge 2 commits into emo-crab:main from drauschenbach:task-payload-formatting

Conversation

@drauschenbach commented Feb 3, 2026:

Enqueueing logic

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct DeleteAllForServiceInstance {
    pub service_instance_id: String,
    pub deleted_by_account_id: String,
}
pub static DELETE_ALL_FOR_SERVICE_INSTANCE: &str = "DeleteAllForServiceInstance";

let task_payload = DeleteAllForServiceInstance {
    service_instance_id: service_instance_id.clone(),
    deleted_by_account_id: common::consts::ADMIN_ACCOUNT_ID.to_string(),
};
let task = Task::new_with_json(DELETE_ALL_FOR_SERVICE_INSTANCE, &task_payload).unwrap();
match app_state.asynq_client.enqueue(task.clone()).await {
    Ok(_) => log::debug!(task:?; "Scheduled task"),
    Err(e) => log::error!(err:? = e, task:?; "Failed scheduling task"),
}

Before

[2026-02-03T14:26:13Z DEBUG REDACTED::task_handlers::my_task_handler] Scheduled task task=Task { task_type: "DeleteAllForServiceInstance", headers: {}, options: TaskOptions { task_id: None, queue: "default", max_retry: 3, timeout: None, deadline: None, unique_ttl: None, process_at: None, process_in: None, retention: None, group: None, retry_policy: None, rate_limit: None, group_grace_period: None }, payload: [123, 34, 115, 101, 114, 118, 105, 99, 101, 95, 105, 110, 115, 116, 97, 110, 99, 101, 95, 105, 100, 34, 58, 34, 100, 54, 49, 48, 98, 48, 108, 116, 99, 102, 118, 118, 52, 108, 56, 103, 49, 97, 109, 48, 34, 125] }

After

[2026-02-03T14:27:32Z DEBUG REDACTED::task_handlers::my_task_handler] Scheduled task task=Task { task_type: "DeleteAllForServiceInstance", headers: {}, options: TaskOptions { task_id: None, queue: "default", max_retry: 3, timeout: None, deadline: None, unique_ttl: None, process_at: None, process_in: None, retention: None, group: None, retry_policy: None, rate_limit: None, group_grace_period: None }, payload: Object {"service_instance_id": String("d610b0ltcfvv4l8g1am0"), "deleted_by_account_id": String("d4so1nbi3bi0gj4dv4f0")} }
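
On the worker side the payload is still decoded from JSON bytes with serde_json; a minimal sketch of that step, reusing the DeleteAllForServiceInstance struct above (the handler function signature is illustrative, only Task::get_payload() is from the crate):

// Illustrative handler body; the async fn signature is hypothetical,
// only Task::get_payload() comes from the crate.
async fn handle_delete_all_for_service_instance(task: &Task) -> Result<(), Box<dyn std::error::Error>> {
    // The payload bytes are serde_json-encoded regardless of how Debug renders them
    let payload: DeleteAllForServiceInstance = serde_json::from_slice(task.get_payload())?;
    log::debug!("Handling delete for service instance {}", payload.service_instance_id);
    Ok(())
}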

@cn-kali-team (Member) commented:

Thank you for your contribution. I understand what you mean: you would like Debug output to print the payload in different formats depending on its type.
Because I added the new_with_headers function, you may have gotten the impression that headers describe a payload format. They do not: the headers field on the Task struct is not a type enumeration but additional metadata, used to store task information unrelated to the payload.
So I suggest deleting the Header enumeration type and using an enum for the payload instead:

#[derive(Clone)]
pub enum Payload {
  Bytes(Vec<u8>),
  #[cfg(feature = "json")]
  Json(Vec<u8>),
}
Here are some of my code suggestions:
diff --git a/asynq/src/backend/pgdb/postgres_broker.rs b/asynq/src/backend/pgdb/postgres_broker.rs
index 4e6b73d..1b00dcf 100644
--- a/asynq/src/backend/pgdb/postgres_broker.rs
+++ b/asynq/src/backend/pgdb/postgres_broker.rs
@@ -208,7 +208,7 @@ impl PostgresBroker {
   pub(crate) fn task_to_message(&self, task: &Task) -> TaskMessage {
     TaskMessage {
       r#type: task.task_type.clone(),
-      payload: task.payload.clone(),
+      payload: task.payload.to_vec(),
       headers: task.headers.clone(),
       id: task
         .options
diff --git a/asynq/src/backend/rdb/redis_broker.rs b/asynq/src/backend/rdb/redis_broker.rs
index 081bed0..b7af587 100644
--- a/asynq/src/backend/rdb/redis_broker.rs
+++ b/asynq/src/backend/rdb/redis_broker.rs
@@ -237,7 +237,7 @@ impl RedisBroker {
   pub(crate) fn task_to_message(&self, task: &Task) -> TaskMessage {
     TaskMessage {
       r#type: task.task_type.clone(),
-      payload: task.payload.clone(),
+      payload: task.payload.to_vec(),
       headers: task.headers.clone(),
       id: task
         .options
diff --git a/asynq/src/backend/wsdb/ws_broker.rs b/asynq/src/backend/wsdb/ws_broker.rs
index dc9ca4d..0ae02d3 100644
--- a/asynq/src/backend/wsdb/ws_broker.rs
+++ b/asynq/src/backend/wsdb/ws_broker.rs
@@ -2,6 +2,7 @@
 //!
 //! A broker that connects to an asynq-server via WebSocket for cross-process communication.
 
+use std::ops::Deref;
 use crate::backend::wsdb::message::{
   ClientMessage, EnqueueRequest, ServerMessage, TaskDoneRequest, TaskMessageResponse,
 };
@@ -219,7 +220,7 @@ impl WebSocketBroker {
   pub(crate) fn task_to_enqueue_request(&self, task: &Task) -> EnqueueRequest {
     EnqueueRequest {
       task_type: task.task_type.clone(),
-      payload: BASE64_STANDARD.encode(&task.payload),
+      payload: BASE64_STANDARD.encode(task.payload.deref()),
       headers: task.headers.clone(),
       queue: if task.options.queue.is_empty() {
         None
diff --git a/asynq/src/components/aggregator.rs b/asynq/src/components/aggregator.rs
index 463e3a7..dab5df4 100644
--- a/asynq/src/components/aggregator.rs
+++ b/asynq/src/components/aggregator.rs
@@ -240,12 +240,11 @@ impl Aggregator {
                 // Convert TaskMessage to Task
                 let mut tasks = Vec::new();
                 for task_msg in task_messages {
-                  match Task::new_with_headers(
-                    &task_msg.r#type,
-                    &task_msg.payload,
-                    task_msg.headers,
-                  ) {
-                    Ok(task) => tasks.push(task),
+                  match Task::new(&task_msg.r#type, &task_msg.payload) {
+                    Ok(mut task) => {
+                      task = task.with_headers(task_msg.headers);
+                      tasks.push(task)
+                    }
                     Err(e) => {
                       tracing::warn!("Aggregator: failed to create task from message: {}", e);
                     }
diff --git a/asynq/src/components/processor.rs b/asynq/src/components/processor.rs
index 347ce1f..aa65b78 100644
--- a/asynq/src/components/processor.rs
+++ b/asynq/src/components/processor.rs
@@ -378,12 +378,11 @@ impl Processor {
 
                 // 创建任务,包含头部信息
                 // Create task with headers to preserve workflow path and other metadata
-                let mut task = match Task::new_with_headers(
-                  &task_msg.r#type,
-                  &task_msg.payload,
-                  task_msg.headers.clone(),
-                ) {
-                  Ok(task) => task,
+                let mut task = match Task::new(&task_msg.r#type, &task_msg.payload) {
+                  Ok(mut task) => {
+                    task = task.with_headers(task_msg.headers.clone());
+                    task
+                  }
                   Err(e) => {
                     tracing::error!("Failed to create task: {}", e);
                     // 发送 worker 完成事件(即使失败也需要发送)
diff --git a/asynq/src/task.rs b/asynq/src/task.rs
index d493599..1b06174 100644
--- a/asynq/src/task.rs
+++ b/asynq/src/task.rs
@@ -13,6 +13,7 @@ use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::fmt::{Debug, Formatter};
+use std::ops::Deref;
 use std::sync::Arc;
 use std::time::Duration;
 use uuid::Uuid;
@@ -75,7 +76,46 @@ impl ResultWriter {
     &self.task_id
   }
 }
-
+#[derive(Clone)]
+pub enum Payload {
+  Bytes(Vec<u8>),
+  #[cfg(feature = "json")]
+  Json(Vec<u8>),
+}
+impl Debug for Payload {
+  fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+    match self {
+      Payload::Bytes(bytes) => f.debug_tuple("Bytes").field(bytes).finish(),
+      #[cfg(feature = "json")]
+      Payload::Json(json) => match serde_json::from_slice::<serde_json::Value>(json) {
+        Ok(value) => f.debug_tuple("Json").field(&value).finish(),
+        Err(_e) => f.debug_tuple("Json").field(&json).finish(),
+      },
+    }
+  }
+}
+impl PartialEq for Payload {
+  fn eq(&self, other: &Self) -> bool {
+    match (self, other) {
+      (Payload::Bytes(a), Payload::Bytes(b)) => a == b,
+      #[cfg(feature = "json")]
+      (Payload::Json(a), Payload::Json(b)) => a == b,
+      #[cfg(feature = "json")]
+      _ => false,
+    }
+  }
+}
+impl Deref for Payload {
+  type Target = Vec<u8>;
+
+  fn deref(&self) -> &Self::Target {
+    match self {
+      Payload::Bytes(p) => p,
+      #[cfg(feature = "json")]
+      Payload::Json(p) => p,
+    }
+  }
+}
 /// 表示要执行的工作单元的任务
 /// Represents a task as a unit of work to be executed
 ///
@@ -97,7 +137,7 @@ pub struct Task {
   pub task_type: String,
   /// 任务负载数据
   /// Task payload data
-  pub payload: Vec<u8>,
+  pub payload: Payload,
   /// 任务头信息
   /// Task headers
   pub headers: HashMap<String, String>,
@@ -137,30 +177,40 @@ impl Task {
 
     Ok(Self {
       task_type: task_type.to_string(),
-      payload: payload.to_vec(),
+      payload: Payload::Bytes(payload.to_vec()),
       headers: Default::default(),
       options: TaskOptions::default(),
       result_writer: None,
       inspector: None,
     })
   }
-  pub fn new_with_headers<T: AsRef<str>>(
-    task_type: T,
-    payload: &[u8],
-    headers: HashMap<String, String>,
-  ) -> Result<Self> {
-    let mut task = Self::new(task_type, payload)?;
-    task.headers = headers;
-    Ok(task)
-  }
   #[cfg(feature = "json")]
   /// 使用 JSON 负载创建新任务
   /// Create a new task with JSON payload
   pub fn new_with_json<T: AsRef<str>, P: Serialize>(task_type: T, payload: &P) -> Result<Self> {
     let json_payload = serde_json::to_vec(payload)?;
-    Self::new(task_type, &json_payload)
-  }
+    let task_type = task_type.as_ref();
+    if task_type.trim().is_empty() {
+      return Err(Error::InvalidTaskType {
+        task_type: task_type.to_string(),
+      });
+    }
 
+    Ok(Self {
+      task_type: task_type.to_string(),
+      payload: Payload::Json(json_payload),
+      headers: Default::default(),
+      options: TaskOptions::default(),
+      result_writer: None,
+      inspector: None,
+    })
+  }
+  /// 设置任务头信息
+  /// Set task headers
+  pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
+    self.headers = headers;
+    self
+  }
   /// 设置任务选项
   /// Set task options
   pub fn with_options(mut self, options: TaskOptions) -> Self {
@@ -609,7 +659,7 @@ mod tests {
   fn test_task_creation() {
     let task = Task::new("test_task", b"test payload").unwrap();
     assert_eq!(task.task_type, "test_task");
-    assert_eq!(task.payload, b"test payload");
+    assert_eq!(task.payload, Payload::Bytes(b"test payload".to_vec()));
     assert_eq!(task.options.queue, DEFAULT_QUEUE_NAME);
   }
 
@@ -637,11 +687,7 @@ mod tests {
       count: 42,
     };
 
-    let task = Task::new(
-      "test_task",
-      &serde_json::to_vec(&payload).unwrap_or_default(),
-    )
-    .unwrap();
+    let task = Task::new_with_json("test_task", &payload).unwrap();
     let decoded: TestPayload = serde_json::from_slice(task.get_payload()).unwrap();
     assert_eq!(decoded, payload);
   }
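
With these suggestions applied, caller code would look roughly like this (a sketch only, not committed code; the header values are example metadata, with_headers takes over the role of new_with_headers, and task_payload is the DeleteAllForServiceInstance value from the PR description):

use std::collections::HashMap;

let mut headers = HashMap::new();
// Headers are metadata about the task, unrelated to the payload format
headers.insert("trace_id".to_string(), "abc123".to_string());

// Raw bytes: Debug prints the Bytes variant
let raw_task = Task::new("RawTask", b"opaque bytes").unwrap();

// JSON payload: Debug prints the parsed JSON value; headers are attached separately
let json_task = Task::new_with_json("DeleteAllForServiceInstance", &task_payload)
  .unwrap()
  .with_headers(headers);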

There is another issue: the data returned after a task is dequeued is only a Vec<u8>, so the Payload variant is lost when converting from TaskMessage back to Task. The original asynq protocol (https://github.com/hibiken/asynq/blob/master/internal/proto/asynq.proto) has no place to define a payload type; it is not enumerable there, only bytes.
Perhaps it is possible to add a ContentType entry in the headers, but other users might overwrite that key.
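
If the header route were taken, one way to limit that risk is a clearly reserved key that the broker sets when serializing and consults when rebuilding the Task from a TaskMessage. A rough sketch under that assumption; the key name and helper function are hypothetical, not part of the crate:

// Hypothetical reserved header key, not part of the crate today
const CONTENT_TYPE_HEADER: &str = "x-asynq-content-type";

// Sketch: restore the Payload variant from the reserved header when
// rebuilding a Task from a dequeued TaskMessage, instead of losing it
fn payload_from_message(headers: &std::collections::HashMap<String, String>, bytes: Vec<u8>) -> Payload {
  match headers.get(CONTENT_TYPE_HEADER).map(String::as_str) {
    #[cfg(feature = "json")]
    Some("application/json") => Payload::Json(bytes),
    _ => Payload::Bytes(bytes),
  }
}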

@cn-kali-team (Member) commented:

Are there any other updates?

@drauschenbach (Author) commented:

I see a lot of changes in that diff that seem off-topic to this conversation. Aside from that signal noise, I think this PR will remain stalled on the fact that the contribution will only work for pre-submission tasks, and will not work for tasks re-hydrated from back-end state.
