Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,37 @@ See [`CONTRIBUTING.md`](./CONTRIBUTING.md).

## Environment Variables

* `SG_ENV` = set to `production` for production log levels
* `SMTP_SERVER` = hostname of the SMTP server for sending emails. i.e. smtp.example.com
* `SMTP_PORT` = port to connect to the SMTP server
* `SMTP_USER` = username if any required, will not authenticate if this is left blank
* `SMTP_PASS` = password if required
* `SMTP_SECURE` = left blank for clear text, `SMTPS` for a TLS connection, `STARTTLS` for negotiating TLS on an initial clear text connection
* `REDIS_URL` = the redis connection string, defaults to `redis://localhost:6379`
### Core Configuration

- `SG_ENV` = set to `production` for production log levels (default: `development`)
- `SG_SERVER_PORT` = server port (default: `3000`)
- `SG_SERVER_HOST` = server host (default: `127.0.0.1`)
- `SG_PROCESS_COUNT` = number of processes (default: `1`)
- `REDIS_URL` = the redis connection string, defaults to `redis://localhost:6379`

### SMTP Configuration

- `SMTP_SERVER` = hostname of the SMTP server for sending emails (default: `smtp.example.com`)
- `SMTP_PORT` = port to connect to the SMTP server (default: `25`)
- `SMTP_USER` = username if any required, will not authenticate if this is left blank
- `SMTP_PASS` = password if required
- `SMTP_SECURE` = left blank for clear text, `SMTPS` for a TLS connection, `STARTTLS` for negotiating TLS on an initial clear text connection

### Telemetry (PlaceOS Pulse)

- `PLACE_PULSE_ENABLED` = enable telemetry (set to `1` or `true`)
- `PLACE_DOMAIN` = domain for telemetry instance
- `PLACE_PULSE_INSTANCE_EMAIL` = email for telemetry instance

### Trigger Intervals

- `UPDATE_CHECK_INTERVAL` = driver update check interval (default: `2h`)
- `GRAPH_SECRET_CHECK_INTERVAL` = graph secret expiry check interval (default: `24h`)
- `LOKI_SEARCH_CHECK_INTERVAL` = Loki error search interval (default: `1h`)
- `LOKI_SEARCH_WINDOW` = Loki search time window (default: `24h`)

**Duration Format**: Intervals support flexible formats like `5m`, `1h20m`, `2h30m45s`, etc.
**Note**: Invalid formats (like `"5"` without postfix, `"invalid"`, etc.) will be treated as zero duration, which may cause unexpected behavior. Always include proper time units (`h`, `m`, `s`).

## Bindings

Expand Down
241 changes: 241 additions & 0 deletions spec/loki_search_errors_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
require "./helper"

module PlaceOS::Triggers
describe LokiSearchForErrors do
it "performs bulk update using UNNEST with array parameters" do
# Create test modules
mod1 = PlaceOS::Model::Generator.module.save!
mod1.running = true
mod1.save!

mod2 = PlaceOS::Model::Generator.module.save!
mod2.running = true
mod2.save!

mod3 = PlaceOS::Model::Generator.module.save!
mod3.running = true
mod3.save!

# Prepare test data
mod_ids = [mod1.id.as(String), mod2.id.as(String), mod3.id.as(String)]
timestamps = [
Time.utc - 1.hour,
Time.utc - 30.minutes,
Time.utc - 15.minutes,
]

# Execute the bulk update SQL directly (same as in the actual code)
sql = <<-SQL
UPDATE #{PlaceOS::Model::Module.table_name}
SET has_runtime_error = true,
error_timestamp = data.timestamp
FROM (
SELECT UNNEST($1::text[]) AS id,
UNNEST($2::timestamptz[]) AS timestamp
) AS data
WHERE #{PlaceOS::Model::Module.table_name}.id = data.id
SQL

result = PgORM::Database.connection do |dbc|
dbc.exec(sql, mod_ids, timestamps)
end

# Verify the update was successful
result.rows_affected.should eq(3)

