Skip to content

Commit 6f6534c

Browse files
committed
feat: add stats to understand processing speeds
1 parent 0af30fe commit 6f6534c

6 files changed

Lines changed: 70 additions & 46 deletions

File tree

src/source/publishing/influx_manager.cr

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ module PlaceOS::Source
1616

1717
delegate start, stop, to: publisher
1818

19+
def stats : Hash(String, UInt64)
20+
{"influx" => publisher.processed}
21+
end
22+
1923
def initialize(
2024
@influx_host : String = INFLUX_HOST || abort("INFLUX_HOST unset"),
2125
@influx_api_key : String = INFLUX_API_KEY || abort("INFLUX_API_KEY unset"),

src/source/publishing/mqtt_broker_manager.cr

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,16 @@ module PlaceOS::Source
2626
end
2727
end
2828

29+
def stats : Hash(String, UInt64)
30+
hash = {} of String => UInt64
31+
read_publishers do |publishers|
32+
publishers.values.each do |publisher|
33+
hash["MQTT: #{publisher.broker.name}"] = publisher.processed
34+
end
35+
end
36+
hash
37+
end
38+
2939
def process_resource(action : Resource::Action, resource : Model::Broker) : Resource::Result
3040
# Don't recreate the publisher if only "safe" attributes have changed
3141
case action

src/source/publishing/mqtt_publisher.cr

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ module PlaceOS::Source
3030
Event.new(value, timestamp).to_json
3131
end
3232

33-
private getter broker : PlaceOS::Model::Broker
33+
getter broker : PlaceOS::Model::Broker
3434
private getter broker_lock : RWLock = RWLock.new
3535
protected getter client : ::MQTT::V3::Client
3636

src/source/publishing/publisher.cr

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,26 @@ module PlaceOS::Source
1111
timestamp : Time
1212
)
1313

14-
getter message_queue : Channel(Message) = Channel(Message).new
14+
getter message_queue : Channel(Message) = Channel(Message).new(StatusEvents::BATCH_SIZE)
15+
getter processed : UInt64 = 0_u64
1516

1617
abstract def publish(message : Message)
1718

1819
def start
19-
consume_messages
20+
spawn { consume_messages }
2021
end
2122

2223
def stop
2324
message_queue.close
2425
end
2526

2627
private def consume_messages
27-
spawn do
28-
while message = message_queue.receive?
29-
begin
30-
publish(message)
31-
rescue error
32-
Log.warn(exception: error) { "publishing message: #{message}" }
33-
end
28+
while message = message_queue.receive?
29+
begin
30+
publish(message)
31+
@processed += 1_u64
32+
rescue error
33+
Log.warn(exception: error) { "publishing message: #{message}" }
3434
end
3535
end
3636
end

src/source/publishing/publisher_manager.cr

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,7 @@ module PlaceOS::Source
55
abstract def broadcast(message : Publisher::Message)
66
abstract def start
77
abstract def stop
8+
9+
abstract def stats : Hash(String, UInt64)
810
end
911
end

src/source/status_events.cr

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ module PlaceOS::Source
1414
STATUS_CHANNEL_PATTERN = "status/#{Model::Module.table_name}-*"
1515
MAX_CONTAINER_SIZE = 40_000
1616
BATCH_SIZE = 100
17-
PROCESSING_INTERVAL = 100.milliseconds
17+
PROCESSING_INTERVAL = 40.milliseconds
1818
CONTAINER_WARNING_THRESHOLD = MAX_CONTAINER_SIZE * 0.8
1919

2020
private getter! redis : Redis
@@ -80,7 +80,9 @@ module PlaceOS::Source
8080

8181
# process
8282
break if modules.empty?
83-
yield modules
83+
modules.each do |mod|
84+
yield mod
85+
end
8486
break if modules.size < batch_size
8587

8688
last_created_at = modules.last.created_at
@@ -92,26 +94,23 @@ module PlaceOS::Source
9294
mods_mapped = 0_u64
9395
status_updated = 0_u64
9496
pattern = "initial_sync"
95-
paginate_modules do |modules|
96-
modules.each do |mod|
97-
next unless mod
98-
mods_mapped += 1_u64
99-
module_id = mod.id.to_s
100-
store = PlaceOS::Driver::RedisStorage.new(module_id)
101-
store.each do |key, value|
102-
status_updated += 1_u64
103-
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
104-
end
97+
paginate_modules do |mod|
98+
mods_mapped += 1_u64
99+
module_id = mod.id.to_s
100+
store = PlaceOS::Driver::RedisStorage.new(module_id)
101+
store.each do |key, value|
102+
status_updated += 1_u64
103+
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
104+
end
105105

106-
# Backpressure if event container is growing too fast
107-
if event_container.size >= MAX_CONTAINER_SIZE / 2
108-
until event_container.size < MAX_CONTAINER_SIZE / 4
109-
sleep 10.milliseconds
110-
end
106+
# Backpressure if event container is growing too fast
107+
if event_container.size > MAX_CONTAINER_SIZE // 2
108+
until event_container.size < MAX_CONTAINER_SIZE // 4
109+
sleep 10.milliseconds
111110
end
112-
rescue error
113-
Log.warn(exception: error) { "error syncing #{mod.try(&.id)}" }
114111
end
112+
rescue error
113+
Log.warn(exception: error) { "error syncing #{mod.try(&.id)}" }
115114
end
116115
Log.info { {
117116
message: "initial status sync complete",
@@ -131,26 +130,23 @@ module PlaceOS::Source
131130
status_updated = 0_u64
132131
pattern = "broker_resync"
133132

134-
paginate_modules do |modules|
135-
modules.each do |mod|
136-
next unless mod
137-
mods_mapped += 1_u64
138-
module_id = mod.id.to_s
139-
store = PlaceOS::Driver::RedisStorage.new(module_id)
140-
store.each do |key, value|
141-
status_updated += 1_u64
142-
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
143-
end
133+
paginate_modules do |mod|
134+
mods_mapped += 1_u64
135+
module_id = mod.id.to_s
136+
store = PlaceOS::Driver::RedisStorage.new(module_id)
137+
store.each do |key, value|
138+
status_updated += 1_u64
139+
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
140+
end
144141

145-
# Backpressure if event container is growing too fast
146-
if event_container.size >= MAX_CONTAINER_SIZE / 2
147-
until event_container.size < MAX_CONTAINER_SIZE / 4
148-
sleep 10.milliseconds
149-
end
142+
# Backpressure if event container is growing too fast
143+
if event_container.size >= MAX_CONTAINER_SIZE // 2
144+
until event_container.size < MAX_CONTAINER_SIZE // 4
145+
sleep 10.milliseconds
150146
end
151-
rescue error
152-
Log.warn(exception: error) { "error resyncing #{mod.try(&.id)}" }
153147
end
148+
rescue error
149+
Log.warn(exception: error) { "error resyncing #{mod.try(&.id)}" }
154150
end
155151

156152
Log.info { {
@@ -191,6 +187,18 @@ module PlaceOS::Source
191187
else
192188
process_batch(batch)
193189
Fiber.yield
190+
191+
# This outputs how many writes have occurred for each publisher
192+
# stats = String.build do |io|
193+
# io << "\n\n\nNEXT BATCH:\n"
194+
# publisher_managers.each do |manager|
195+
# manager.stats.each do |name, count|
196+
# io << " * #{name} => #{count}"
197+
# end
198+
# end
199+
# io << "\n\n"
200+
# end
201+
# puts stats
194202
end
195203
rescue error
196204
Log.error(exception: error) { "error processing events" }

0 commit comments

Comments
 (0)