Skip to content

Commit 5eb6d3e

Browse files
committed
feat: health gate before autopull + delivery backoff retry (closes #22)
1 parent b74c81c commit 5eb6d3e

5 files changed

Lines changed: 257 additions & 4 deletions

app/services/agent_auto_runner_service.rb

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# frozen_string_literal: true
22

3+
require "net/http"
4+
require "uri"
5+
36
class AgentAutoRunnerService
47
NO_FAKE_IN_PROGRESS_GRACE = 10.minutes
58
ZOMBIE_STALE_AFTER = 30.minutes
@@ -37,6 +40,16 @@ def run!
3740
plan = selector.plan
3841
plan.skip_reasons.each { |reason, amount| stats[:queue_skip_reasons][reason] += amount.to_i }
3942

43+
unless gateway_ready?(user)
44+
cache_key = "openclaw_health_alert:#{user.id}"
45+
unless @cache.exist?(cache_key)
46+
@cache.write(cache_key, Time.current.to_i, expires_in: 30.minutes)
47+
end
48+
@logger.warn("[AgentAutoRunner] gateway not ready user=#{user.id}, skipping")
49+
stats[:queue_skip_reasons][:gateway_not_ready] += 1
50+
next
51+
end
52+
4053
woke = 0
4154
if plan.tasks.any?
4255
woke = wake_tasks!(user, plan.tasks)
@@ -63,6 +76,35 @@ def openclaw_configured?(user)
6376
user.openclaw_gateway_url.present? && (hooks_token.present? || user.openclaw_gateway_token.present?)
6477
end
6578

79+
# Probes the gateway's `/ready` endpoint and reports whether it answered
# with a 2xx status.
#
# The base URL comes from `gateway_base_url(user)`; its path is replaced by
# `/ready` and any query string is dropped before the request is made.
#
# @param user the user whose gateway should be probed
# @return [Boolean] true when the probe returned a 2xx status (or when the
#   probe is bypassed in the test environment); false when no base URL is
#   available, the status is not 2xx, or the request raised
def gateway_ready?(user)
  # In the test environment the probe is bypassed unless explicitly opted in
  # through OPENCLAW_GATEWAY_HEALTHCHECK=true.
  return true if Rails.env.test? && ENV["OPENCLAW_GATEWAY_HEALTHCHECK"] != "true"

  base_url = gateway_base_url(user)
  return false if base_url.blank?

  uri = URI.parse(base_url)
  uri.path = "/ready"
  uri.query = nil

  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = uri.scheme == "https"
  # Short timeouts so an unresponsive gateway cannot stall the runner for long.
  http.open_timeout = 3
  http.read_timeout = 3

  response = http.request(Net::HTTP::Get.new(uri.request_uri))
  response.code.to_i.between?(200, 299)
rescue StandardError => e
  # Any failure (bad URL, refused connection, timeout, ...) counts as
  # "not ready", but the reason is logged instead of being discarded.
  @logger&.warn("[AgentAutoRunner] gateway readiness probe failed: #{e.class}: #{e.message}")
  false
end
100+
101+
# Resolves the gateway base URL to use for a user.
#
# Candidates are tried in order and the first one that is non-blank after
# stripping surrounding whitespace wins:
#   1. `user.openclaw_gateway_url` (only if the user responds to it)
#   2. the OPENCLAW_GATEWAY_URL environment variable
#   3. a hard-coded fallback address
#
# @param user any object; it need not respond to `openclaw_gateway_url`
# @return [String] the stripped URL of the first non-blank candidate
def gateway_base_url(user)
  candidates = [
    (user.openclaw_gateway_url if user.respond_to?(:openclaw_gateway_url)),
    ENV["OPENCLAW_GATEWAY_URL"],
    # NOTE(review): hard-coded private-network address; presumably a
    # development gateway. Confirm this default is intended outside of a
    # local setup, or move it to configuration.
    "http://192.168.100.186:18789"
  ]

  candidates.map { |candidate| candidate.to_s.strip }.find { |url| !url.blank? }
end
107+
66108
# Pipeline: process tasks that need pipeline advancement
67109
# Simplified: skip triage, go straight to context compilation + routing
68110
def process_pipeline_tasks!(user)

app/services/external_notification_service.rb

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# frozen_string_literal: true
22

3+
require "json"
34
require "net/http"
45
require "uri"
56

@@ -68,9 +69,24 @@ def send_telegram
6869
thread_id = telegram_thread_id
6970
params[:message_thread_id] = thread_id if thread_id.present?
7071

71-
Net::HTTP.post_form(uri, params)
72-
rescue StandardError => e
73-
Rails.logger.warn("[ExternalNotification] Telegram failed: #{e.message}")
72+
delays = [1, 5, 30]
73+
retries = 0
74+
75+
loop do
76+
begin
77+
response = Net::HTTP.post_form(uri, params)
78+
message_id = extract_telegram_message_id(response)
79+
return response if message_id.present?
80+
rescue StandardError
81+
end
82+
83+
break if retries >= delays.length
84+
Kernel.sleep(delays[retries])
85+
retries += 1
86+
end
87+
88+
Rails.logger.warn("[DeliveryBackoff] failed after 3 attempts")
89+
nil
7490
end
7591

7692
def send_webhook
@@ -122,4 +138,13 @@ def send_webhook
122138
rescue StandardError => e
123139
Rails.logger.warn("[ExternalNotification] Webhook failed: #{e.message}")
124140
end
141+
142+
# Pulls the message id out of a Telegram-style JSON response body.
#
# Looks for `result.message_id` first and falls back to a top-level
# `message_id`.
#
# @param response any object; objects without a `body` method yield nil
# @return the message id value found in the payload, or nil when the body is
#   not valid JSON, is not a JSON object, or carries no message id
def extract_telegram_message_id(response)
  return nil unless response.respond_to?(:body)

  payload =
    begin
      JSON.parse(response.body.to_s)
    rescue JSON::ParserError
      # An unparseable body simply means "no message id"; other errors are
      # no longer silently swallowed.
      nil
    end
  return nil unless payload.is_a?(Hash)

  # Guard against a non-object "result" (e.g. `true`), which would make
  # Hash#dig raise a TypeError.
  result = payload["result"]
  nested_id = result["message_id"] if result.is_a?(Hash)
  nested_id || payload["message_id"]
end
125150
end
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
# frozen_string_literal: true
2+
3+
require "test_helper"
4+
require "minitest/mock"
5+
6+
# Covers the gateway readiness gate in AgentAutoRunnerService#run!:
# a non-2xx probe must skip waking tasks and record a skip reason, while a
# 2xx probe must let tasks be woken.
class AgentAutoRunnerHealthGateTest < ActiveSupport::TestCase
  include ActiveSupport::Testing::TimeHelpers

  # Stand-in for the webhook service: records the ids of tasks it was asked
  # to wake instead of performing any delivery.
  class FakeWebhookService
    cattr_accessor :wakes, default: []

    def initialize(_user)
    end

    def notify_auto_pull_ready(task)
      self.class.wakes << task.id
      true
    end

    def notify_auto_pull_ready_with_pipeline(task)
      self.class.wakes << task.id
      true
    end

    def notify_runner_summary(_message)
      true
    end
  end

  setup do
    Rails.cache.clear
    FakeWebhookService.wakes = []
    # Make the user created below the only one with auto mode enabled.
    User.update_all(agent_auto_mode: false)
    @user = User.create!(
      email_address: "auto_runner_health_#{SecureRandom.hex(4)}@example.test",
      password: "password123",
      password_confirmation: "password123",
      agent_auto_mode: true,
      openclaw_gateway_url: "http://example.test",
      openclaw_gateway_token: "tok"
    )
    @board = Board.create!(user: @user, name: "Health Gate Board")
    @task = Task.create!(
      user: @user,
      board: @board,
      name: "Health Gate Task",
      status: :up_next,
      assigned_to_agent: true,
      blocked: false,
      pipeline_enabled: false,
      model: "gemini"
    )
  end

  test "gateway 503 skips wakes and records skip reason" do
    stats = run_with_gateway_status(503)

    assert_equal 0, FakeWebhookService.wakes.length
    assert_equal 1, stats[:queue_skip_reasons][:gateway_not_ready]
  end

  test "gateway 200 allows wakes" do
    stats = run_with_gateway_status(200)

    assert_includes FakeWebhookService.wakes, @task.id
    assert stats[:tasks_woken] >= 1
  end

  private

  # Runs the auto runner once, at a fixed point in time, with the health
  # check enabled and the gateway stubbed to answer with `code`.
  # Returns the stats produced by `run!`.
  def run_with_gateway_status(code)
    cache = ActiveSupport::Cache::MemoryStore.new
    run_at = Time.find_zone!("America/Argentina/Buenos_Aires").local(2026, 2, 8, 23, 30, 0)
    stats = nil

    with_healthcheck_enabled do
      stub_gateway_ready(code) do
        travel_to(run_at) do
          stats = AgentAutoRunnerService.new(openclaw_webhook_service: FakeWebhookService, cache: cache).run!
        end
      end
    end

    stats
  end

  # Opts in to the real health check code path for the duration of the
  # block, restoring the previous env value afterwards (assigning nil to an
  # ENV key removes it).
  def with_healthcheck_enabled
    original = ENV["OPENCLAW_GATEWAY_HEALTHCHECK"]
    ENV["OPENCLAW_GATEWAY_HEALTHCHECK"] = "true"
    yield
  ensure
    ENV["OPENCLAW_GATEWAY_HEALTHCHECK"] = original
  end

  # Replaces Net::HTTP.new with a mock connection whose single request
  # returns a response carrying the given status code.
  def stub_gateway_ready(code)
    response = Minitest::Mock.new
    response.expect(:code, code.to_s)

    http = Minitest::Mock.new
    http.expect(:use_ssl=, nil, [false])
    http.expect(:open_timeout=, nil, [3])
    http.expect(:read_timeout=, nil, [3])
    http.expect(:request, response) { true }

    Net::HTTP.stub(:new, ->(_host, _port) { http }) do
      yield
    end

    # Verify only after the block completed normally, so a failure raised
    # inside the block is not masked by a mock expectation error.
    response.verify
    http.verify
  end
end
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# frozen_string_literal: true
2+
3+
require "test_helper"
4+
5+
# Covers the retry/backoff behaviour of ExternalNotificationService#send_telegram:
# a response without a message id triggers another attempt after a delay, and
# exhausting the delays logs a warning without raising.
class DeliveryBackoffTest < ActiveSupport::TestCase
  BOT_TOKEN_ENV = "CLAWTROL_TELEGRAM_BOT_TOKEN"

  setup do
    @user = users(:default)
    @board = boards(:default)
    @task = @board.tasks.create!(
      user: @user,
      name: "Delivery Backoff Task",
      description: "Test delivery",
      status: :in_review,
      origin_chat_id: "12345"
    )
    # Remember any pre-existing token so teardown can put it back.
    @original_bot_token = ENV[BOT_TOKEN_ENV]
    ENV[BOT_TOKEN_ENV] = "test_token"
  end

  teardown do
    # Assigning nil removes the key, so this restores "unset" as well.
    ENV[BOT_TOKEN_ENV] = @original_bot_token
  end

  test "missing message_id triggers retries" do
    responses = [
      build_response({ "ok" => true, "result" => {} }),
      build_response({ "ok" => true }),
      build_response({ "ok" => true, "result" => { "message_id" => 123 } })
    ]
    call_count = 0
    sleeps = []

    post_form = lambda do |_uri, _params|
      response = responses[call_count]
      call_count += 1
      response
    end

    with_stubbed_delivery(post_form, sleeps) do
      ExternalNotificationService.new(@task).send(:send_telegram)
    end

    assert_equal 3, call_count
    assert_equal [1, 5], sleeps
  end

  test "logs after three failures and continues" do
    response = build_response({ "ok" => true })
    call_count = 0
    sleeps = []
    logged = []

    post_form = lambda do |_uri, _params|
      call_count += 1
      response
    end

    with_stubbed_delivery(post_form, sleeps) do
      Rails.logger.stub(:warn, ->(message) { logged << message }) do
        assert_nothing_raised { ExternalNotificationService.new(@task).send(:send_telegram) }
      end
    end

    # One initial attempt plus one retry after each of the three delays.
    assert_equal 4, call_count
    assert_equal [1, 5, 30], sleeps
    assert_includes logged, "[DeliveryBackoff] failed after 3 attempts"
  end

  private

  # Stubs the HTTP post with `post_form` and replaces Kernel.sleep with a
  # recorder that appends each requested duration to `sleeps`.
  def with_stubbed_delivery(post_form, sleeps)
    Net::HTTP.stub(:post_form, post_form) do
      Kernel.stub(:sleep, ->(duration) { sleeps << duration }) do
        yield
      end
    end
  end

  # Minimal response double: only exposes a JSON-encoded `body`.
  def build_response(payload)
    Struct.new(:body).new(payload.to_json)
  end
end

test/services/external_notification_service_test.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,9 @@ class ExternalNotificationServiceTest < ActiveSupport::TestCase
121121
service = ExternalNotificationService.new(@task)
122122
assert service.send(:telegram_configured?)
123123
# send_telegram catches all exceptions
124-
assert_nothing_raised { service.send(:send_telegram) }
124+
Kernel.stub(:sleep, nil) do
125+
assert_nothing_raised { service.send(:send_telegram) }
126+
end
125127
else
126128
skip "Task does not have origin_chat_id column"
127129
end

0 commit comments

Comments
 (0)