# Verify the modules were updated correctly
mod1.reload!
mod2.reload!
mod3.reload!

mod1.has_runtime_error.should be_true
mod2.has_runtime_error.should be_true
mod3.has_runtime_error.should be_true

# PostgreSQL truncates to microsecond precision, so compare with tolerance
mod1.error_timestamp.not_nil!.to_unix.should eq(timestamps[0].to_unix)
mod2.error_timestamp.not_nil!.to_unix.should eq(timestamps[1].to_unix)
mod3.error_timestamp.not_nil!.to_unix.should eq(timestamps[2].to_unix)
end

it "only updates running modules" do
# Create test modules - some running, some not
running_mod = PlaceOS::Model::Generator.module.save!
running_mod.running = true
running_mod.save!

stopped_mod = PlaceOS::Model::Generator.module.save!
stopped_mod.running = false
stopped_mod.save!

# Try to update both
mod_ids = [running_mod.id.as(String), stopped_mod.id.as(String)]
timestamps = [Time.utc - 1.hour, Time.utc - 30.minutes]

# Get only running module IDs (simulating the actual code logic)
running_module_ids = PlaceOS::Model::Module.where(running: true).pluck(:id).to_set

# Filter updates to only running modules
updates = Hash.zip(mod_ids, timestamps).select { |mod_id, _| running_module_ids.includes?(mod_id) }

# Execute bulk update only for running modules
sql = <<-SQL
UPDATE #{PlaceOS::Model::Module.table_name}
SET has_runtime_error = true,
error_timestamp = data.timestamp
FROM (
SELECT UNNEST($1::text[]) AS id,
UNNEST($2::timestamptz[]) AS timestamp
) AS data
WHERE #{PlaceOS::Model::Module.table_name}.id = data.id
SQL

result = PgORM::Database.connection do |dbc|
dbc.exec(sql, updates.keys, updates.values)
end

# Should only update 1 module (the running one)
result.rows_affected.should eq(1)

# Verify only the running module was updated
running_mod.reload!
stopped_mod.reload!

running_mod.has_runtime_error.should be_true
stopped_mod.has_runtime_error.should be_false
end

it "handles empty updates gracefully" do
# Execute with empty arrays
sql = <<-SQL
UPDATE #{PlaceOS::Model::Module.table_name}
SET has_runtime_error = true,
error_timestamp = data.timestamp
FROM (
SELECT UNNEST($1::text[]) AS id,
UNNEST($2::timestamptz[]) AS timestamp
) AS data
WHERE #{PlaceOS::Model::Module.table_name}.id = data.id
SQL

result = PgORM::Database.connection do |db|

Check notice on line 121 in spec/loki_search_errors_spec.cr

View workflow job for this annotation

GitHub Actions / Ameba

Naming/BlockParameterName

Disallowed block parameter name found
Raw output
> result = PgORM::Database.connection do |db|
                                          ^
db.exec(sql, [] of String, [] of Time)
end

# Should affect 0 rows
result.rows_affected.should eq(0)
end

it "validates pg-orm DSL query for running modules" do
# Create a mix of running and stopped modules
running_mod1 = PlaceOS::Model::Generator.module.save!
running_mod1.running = true
running_mod1.save!

running_mod2 = PlaceOS::Model::Generator.module.save!
running_mod2.running = true
running_mod2.save!

stopped_mod1 = PlaceOS::Model::Generator.module.save!
stopped_mod1.running = false
stopped_mod1.save!

stopped_mod2 = PlaceOS::Model::Generator.module.save!
stopped_mod2.running = false
stopped_mod2.save!

# Test the exact DSL query used in the implementation
running_module_ids = PlaceOS::Model::Module.where(running: true).pluck(:id).to_set

# Verify it returns only running module IDs
running_module_ids.should contain(running_mod1.id)
running_module_ids.should contain(running_mod2.id)
running_module_ids.should_not contain(stopped_mod1.id)
running_module_ids.should_not contain(stopped_mod2.id)

# Verify the count is correct
running_module_ids.size.should be >= 2 # At least our 2 running modules

