Skip to content

Commit a79fa1b

Browse files
committed
Implemented leaky bucket algorithm
1 parent 4d7cb74 commit a79fa1b

File tree

3 files changed

+96
-47
lines changed

3 files changed

+96
-47
lines changed

src/LeakyBucket.h

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (c) 2019 Sinric. All rights reserved.
3+
* Licensed under Creative Commons Attribution-Share Alike (CC BY-SA)
4+
*
5+
* This file is part of the Sinric Pro (https://github.com/sinricpro/)
6+
*/
7+
8+
9+
#ifndef _LEAKY_BUCKET_H_
10+
#define _LEAKY_BUCKET_H_
11+
12+
class LeakyBucket_t {
13+
public:
14+
LeakyBucket_t() : dropsInBucket(0), lastDrop(-DROP_IN_TIME), once(false) {}
15+
bool addDrop();
16+
private:
17+
void leak();
18+
int dropsInBucket;
19+
unsigned long lastDrop;
20+
bool once;
21+
unsigned long lastWarning;
22+
};
23+
24+
bool LeakyBucket_t::addDrop() {
25+
leak();
26+
unsigned long actualMillis = millis();
27+
28+
if (dropsInBucket < BUCKET_SIZE && actualMillis-lastDrop > dropsInBucket + DROP_IN_TIME) { // new drop can be placed into bucket?
29+
dropsInBucket++; // place drop in bucket
30+
lastDrop = actualMillis; // store last drop time
31+
if (dropsInBucket == BUCKET_SIZE && once==false) {
32+
Serial.printf("[SinricPro]: WARNING: YOU SENT TOO MUCH EVENTS IN A SHORT PERIOD OF TIME!\r\n - PLEASE CHECK YOUR CODE AND SEND EVENTS ONLY IF DEVICE STATE HAS CHANGED!\r\n"); // Print a warning when bucket is full
33+
once = true;
34+
}
35+
return true;
36+
}
37+
38+
if (dropsInBucket >= BUCKET_SIZE) {
39+
if (actualMillis-lastWarning > 1000) {
40+
Serial.printf("[SinricPro]: EVENTS ARE BLOCKED FOR %lu SECONDS!\r\n",(DROP_OUT_TIME-(actualMillis-lastDrop))/1000);
41+
lastWarning = actualMillis;
42+
}
43+
}
44+
return false;
45+
}
46+
47+
void LeakyBucket_t::leak() {
48+
// leack bucket...
49+
unsigned long actualMillis = millis();
50+
int drops_to_leak = (actualMillis - lastDrop) / DROP_OUT_TIME;
51+
if (drops_to_leak > 0) {
52+
if (dropsInBucket <= drops_to_leak) {
53+
dropsInBucket = 0;
54+
} else {
55+
dropsInBucket -= drops_to_leak;
56+
}
57+
}
58+
}
59+
60+
61+
#endif

src/SinricProConfig.h

+20-3
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,37 @@
77

88
#ifndef __SINRICPRO_CONFIG_H__
99
#define __SINRICPRO_CONFIG_H__
10+
/*
11+
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
12+
* !! !!
13+
* !! WARNING: DON'T TOUCH ! !!
14+
* !! ====================== !!
15+
* !! PLEASE DO NOT MODIFY ANY OF THESE SETTINGS HERE !!
16+
* !! THIS IS FOR INTERNAL CONFIGURATION ONLY !!
17+
* !! SINRIC PRO MIGHT NOT WORK IF YOU MODIFY THIS !!
18+
* !! !!
19+
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
20+
*/
1021

22+
// Version Configuration
1123
#define SDK_VERSION "2.2.3"
1224

25+
// Server Configuration
1326
#define SINRICPRO_SERVER_URL "ws.sinric.pro"
1427
#define SINRICPRO_SERVER_PORT 80
1528

29+
// UDP Configuration
1630
#define UDP_MULTICAST_IP IPAddress(224,9,9,9)
1731
#define UDP_MULTICAST_PORT 3333
1832

19-
// websocket sends every WEBSOCKET_PING_INTERVAL milliseconds a ping to Server
20-
// if there is no pong received after WEBSOCKET_PING_TIMEOUT milliseconds, retry count is incremented by one
21-
// if retry count reaches WEBSOCKET_RETRY_COUNT websocket connection is closed and try to reconnect server
33+
// WebSocket Configuration
2234
#define WEBSOCKET_PING_INTERVAL 300000
2335
#define WEBSOCKET_PING_TIMEOUT 10000
2436
#define WEBSOCKET_RETRY_COUNT 2
2537

38+
// LeakyBucket Configuration
39+
#define BUCKET_SIZE 10
40+
#define DROP_OUT_TIME 60000
41+
#define DROP_IN_TIME 1000u
42+
2643
#endif

src/SinricProDevice.h

+15-44
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,9 @@
99
#define _SINRICDEVICE_H_
1010

1111
#include "SinricProDeviceInterface.h"
12-
#include <map>
13-
14-
#define BUCKET_SIZE 20
15-
#define DROP_WAIT_TIME 30000
16-
#define DROP_ADD_FACTOR 100
12+
#include "LeakyBucket.h"
1713

18-
struct leackyBucket_t {
19-
int dropsInBucket=0;
20-
unsigned long lastDrop;
21-
};
14+
#include <map>
2215

2316
class SinricProDevice : public SinricProDeviceInterface {
2417
public:
@@ -46,7 +39,7 @@ class SinricProDevice : public SinricProDeviceInterface {
4639
private:
4740
SinricProInterface* eventSender;
4841
unsigned long eventWaitTime;
49-
std::map<String, leackyBucket_t> eventFilter;
42+
std::map<String, LeakyBucket_t> eventFilter;
5043
};
5144

5245
SinricProDevice::SinricProDevice(const char* newDeviceId, unsigned long eventWaitTime) :
@@ -90,52 +83,30 @@ DynamicJsonDocument SinricProDevice::prepareEvent(const char* deviceId, const ch
9083
return DynamicJsonDocument(1024);
9184
}
9285

86+
9387
bool SinricProDevice::sendEvent(JsonDocument& event) {
94-
unsigned long actualMillis = millis();
9588
String eventName = event["payload"]["action"] | ""; // get event name
9689

97-
leackyBucket_t bucket;
90+
LeakyBucket_t bucket; // leaky bucket algorithm is used to prevent flooding the server
9891

99-
// get Bucket for eventName
100-
101-
if (eventFilter.find(eventName) == eventFilter.end()) { // if eventFilter is not initialized
102-
// eventFilter[eventName] = -eventWaitTime; // initialize eventFilter
103-
bucket.dropsInBucket = 0;
104-
bucket.lastDrop = -eventWaitTime;
105-
eventFilter[eventName] = bucket;
106-
} else { // if eventFilter is initialized, get bucket
107-
bucket = eventFilter[eventName];
108-
}
109-
DEBUG_SINRIC("Bucket dropsInBucket: %d\r\n", bucket.dropsInBucket);
110-
// leack bucket...
111-
int drops_to_leak = (actualMillis - bucket.lastDrop) / DROP_WAIT_TIME;
112-
DEBUG_SINRIC("Bucket leaking: %d\r\n", drops_to_leak);
113-
if (drops_to_leak > 0) {
114-
if (bucket.dropsInBucket <= drops_to_leak) {
115-
bucket.dropsInBucket = 0;
116-
} else {
117-
bucket.dropsInBucket -= drops_to_leak;
118-
}
92+
// get leaky bucket for event from eventFilter
93+
if (eventFilter.find(eventName) == eventFilter.end()) { // if there is no bucket ...
94+
eventFilter[eventName] = bucket; // ...add a new bucket
95+
} else {
96+
bucket = eventFilter[eventName]; // else get bucket
11997
}
12098

121-
// unsigned long lastEventMillis = eventFilter[eventName] | 0; // get the last timestamp for event
122-
// if (actualMillis - lastEventMillis < eventWaitTime) return false; // if last event was before waitTime return...
123-
// if (actualMillis - bucket.lastDrop < eventWaitTime) return false;
124-
if (bucket.dropsInBucket < BUCKET_SIZE && actualMillis-bucket.lastDrop > bucket.dropsInBucket * DROP_ADD_FACTOR) { // new drop can be placed into bucket?
125-
Serial.printf("SinricProDevice::sendMessage(): %d event's left before limiting to 1 event per %d seconds. %lu ms until next event\r\n", BUCKET_SIZE-bucket.dropsInBucket-1, DROP_WAIT_TIME / 1000, ((bucket.dropsInBucket+1) * DROP_ADD_FACTOR));
126-
bucket.dropsInBucket++; // place drop in bucket
127-
bucket.lastDrop = actualMillis; // store last drop time
128-
eventFilter[eventName] = bucket; // save bucket back to map
129-
if (eventSender) eventSender->sendMessage(event); // send event
99+
if (bucket.addDrop()) { // if we can add a new drop
100+
if (eventSender) eventSender->sendMessage(event); // send event
101+
eventFilter[eventName] = bucket; // update bucket on eventFilter
130102
return true;
131103
}
132104

133-
if (bucket.dropsInBucket >= BUCKET_SIZE) {
134-
Serial.printf("- WARNING: EVENTS ARE BLOCKED FOR %lu SECONDS (%lu seconds left)\r", DROP_WAIT_TIME/1000, (DROP_WAIT_TIME-(actualMillis-bucket.lastDrop))/1000);
135-
}
105+
eventFilter[eventName] = bucket; // update bucket on eventFilter
136106
return false;
137107
}
138108

109+
139110
bool SinricProDevice::sendPowerStateEvent(bool state, String cause) {
140111
DynamicJsonDocument eventMessage = prepareEvent(deviceId, "setPowerState", cause.c_str());
141112
JsonObject event_value = eventMessage["payload"]["value"];

0 commit comments

Comments
 (0)