-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSteppedSimulator.lf
77 lines (70 loc) · 2.95 KB
/
SteppedSimulator.lf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/**
* This program emulates a simulator that responds to step requests with its updated status. The
* simulator counts once per step request and responds with the current count. Queries arrive as
* MQTT messages on the `step_topic` topic, and the responses are published via MQTT on the
* `response_topic` topic. No timestamp is sent with the response.
*
* This program runs in fast mode and exits when it receives a "stop" message. It advances time only
* when it receives a step request.
*
* The message sent is a JSON string containing the count, the elapsed logical time at which the
* count was updated, and the elapsed physical time at which the response was constructed.
*
* See ../README.md for prerequisites and further information.
*
* @author Edward A. Lee
*
* @param broker The MQTT broker address.
* @param step_topic The topic to subscribe to.
* @param response_topic The topic to publish to.
* @param period The period at which to step the simulator.
*/
// Target configuration for the C runtime.
target C {
// Keep the program alive while waiting for MQTT messages instead of exiting
// when no events are pending (see the LF target-declaration docs).
keepalive: true,
// Run in fast mode, as described in the file header: logical time is advanced
// by the `advance` action rather than by waiting on physical time.
fast: true
}
import MQTTSubscriber from <mqtt-c/MQTTSubscriber.lf>
import MQTTPublisher from <mqtt-c/MQTTPublisher.lf>
/**
 * Stepped simulator: for every message received on `step_topic` other than "stop", increment a
 * counter, publish a JSON status on `response_topic`, and advance logical time by `period`.
 * A "stop" message requests shutdown of the program.
 */
main reactor(
    broker: string = "tcp://localhost:1883",
    step_topic: string = "simulator-step",
    response_topic: string = "simulator-response",
    period: time = 1 sec) {
  // Number of step requests handled so far.
  state count: int = 0
  // Elapsed logical time at which `count` was last updated.
  state last_logical_time: interval_t = 0

  pub = new MQTTPublisher(address=broker, topic=response_topic, include_timestamp=false)
  sub = new MQTTSubscriber(address=broker, topic=step_topic, use_physical_time=false)

  // Declared with a minimum delay of `period`; scheduling it moves logical time forward.
  logical action advance(period)

  reaction(advance) {=
    // Do nothing. This just advances logical time locally.
  =}

  reaction(sub.message) -> pub.in, advance {=
    lf_print("SteppedSimulator: received: %s", sub.message->value);
    if (strcmp(sub.message->value, "stop") == 0) {
      lf_request_stop();
    } else {
      // Step the simulator.
      self->count++;
      self->last_logical_time = lf_time_logical_elapsed();
      lf_print("SteppedSimulator: count=%d, logical_time=" PRINTF_TIME, self->count, self->last_logical_time);

      // Construct a response. Logical time does not change within a reaction, so the
      // value recorded above is reused rather than queried a second time.
      interval_t logical_now = self->last_logical_time;
      interval_t physical_now = lf_time_physical_elapsed();
      const char* format = "{\"count\":%d, \"logical_time_at_source\":" PRINTF_TIME ", \"physical_time_at_source\":" PRINTF_TIME "}";

      // With NULL, 0 arguments, snprintf tells us how many bytes are needed
      // (excluding the null terminator). A negative result signals an encoding error.
      int needed = snprintf(NULL, 0, format, self->count, logical_now, physical_now);
      if (needed < 0) {
        lf_print_error_and_exit("SteppedSimulator: failed to format response message.");
      }
      // Add one for the null terminator.
      size_t length = (size_t)needed + 1;

      // Dynamically allocate memory for the output. The buffer is handed to
      // lf_set_array below and is not freed in this reaction.
      char* buffer = (char*)malloc(length);
      if (buffer == NULL) {
        lf_print_error_and_exit("SteppedSimulator: out of memory allocating %zu bytes for response.", length);
      }

      // Populate the output string and publish it.
      snprintf(buffer, length, format, self->count, logical_now, physical_now);
      lf_set_array(pub.in, buffer, length);

      tag_t tag = lf_tag();
      lf_print("SteppedSimulator: At (elapsed) tag " PRINTF_TAG ", publish message:\n %s",
        tag.time - lf_time_start(), tag.microstep,
        pub.in->value
      );
    }
    // Advance logical time by `period` (the action's minimum delay) before the next step.
    lf_schedule(advance, 0);
  =}
}