# Verify the DSL query works correctly for filtering (the key functionality)
test_module_ids = [running_mod1.id.as(String), stopped_mod1.id.as(String)]
filtered = test_module_ids.select { |id| running_module_ids.includes?(id) }
filtered.should eq([running_mod1.id])

# Verify it supports O(1) lookups (Set behavior)
running_module_ids.includes?(running_mod1.id).should be_true
running_module_ids.includes?(stopped_mod1.id).should be_false
end

it "handles large batch updates efficiently" do
# Create 100 test modules
modules = Array.new(100) do
mod = PlaceOS::Model::Generator.module.save!
mod.running = true
mod.save!
mod
end

mod_ids = modules.map(&.id.as(String))
timestamps = Array.new(100) { |i| Time.utc - i.minutes }

# Execute bulk update
sql = <<-SQL
UPDATE #{PlaceOS::Model::Module.table_name}
SET has_runtime_error = true,
error_timestamp = data.timestamp
FROM (
SELECT UNNEST($1::text[]) AS id,
UNNEST($2::timestamptz[]) AS timestamp
) AS data
WHERE #{PlaceOS::Model::Module.table_name}.id = data.id
SQL

result = PgORM::Database.connection do |db|

Check notice on line 193 in spec/loki_search_errors_spec.cr

View workflow job for this annotation

GitHub Actions / Ameba

Naming/BlockParameterName

Disallowed block parameter name found
Raw output
> result = PgORM::Database.connection do |db|
                                          ^
db.exec(sql, mod_ids, timestamps)
end

# Should update all 100 modules in a single query
result.rows_affected.should eq(100)

# Spot check a few modules
modules[0].reload!
modules[50].reload!
modules[99].reload!

modules[0].has_runtime_error.should be_true
modules[50].has_runtime_error.should be_true
modules[99].has_runtime_error.should be_true
end

it "uses configurable search window with flexible duration format" do
# Test that it uses the LOKI_SEARCH_WINDOW constant and parses it correctly
searcher = LokiSearchForErrors.new(1.minute)
expected_window = PlaceOS::Triggers.extract_time_span(PlaceOS::Triggers::LOKI_SEARCH_WINDOW)
searcher.@search_window.should eq(expected_window)

# Default should be 24 hours
searcher.@search_window.should eq(24.hours)
end

it "validates duration parsing supports various formats" do
# Test the extract_time_span method with different formats
PlaceOS::Triggers.extract_time_span("5m").should eq(5.minutes)
PlaceOS::Triggers.extract_time_span("1h20m").should eq(1.hour + 20.minutes)
PlaceOS::Triggers.extract_time_span("5h").should eq(5.hours)
PlaceOS::Triggers.extract_time_span("30s").should eq(30.seconds)
PlaceOS::Triggers.extract_time_span("2h30m45s").should eq(2.hours + 30.minutes + 45.seconds)
end

it "handles edge cases in duration parsing" do
# Test what happens with just a number (no postfix)
PlaceOS::Triggers.extract_time_span("5").should eq(Time::Span.zero)

# Test empty string
PlaceOS::Triggers.extract_time_span("").should eq(Time::Span.zero)

# Test invalid formats - also return zero duration (regex matches but captures nothing)
PlaceOS::Triggers.extract_time_span("invalid").should eq(Time::Span.zero)
PlaceOS::Triggers.extract_time_span("5x").should eq(Time::Span.zero)
end
end
end
1 change: 1 addition & 0 deletions src/constants.cr
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module PlaceOS::Triggers
DRIVER_UPDATE_CHECK_INTERVAL = ENV["UPDATE_CHECK_INTERVAL"]? || "2h"
GRAPH_SECRET_CHECK_INTERVAL = ENV["GRAPH_SECRET_CHECK_INTERVAL"]? || "24h"
LOKI_SEARCH_CHECK_INTERVAL = ENV["LOKI_SEARCH_CHECK_INTERVAL"]? || "1h"
LOKI_SEARCH_WINDOW = ENV["LOKI_SEARCH_WINDOW"]? || "24h"

