Skip to content

Commit d860279

Browse files
authored
ref(taskworker): Add a BatchPushTaskWorker to cli (#117429)
Depends on getsentry/taskbroker#695 being released. Add an option to use the BatchPushTaskWorker.
1 parent 5ade625 commit d860279

3 files changed

Lines changed: 26 additions & 5 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ dependencies = [
107107
"statsd>=3.3.0",
108108
"structlog>=22.1.0",
109109
"symbolic>=13.1.1",
110-
"taskbroker-client>=0.18.6",
110+
"taskbroker-client>=0.19.1",
111111
"tiktoken>=0.8.0",
112112
"tokenizers>=0.22.0",
113113
"tldextract>=5.1.2",

src/sentry/runner/commands/run.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ def taskworker_scheduler(redis_cluster: str, **options: Any) -> None:
138138
@click.option(
139139
"--push-mode", help="Whether to run in PUSH or PULL mode.", default=False, is_flag=True
140140
)
141+
@click.option(
142+
"--batch-push-mode", help="Whether to run in BATCH PUSH mode.", default=False, is_flag=True
143+
)
141144
@click.option(
142145
"--rpc-host",
143146
help="The hostname and port for the taskbroker gRPC server. When using num-brokers the hostname will be appended with `-{i}` to connect to individual brokers.",
@@ -213,6 +216,7 @@ def taskworker(**options: Any) -> None:
213216

214217
def run_taskworker(
215218
push_mode: bool,
219+
batch_push_mode: bool,
216220
worker_rpc_port: int,
217221
rpc_host: str,
218222
num_brokers: int | None,
@@ -232,7 +236,7 @@ def run_taskworker(
232236
"""
233237
taskworker factory that can be reloaded
234238
"""
235-
from taskbroker_client.worker import PushTaskWorker, TaskWorker
239+
from taskbroker_client.worker import BatchPushTaskWorker, PushTaskWorker, TaskWorker
236240
from taskbroker_client.worker.client import make_broker_hosts
237241

238242
with managed_bgtasks(role="taskworker"):
@@ -252,6 +256,23 @@ def run_taskworker(
252256
health_check_sec_per_touch=health_check_sec_per_touch,
253257
grpc_port=worker_rpc_port,
254258
)
259+
elif batch_push_mode:
260+
worker = BatchPushTaskWorker(
261+
app_module="sentry.taskworker.bootstrap:app",
262+
broker_service=rpc_host,
263+
max_child_task_count=max_child_task_count,
264+
namespace=namespace,
265+
concurrency=concurrency,
266+
child_tasks_queue_maxsize=child_tasks_queue_maxsize,
267+
result_queue_maxsize=result_queue_maxsize,
268+
rebalance_after=rebalance_after,
269+
processing_pool_name=processing_pool_name,
270+
pod_name=pod_name,
271+
health_check_file_path=health_check_file_path,
272+
health_check_sec_per_touch=health_check_sec_per_touch,
273+
grpc_port=worker_rpc_port,
274+
update_in_batches=True,
275+
)
255276
else:
256277
worker = TaskWorker(
257278
app_module="sentry.taskworker.bootstrap:app",

uv.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)