Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ARG CRYSTAL_VERSION=latest

FROM placeos/crystal:$CRYSTAL_VERSION as build
FROM placeos/crystal:$CRYSTAL_VERSION AS build
WORKDIR /app

# Set the commit via a build arg
Expand Down
4 changes: 4 additions & 0 deletions spec/publishing/publish_metadata_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ module PlaceOS::Source
def broadcast(message : Publisher::Message)
messages << message
end

def stats : Hash(String, UInt64)
{"mock" => 0_u64}
end
end

class Dummy
Expand Down
4 changes: 4 additions & 0 deletions src/source/publishing/influx_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ module PlaceOS::Source

delegate start, stop, to: publisher

def stats : Hash(String, UInt64)
{"influx" => publisher.processed}
end

def initialize(
@influx_host : String = INFLUX_HOST || abort("INFLUX_HOST unset"),
@influx_api_key : String = INFLUX_API_KEY || abort("INFLUX_API_KEY unset"),
Expand Down
12 changes: 11 additions & 1 deletion src/source/publishing/mqtt_broker_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ module PlaceOS::Source
end
end

def stats : Hash(String, UInt64)
hash = {} of String => UInt64
read_publishers do |publishers|
publishers.values.each do |publisher|
hash["MQTT: #{publisher.broker.name}"] = publisher.processed
end
end
hash
end

def process_resource(action : Resource::Action, resource : Model::Broker) : Resource::Result
# Don't recreate the publisher if only "safe" attributes have changed
case action
Expand Down Expand Up @@ -131,7 +141,7 @@ module PlaceOS::Source
end
end

# Safe to update iff fields in SAFE_ATTRIBUTES changed
# Safe to update if fields in SAFE_ATTRIBUTES changed
#
def self.safe_update?(model : Model::Broker)
# Take the union of the changed fields and the safe fields
Expand Down
2 changes: 1 addition & 1 deletion src/source/publishing/mqtt_publisher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module PlaceOS::Source
Event.new(value, timestamp).to_json
end

private getter broker : PlaceOS::Model::Broker
getter broker : PlaceOS::Model::Broker
private getter broker_lock : RWLock = RWLock.new
protected getter client : ::MQTT::V3::Client

Expand Down
18 changes: 9 additions & 9 deletions src/source/publishing/publisher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@ module PlaceOS::Source
timestamp : Time
)

getter message_queue : Channel(Message) = Channel(Message).new
getter message_queue : Channel(Message) = Channel(Message).new(StatusEvents::BATCH_SIZE)
getter processed : UInt64 = 0_u64

abstract def publish(message : Message)

def start
consume_messages
spawn { consume_messages }
end

def stop
message_queue.close
end

private def consume_messages
spawn do
while message = message_queue.receive?
begin
publish(message)
rescue error
Log.warn(exception: error) { "publishing message: #{message}" }
end
while message = message_queue.receive?
begin
publish(message)
@processed += 1_u64
rescue error
Log.warn(exception: error) { "publishing message: #{message}" }
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions src/source/publishing/publisher_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@ module PlaceOS::Source
abstract def broadcast(message : Publisher::Message)
abstract def start
abstract def stop

abstract def stats : Hash(String, UInt64)
end
end
102 changes: 66 additions & 36 deletions src/source/status_events.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ module PlaceOS::Source
Log = ::Log.for(self)

STATUS_CHANNEL_PATTERN = "status/#{Model::Module.table_name}-*"
MAX_CONTAINER_SIZE = 50_000
MAX_CONTAINER_SIZE = 40_000
BATCH_SIZE = 100
PROCESSING_INTERVAL = 100.milliseconds
PROCESSING_INTERVAL = 40.milliseconds
CONTAINER_WARNING_THRESHOLD = MAX_CONTAINER_SIZE * 0.8

private getter! redis : Redis
Expand Down Expand Up @@ -66,30 +66,51 @@ module PlaceOS::Source
redis.close
end

def paginate_modules(&)
batch_size = 64
last_created_at = Time.unix(0)
last_id = ""

loop do
modules = PlaceOS::Model::Module
.where("created_at > ? OR (created_at = ? AND id > ?)", last_created_at, last_created_at, last_id)
.order(created_at: :asc, id: :asc)
.limit(batch_size)
.to_a

# process
break if modules.empty?
modules.each do |mod|
yield mod
end
break if modules.size < batch_size

last_created_at = modules.last.created_at
last_id = modules.last.id
end
end

def update_values
mods_mapped = 0_u64
status_updated = 0_u64
pattern = "initial_sync"
PlaceOS::Model::Module.order(id: :asc).all.in_groups_of(64, reuse: true) do |modules|
modules.each do |mod|
next unless mod
mods_mapped += 1_u64
module_id = mod.id.to_s
store = PlaceOS::Driver::RedisStorage.new(module_id)
store.each do |key, value|
status_updated += 1_u64
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
end
paginate_modules do |mod|
mods_mapped += 1_u64
module_id = mod.id.to_s
store = PlaceOS::Driver::RedisStorage.new(module_id)
store.each do |key, value|
status_updated += 1_u64
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
end

# Backpressure if event container is growing too fast
if event_container.size >= MAX_CONTAINER_SIZE / 2
until event_container.size < MAX_CONTAINER_SIZE / 4
sleep 10.milliseconds
end
# Backpressure if event container is growing too fast
if event_container.size > MAX_CONTAINER_SIZE // 2
until event_container.size < MAX_CONTAINER_SIZE // 4
sleep 10.milliseconds
end
rescue error
Log.warn(exception: error) { "error syncing #{mod.try(&.id)}" }
end
rescue error
Log.warn(exception: error) { "error syncing #{mod.try(&.id)}" }
end
Log.info { {
message: "initial status sync complete",
Expand All @@ -109,26 +130,23 @@ module PlaceOS::Source
status_updated = 0_u64
pattern = "broker_resync"

PlaceOS::Model::Module.order(id: :asc).all.in_groups_of(64, reuse: true) do |modules|
modules.each do |mod|
next unless mod
mods_mapped += 1_u64
module_id = mod.id.to_s
store = PlaceOS::Driver::RedisStorage.new(module_id)
store.each do |key, value|
status_updated += 1_u64
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
end
paginate_modules do |mod|
mods_mapped += 1_u64
module_id = mod.id.to_s
store = PlaceOS::Driver::RedisStorage.new(module_id)
store.each do |key, value|
status_updated += 1_u64
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
end

# Backpressure if event container is growing too fast
if event_container.size >= MAX_CONTAINER_SIZE / 2
until event_container.size < MAX_CONTAINER_SIZE / 4
sleep 10.milliseconds
end
# Backpressure if event container is growing too fast
if event_container.size >= MAX_CONTAINER_SIZE // 2
until event_container.size < MAX_CONTAINER_SIZE // 4
sleep 10.milliseconds
end
rescue error
Log.warn(exception: error) { "error resyncing #{mod.try(&.id)}" }
end
rescue error
Log.warn(exception: error) { "error resyncing #{mod.try(&.id)}" }
end

Log.info { {
Expand Down Expand Up @@ -169,6 +187,18 @@ module PlaceOS::Source
else
process_batch(batch)
Fiber.yield

# This outputs how many writes have occured for each publisher
# stats = String.build do |io|
# io << "\n\n\nNEXT BATCH:\n"
# publisher_managers.each do |manager|
# manager.stats.each do |name, count|
# io << " * #{name} => #{count}"
# end
# end
# io << "\n\n"
# end
# puts stats
end
rescue error
Log.error(exception: error) { "error processing events" }
Expand Down
Loading