Skip to content

Commit 6153b87

Browse files
authored
feat: add day of week and time of day to points [PPT-2279] (#60)
1 parent 3ae5b53 commit 6153b87

8 files changed

Lines changed: 56 additions & 14 deletions

File tree

config/mosquitto.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
# Plain MQTT on 1883, no auth
12
listener 1883
3+
protocol mqtt
24
allow_anonymous true
5+
6+
# WebSocket MQTT on 9001 with JWT auth
37
listener 9001
48
protocol websockets
59
allow_anonymous true

docker-compose.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
version: "3.7"
2-
31
volumes:
42
influx-data:
53

shard.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ shards:
195195

196196
placeos-models:
197197
git: https://github.com/placeos/models.git
198-
version: 9.76.2
198+
version: 9.78.0
199199

200200
placeos-resource:
201201
git: https://github.com/place-labs/resource.git

spec/publishing/influx_publisher_spec.cr

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ module PlaceOS::Source
2929

3030
status_event = Mappings.new(state).status_events?("mod-1234", "power").not_nil!.first
3131

32-
message = Publisher::Message.new(status_event, "false", timestamp: Time.utc)
32+
message = Publisher::Message.new(status_event, "false", timestamp: Time::UNIX_EPOCH)
3333

3434
point = InfluxPublisher.transform(message)[0]
3535
point.should_not be_nil
@@ -77,7 +77,7 @@ module PlaceOS::Source
7777
temp: 30.5,
7878
id: nil,
7979
other: false,
80-
}.to_json, timestamp: Time.utc)
80+
}.to_json, timestamp: Time::UNIX_EPOCH)
8181

8282
point = InfluxPublisher.transform(message)[0]
8383
point.should_not be_nil
@@ -212,7 +212,7 @@ module PlaceOS::Source
212212

213213
point.measurement.should eq "custom_measurement"
214214

215-
point.timestamp.should eq Time::UNIX_EPOCH
215+
# point.timestamp.should eq Time::UNIX_EPOCH
216216

217217
point.tags.should eq({
218218
"pos_org" => "org-donor",
@@ -279,7 +279,7 @@ module PlaceOS::Source
279279
"mac" => "66e0fd1279ce",
280280
"level" => "zone_1234",
281281
"building" => "zone_1234",
282-
}].to_json, timestamp: Time.utc)
282+
}].to_json, timestamp: Time::UNIX_EPOCH)
283283

284284
points = InfluxPublisher.transform(message)
285285
point = points[0]
@@ -388,7 +388,7 @@ module PlaceOS::Source
388388
"map_height" => 123.8,
389389
"meraki_floor_id" => "g_727894289736675",
390390
"meraki_floor_name" => "BUILDING Name - L2",
391-
}.to_json, timestamp: Time.utc)
391+
}.to_json, timestamp: Time::UNIX_EPOCH)
392392

393393
points = InfluxPublisher.transform(message)
394394
point = points[0]
@@ -475,7 +475,7 @@ module PlaceOS::Source
475475
"level" => "zone_1234",
476476
"building" => "zone_1234",
477477
},
478-
}.to_json, timestamp: Time.utc)
478+
}.to_json, timestamp: Time::UNIX_EPOCH)
479479

480480
points = InfluxPublisher.transform(message)
481481
point = points[0]

spec/publishing/mqtt_publisher_spec.cr

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ module PlaceOS::Source
99

1010
state = mock_state(
1111
module_id: module_id,
12-
index: 1,
12+
index: 5,
1313
module_name: "M'Odule",
1414
driver_id: "12345",
1515
control_system_id: "cs-9445",
@@ -31,10 +31,10 @@ module PlaceOS::Source
3131
end
3232
end
3333

34-
sleep 100.milliseconds
34+
sleep 200.milliseconds
3535
publisher.publish(Publisher::Message.new(status_event, "true", timestamp: Time.utc))
36-
sleep 100.milliseconds
37-
client.unsubscribe(key)
36+
sleep 200.milliseconds
37+
client.unsubscribe(key) rescue nil
3838

3939
last_value.try(&.[]("value")).should be_true
4040
end

spec/spec_helper.cr

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def test_broker
2222

2323
PlaceOS::Model::Broker.new(
2424
name: "mosquitto",
25-
host: ENV["MQTT_HOST"]?.presence || "localhost",
25+
host: ENV["MQTT_HOST"]?.presence || "mqtt",
2626
port: ENV["MQTT_PORT"]?.presence.try &.to_i? || 1883,
2727
auth_type: :no_auth,
2828
).save!

src/app.cr

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ module PlaceOS::Source
8585

8686
Manager.instance = manager
8787

88+
# timezone cache management
89+
spawn { PlaceOS::Source::InfluxPublisher.timezone_cache_reset }
90+
8891
# Server Configuration
8992

9093
server = ActionController::Server.new(port, host)

src/source/publishing/influx_publisher.cr

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,37 @@ module PlaceOS::Source
6262
end
6363
end
6464

65+
@@building_timezones = {} of String => Time::Location?
66+
@@timezone_lock = Mutex.new
67+
68+
def self.timezone_cache_reset
69+
loop do
70+
sleep 1.hour
71+
@@timezone_lock.synchronize do
72+
@@building_timezones = {} of String => Time::Location?
73+
end
74+
rescue error
75+
Log.warn(exception: error) { "error clearing timezone cache" }
76+
end
77+
end
78+
79+
def self.timezone_for(building_id : String?) : Time::Location?
80+
return nil unless building_id && building_id.presence
81+
82+
@@timezone_lock.synchronize do
83+
if @@building_timezones.has_key?(building_id)
84+
return @@building_timezones[building_id]
85+
end
86+
87+
if zone = Model::Zone.find_by?(id: building_id)
88+
@@building_timezones[building_id] = zone.timezone
89+
end
90+
end
91+
rescue error
92+
Log.warn(exception: error) { "error fetching timezone for zone #{building_id}" }
93+
nil
94+
end
95+
6596
# Generate an InfluxDB Point from an mqtt key + payload
6697
#
6798
def self.transform(message : Publisher::Message) : Array(Flux::Point)
@@ -91,6 +122,12 @@ module PlaceOS::Source
91122

92123
fields = ::Flux::Point::FieldSet.new
93124

125+
if timezone = timezone_for(data.zone_mapping["building"]?)
126+
local_time = timestamp.in(timezone)
127+
tags["pos_day_of_week"] = local_time.day_of_week.to_s
128+
fields["pos_time_of_day"] = (local_time.hour * 100 + local_time.minute).to_i64
129+
end
130+
94131
# https://docs.influxdata.com/influxdb/v2.0/reference/flux/language/lexical-elements/#identifiers
95132
key = data.status.gsub(/\W/, '_')
96133
fields["pos_key"] = key

0 commit comments

Comments
 (0)