class_getter? production : Bool = ENVIRONMENT.downcase == "production"
class_getter? pulse_enabled : Bool = PULSE_ENABLED
Expand Down
83 changes: 63 additions & 20 deletions src/placeos-triggers/loki_search_errrors.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

def initialize(@repeat_interval : Time::Span)
@client = Loki::Client.from_env
@search_window = Triggers.extract_time_span(LOKI_SEARCH_WINDOW)
end

def self.new(interval : String)
Expand All @@ -22,30 +23,72 @@

Tasker.every(@repeat_interval) do
Log.debug { "Searching Loki for runtime error logs of all running modules" }
PlaceOS::Model::Module.where(running: true).each do |mod|
Log.debug { {message: "Searching Loki for module errors", module: mod.id, driver: mod.driver_id} }
begin
errors = Array(Tuple(String, String)).new
result = @client.query_range(query, 1000, Time.utc - 24.hour, Time.utc, Loki::Direction::Backward)
result.response_data.result.as(Loki::Model::Streams).each do |resp_stream|
map = resp_stream.labels.map
errors << {map["source"], map["time"]}
end
if errors.empty?
Log.info { "No module runtime errors found. Skipping..." }
next
end

errors = errors.uniq { |v| v[0] }
errors.each do |mod_id, time|
err_time = Time::Format::RFC_3339.parse(time)
PlaceOS::Model::Module.update(mod_id, {has_runtime_error: true, error_timestamp: err_time})
begin
# Query Loki once for all modules
result = @client.query_range(query, 1000, Time.utc - @search_window, Time.utc, Loki::Direction::Backward)

# Build a map of module_id => latest error timestamp
module_errors = Hash(String, Time).new
result.response_data.result.as(Loki::Model::Streams).each do |resp_stream|
map = resp_stream.labels.map

# Nil-safe extraction
mod_id = map["source"]?
time_str = map["time"]?

next unless mod_id && time_str

begin
err_time = Time::Format::RFC_3339.parse(time_str)
# Keep only the latest error per module
if !module_errors.has_key?(mod_id) || err_time > module_errors[mod_id]
module_errors[mod_id] = err_time
end
rescue ex
Log.error(exception: ex) { {message: "Exception received when updating module", module: mod_id, timestamp: time} }
Log.error(exception: ex) { {message: "Failed to parse timestamp", module: mod_id, timestamp: time_str} }
end
rescue ex
Log.error(exception: ex) { "Exception received" }
end

if module_errors.empty?
Log.info { "No module runtime errors found. Skipping..." }
next
end

Log.debug { {message: "Found errors for modules", count: module_errors.size} }

# Get all running module IDs for validation
running_module_ids = PlaceOS::Model::Module.where(running: true).pluck(:id).to_set

# Filter to only running modules
updates = module_errors.select { |mod_id, _| running_module_ids.includes?(mod_id) }

if updates.empty?
Log.info { "No running modules with errors to update. Skipping..." }
next
end

# Bulk update using UNNEST - single database query
mod_ids = updates.keys
timestamps = updates.values

sql = <<-SQL
UPDATE #{PlaceOS::Model::Module.table_name}
SET has_runtime_error = true,
error_timestamp = data.timestamp
FROM (
SELECT UNNEST($1::text[]) AS id,
UNNEST($2::timestamptz[]) AS timestamp
) AS data
WHERE #{PlaceOS::Model::Module.table_name}.id = data.id
SQL

PgORM::Database.connection do |db|

Check notice on line 86 in src/placeos-triggers/loki_search_errrors.cr

View workflow job for this annotation

GitHub Actions / Ameba

Naming/BlockParameterName

Disallowed block parameter name found
Raw output
> PgORM::Database.connection do |db|
                                 ^
db.exec(sql, mod_ids, timestamps)
end
Log.info { {message: "Bulk updated modules with runtime errors", count: updates.size} }
rescue ex
Log.error(exception: ex) { "Exception received while searching Loki" }
end
end
end
Expand Down
Loading