Skip to content

Commit fe52807

Browse files
authored
Merge pull request #48 from sysflow-telemetry/ratelimiting
feat(core): add rate limiting filter with time decaying
2 parents e8d6a37 + dbc77ed commit fe52807

File tree

6 files changed

+267
-29
lines changed

6 files changed

+267
-29
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
1717

1818
## [0.4.4] - 2022-08-01
1919

20+
### Added
21+
22+
- Add rate limiting filter with time decaying
23+
2024
### Changed
2125

2226
- Bump UBI to 8.6-855

core/flattener/config.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
//
2+
// Copyright (C) 2022 IBM Corporation.
3+
//
4+
// Authors:
5+
// Frederico Araujo <[email protected]>
6+
// Teryl Taylor <[email protected]>
7+
//
8+
// Licensed under the Apache License, Version 2.0 (the "License");
9+
// you may not use this file except in compliance with the License.
10+
// You may obtain a copy of the License at
11+
//
12+
// http://www.apache.org/licenses/LICENSE-2.0
13+
//
14+
// Unless required by applicable law or agreed to in writing, software
15+
// distributed under the License is distributed on an "AS IS" BASIS,
16+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
// See the License for the specific language governing permissions and
18+
// limitations under the License.
19+
20+
// Package flattener flattens input telemetry in a flattened representation.
21+
package flattener
22+
23+
import (
24+
"strconv"
25+
"time"
26+
)
27+
28+
// Configuration keys accepted in the handler's configuration map.
const (
	// FilterOnOffKey enables/disables the rate limiting filter ("on"/"off").
	FilterOnOffKey string = "filter.enabled"
	// FilterMaxAgeKey sets the filter's time decay, in seconds.
	FilterMaxAgeKey string = "filter.maxage"
)

// Config defines a configuration object for the engine.
type Config struct {
	FilterOnOff  OnOff         // whether the rate limiting filter is enabled (default: Off)
	FilterMaxAge time.Duration // time decay (TTL) of filter entries (default: 24h)
}

// CreateConfig creates a new config object from a config dictionary.
// Missing keys keep their default values. A malformed filter.maxage
// value is reported through the returned error while the default
// duration is retained, so the returned Config is always usable.
func CreateConfig(conf map[string]interface{}) (Config, error) {
	c := Config{FilterOnOff: Off, FilterMaxAge: 24 * time.Hour} // default values
	var err error
	if v, ok := conf[FilterOnOffKey].(string); ok {
		c.FilterOnOff = parseOnOffType(v)
	}
	if v, ok := conf[FilterMaxAgeKey].(string); ok {
		// NOTE(review): the value is interpreted as seconds here, while
		// docs/CONFIG.md describes it as minutes — confirm intended unit.
		var duration int
		duration, err = strconv.Atoi(v)
		if err == nil {
			c.FilterMaxAge = time.Duration(duration) * time.Second
		}
	}
	return c, err
}

// OnOff defines an On-Off state type.
type OnOff int32

// OnOff states.
const (
	Off OnOff = iota
	On
)

// String returns the lowercase string form of the state.
func (s OnOff) String() string {
	return [...]string{"off", "on"}[s]
}

// Enabled reports whether the state is On.
func (s OnOff) Enabled() bool {
	return s == On
}

// parseOnOffType parses s into an OnOff state; any value other than
// "on" maps to Off.
func parseOnOffType(s string) OnOff {
	if s == On.String() {
		return On
	}
	return Off
}

core/flattener/filter.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
//
2+
// Copyright (C) 2022 IBM Corporation.
3+
//
4+
// Authors:
5+
// Frederico Araujo <[email protected]>
6+
// Teryl Taylor <[email protected]>
7+
//
8+
// Licensed under the Apache License, Version 2.0 (the "License");
9+
// you may not use this file except in compliance with the License.
10+
// You may obtain a copy of the License at
11+
//
12+
// http://www.apache.org/licenses/LICENSE-2.0
13+
//
14+
// Unless required by applicable law or agreed to in writing, software
15+
// distributed under the License is distributed on an "AS IS" BASIS,
16+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
// See the License for the specific language governing permissions and
18+
// limitations under the License.
19+
20+
// Package flattener flattens input telemetry in a flattened representation.
21+
package flattener
22+
23+
import (
24+
"container/list"
25+
"encoding/binary"
26+
"time"
27+
28+
"github.com/cespare/xxhash/v2"
29+
"github.com/sysflow-telemetry/sf-apis/go/sfgo"
30+
)
31+
32+
// byteInt64 is a package-level scratch buffer used by semanticHash.
// NOTE(review): shared mutable package state — any function writing to
// this buffer is not safe for concurrent use; confirm the flattener
// handler is driven by a single goroutine.
var byteInt64 []byte = make([]byte, 8)

// Filter is a time decaying filter with a TTL per entry.
// It is not safe for concurrent use.
type Filter struct {
	m   map[uint64]int64 // hash -> number of times seen within the TTL window
	q   *list.List       // entries ordered by first-seen time (oldest at front)
	ttl time.Duration    // time after which an entry is evicted
}

// Entry encodes a hash value with the time it was first added to the filter.
type Entry struct {
	h         uint64
	firstSeen time.Time
}

// NewFilter creates a new time decaying filter that evicts entries that
// have been seen longer than t duration.
func NewFilter(t time.Duration) *Filter {
	return &Filter{m: make(map[uint64]int64), q: list.New(), ttl: t}
}

// Test tests if hash h has been seen since maximum ttl.
func (f *Filter) Test(h uint64) bool {
	f.evictAgedEntries()
	_, ok := f.m[h]
	return ok
}

// TestAndAdd tests if hash h has been seen since maximum ttl, and adds
// or increments the element in the filter cache.
func (f *Filter) TestAndAdd(h uint64) bool {
	f.evictAgedEntries()
	_, ok := f.m[h]
	f.Add(h)
	return ok
}

// Count returns how many times hash h has been seen during its ttl time.
func (f *Filter) Count(h uint64) int64 {
	f.evictAgedEntries()
	if count, ok := f.m[h]; ok {
		return count
	}
	return 0
}

// Add adds hash h to the filter, incrementing its count if already present.
// The entry's TTL is measured from the first time h was added.
func (f *Filter) Add(h uint64) {
	if v, ok := f.m[h]; !ok {
		f.m[h] = 1
		f.q.PushBack(Entry{h: h, firstSeen: time.Now()})
	} else {
		f.m[h] = v + 1
	}
}

// evictAgedEntries removes all entries first seen longer than ttl ago.
// The queue is ordered by first-seen time, so eviction can stop at the
// first entry still within the ttl.
func (f *Filter) evictAgedEntries() {
	for f.q.Len() > 0 {
		e := f.q.Front()
		entry := e.Value.(Entry)
		if time.Since(entry.firstSeen) < f.ttl {
			break
		}
		f.q.Remove(e)
		delete(f.m, entry.h)
	}
}
97+
98+
// semanticHash computes a hash value over record attributes denoting the semantics of the record (used in the time decay filter).
99+
func semanticHash(fr *sfgo.FlatRecord) uint64 {
100+
h := xxhash.New()
101+
h.Write([]byte(fr.Strs[sfgo.SYSFLOW_SRC][sfgo.PROC_EXE_STR]))
102+
h.Write([]byte(fr.Strs[sfgo.SYSFLOW_SRC][sfgo.PROC_EXEARGS_STR]))
103+
binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.PROC_UID_INT]))
104+
h.Write(byteInt64)
105+
binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.PROC_GID_INT]))
106+
h.Write(byteInt64)
107+
binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.OPFLAGS_INT]))
108+
h.Write(byteInt64)
109+
binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.PROC_TTY_INT]))
110+
h.Write(byteInt64)
111+
sfType := fr.Ints[sfgo.SYSFLOW_IDX][sfgo.SF_REC_TYPE]
112+
if sfType == sfgo.NET_FLOW {
113+
binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.FL_NETW_SIP_INT]))
114+
h.Write(byteInt64)
115+
binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.FL_NETW_DIP_INT]))
116+
h.Write(byteInt64)
117+
binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.FL_NETW_DPORT_INT]))
118+
h.Write(byteInt64)
119+
binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.FL_NETW_PROTO_INT]))
120+
h.Write(byteInt64)
121+
}
122+
if sfType == sfgo.FILE_FLOW || sfType == sfgo.FILE_EVT {
123+
h.Write([]byte(fr.Strs[sfgo.SYSFLOW_SRC][sfgo.FILE_PATH_STR]))
124+
}
125+
return h.Sum64()
126+
}

core/flattener/flattener.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ func NewFlattenerChan(size int) interface{} {
4646

4747
// Flattener defines the main class for the flatterner plugin.
4848
type Flattener struct {
49-
outCh []chan *sfgo.FlatRecord
49+
config Config
50+
filter *Filter
51+
outCh []chan *sfgo.FlatRecord
5052
}
5153

5254
// NewFlattener creates a new Flattener instance.
@@ -66,6 +68,11 @@ func (s *Flattener) RegisterHandler(hc plugins.SFHandlerCache) {
6668

6769
// Init initializes the handler with a configuration map.
6870
func (s *Flattener) Init(conf map[string]interface{}) error {
71+
s.config, _ = CreateConfig(conf) // no err check, assuming defaults
72+
if s.config.FilterOnOff.Enabled() {
73+
s.filter = NewFilter(s.config.FilterMaxAge)
74+
logger.Info.Printf("Initialized rate limiter with %s time decay", s.config.FilterMaxAge)
75+
}
6976
return nil
7077
}
7178

@@ -81,6 +88,16 @@ func (s *Flattener) SetOutChan(chObj []interface{}) {
8188
}
8289
}
8390

91+
// out sends a record to every output channel in the plugin.
92+
func (s *Flattener) out(fr *sfgo.FlatRecord) {
93+
if s.config.FilterOnOff.Enabled() && s.filter != nil && s.filter.TestAndAdd(semanticHash(fr)) {
94+
return
95+
}
96+
for _, c := range s.outCh {
97+
c <- fr
98+
}
99+
}
100+
84101
// Cleanup tears down resources.
85102
func (s *Flattener) Cleanup() {
86103
logger.Trace.Println("Calling Cleanup on Flattener channel")
@@ -132,9 +149,7 @@ func (s *Flattener) HandleNetFlow(sf *plugins.CtxSysFlow, nf *sfgo.NetworkFlow)
132149
fr.Ints[sfgo.SYSFLOW_IDX][sfgo.FL_NETW_NUMWSENDBYTES_INT] = nf.NumWSendBytes
133150
fr.Ptree = sf.PTree
134151
fr.GraphletID = sf.GraphletID
135-
for _, ch := range s.outCh {
136-
ch <- fr
137-
}
152+
s.out(fr)
138153
return nil
139154
}
140155

@@ -155,9 +170,7 @@ func (s *Flattener) HandleFileFlow(sf *plugins.CtxSysFlow, ff *sfgo.FileFlow) er
155170
fr.Ints[sfgo.SYSFLOW_IDX][sfgo.FL_FILE_NUMWSENDBYTES_INT] = ff.NumWSendBytes
156171
fr.Ptree = sf.PTree
157172
fr.GraphletID = sf.GraphletID
158-
for _, ch := range s.outCh {
159-
ch <- fr
160-
}
173+
s.out(fr)
161174
return nil
162175
}
163176

@@ -191,9 +204,7 @@ func (s *Flattener) HandleFileEvt(sf *plugins.CtxSysFlow, fe *sfgo.FileEvent) er
191204
fr.Ints[sfgo.SYSFLOW_IDX][sfgo.EV_FILE_RET_INT] = int64(fe.Ret)
192205
fr.Ptree = sf.PTree
193206
fr.GraphletID = sf.GraphletID
194-
for _, ch := range s.outCh {
195-
ch <- fr
196-
}
207+
s.out(fr)
197208
return nil
198209
}
199210

@@ -218,9 +229,7 @@ func (s *Flattener) HandleProcEvt(sf *plugins.CtxSysFlow, pe *sfgo.ProcessEvent)
218229
fr.Ints[sfgo.SYSFLOW_IDX][sfgo.EV_PROC_RET_INT] = int64(pe.Ret)
219230
fr.Ptree = sf.PTree
220231
fr.GraphletID = sf.GraphletID
221-
for _, ch := range s.outCh {
222-
ch <- fr
223-
}
232+
s.out(fr)
224233
return nil
225234
}
226235

docs/CONFIG.md

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
## Configuration
22

3-
The pipeline configuration below shows how to configure a pipeline that will read a sysflow stream and push records to the policy engine, which will trigger alerts using a set of runtime policies stored in a `yaml` file. An example pipeline with this configuration looks as follows:
3+
The pipeline configuration below shows how to configure a pipeline that will read a sysflow stream and push records to the policy engine, which will trigger alerts using a set of runtime policies stored in a `yaml` file. An example pipeline with this configuration looks as follows:
44

55
```json
66
{
@@ -38,7 +38,7 @@ The pipeline configuration below shows how to configure a pipeline that will rea
3838
This pipeline specifies three built-in plugins:
3939

4040
- [sysflowreader](https://github.com/sysflow-telemetry/sf-processor/blob/master/core/processor/processor.go): is a generic reader plugin that ingests sysflow from the driver, caches entities, and presents sysflow objects to a handler object (i.e., an object that implements the [handler interface](https://github.com/sysflow-telemetry/sf-apis/blob/master/go/plugins/handler.go)) for processing. In this case, we are using the [flattener](https://github.com/sysflow-telemetry/sf-processor/blob/master/core/flattener/flattener.go) handler, but custom handlers are possible.
41-
- [policyengine](https://github.com/sysflow-telemetry/sf-processor/blob/master/core/policyengine/policyengine.go): is the policy engine, which takes [flattened](https://github.com/sysflow-telemetry/sf-apis/blob/master/go/sfgo/flatrecord.go) (row-oriented) SysFlow records as input and outputs [records](https://github.com/sysflow-telemetry/sf-processor/blob/master/core/policyengine/engine/types.go), which represent alerts, or filtered sysflow records depending on the policy engine's _mode_ (more on this later).
41+
- [policyengine](https://github.com/sysflow-telemetry/sf-processor/blob/master/core/policyengine/policyengine.go): is the policy engine, which takes [flattened](https://github.com/sysflow-telemetry/sf-apis/blob/master/go/sfgo/flatrecord.go) (row-oriented) SysFlow records as input and outputs [records](https://github.com/sysflow-telemetry/sf-processor/blob/master/core/policyengine/engine/types.go), which represent alerts, or filtered sysflow records depending on the policy engine's _mode_ (more on this later).
4242
- [exporter](https://github.com/sysflow-telemetry/sf-processor/blob/master/core/exporter/exporter.go): takes records from the policy engine, and exports them to ElasticSearch, syslog, file, or terminal, in a JSON format or in Elastic Common Schema (ECS) format. Note that custom export plugins can be created to export to other serialization formats and transport protocols.
4343

4444
Each plugin has a set of general attributes that are present in all plugins, and a set of attributes that are custom to the specific plugins. For more details on the specific attributes in this example, see the pipeline configuration [template](https://github.com/sysflow-telemetry/sf-processor/blob/master/driver/pipeline.template.json)
@@ -65,7 +65,7 @@ The policy engine (`"processor": "policyengine"`) plugin is driven by a set of r
6565
- _mode_ (optional): The mode of the policy engine. Allowed values are:
6666
- `alert` (default): the policy engine generates rule-based alerts; `alert` is a blocking mode that drops all records that do not match any given rule. If no mode is specified, the policy engine runs in `alert` mode by default.
6767
- `enrich` for enriching records with additional context from the rule. In contrast to `alert`, this is a non-blocking mode which applies tagging and action enrichments to matching records as defined in the policy file. Non-matching records are passed on "as is".
68-
68+
6969
- _monitor_ (optional): Specifies if changes to the policy file(s) should be monitored and updated in the policy engine.
7070
- `none` (default): no monitor is used.
7171
- `local`: the processor will monitor for changes in the policies path and update its rule set if changes are detected.
@@ -115,7 +115,7 @@ Data export is done via bulk ingestion. The ingestion can be controlled by some
115115
- _es.username_ (required): The ES username.
116116
- _es.password_ (required): The password for the specified ES user.
117117
- _buffer_ (optional) The bulk size as the number of records to be ingested at once. Default is `0` but value of `0` indicates record-by-record ingestion which may be highly inefficient.
118-
- _es.bulk.numWorkers_ (optional): The number of ingestion workers used in parallel. Default is `0` which means that the exporter uses as many workers as there are cores in the machine.
118+
- _es.bulk.numWorkers_ (optional): The number of ingestion workers used in parallel. Default is `0` which means that the exporter uses as many workers as there are cores in the machine.
119119
- _es.bulk.flashBuffer_ (optional): The size in bytes of the flush buffer for ingestion. It should be large enough to hold one bulk (the number of records specified in _buffer_), otherwise the bulk is broken into smaller chunks. Default is `5e+6`.
120120
- _es.bulk.flushTimeout_ (optional): The flush buffer time threshold. Valid values are golang duration strings. Default is `30s`.
121121

@@ -134,7 +134,7 @@ Export to IBM Findings API allows adding custom findings to the IBM Cloud Securi
134134
- _findings.sqlquerycrn_ (required):
135135
- _findings.s3region_ (required):
136136
- _findings.s3bucket_ (required):
137-
- _findings.path_ (required):
137+
- _findings.path_ (required):
138138
- _findings.pool.capacity_ (optional): The capacity of the findings pool, Default is `250`.
139139
- _findings.pool.maxage_ (optional): The maximum age of the security findings in the pool in minutes. Default is `1440`.
140140

@@ -145,7 +145,7 @@ For more information about inserting custom findings into IBM SCC, refer to [Cus
145145
It is possible to override any of the custom attributes of a plugin using an environment variable. This is especially useful when operating the processor as a container, where you may have to deploy the processor to multiple nodes, and have attributes that change per node. If an environment variable is set, it overrides the setting inside the config file. The environment variables must follow the following structure:
146146

147147
- Environment variables must follow the naming schema `<PLUGIN NAME>_<CONFIG ATTRIBUTE NAME>`
148-
- The plugin name inside the pipeline configuration file must be all lower case.
148+
- The plugin name inside the pipeline configuration file must be all lower case.
149149

150150
For example, to set the alert mode inside the policy engine, the following environment variable is set:
151151

@@ -174,3 +174,18 @@ docker run
174174
-e EXPORTER_PORT=514
175175
...
176176
```
177+
178+
### Rate limiter configuration (experimental)
179+
180+
The `flattener` handler has a built-in time decay filter that can be enabled to reduce event rates in the processor. The filter uses a time-decaying cache keyed by a semantic hash of records. This means that the filter should only forward one record matching a semantic hash per time decay period. The semantic hash takes into consideration process, flow and event attributes. To enable rate limiting, modify the `sysflowreader` processor as follows:
181+
182+
```json
183+
{
184+
"processor": "sysflowreader",
185+
"handler": "flattener",
186+
"in": "sysflow sysflowchan",
187+
"out": "flat flattenerchan",
188+
"filter.enabled": "on|off (default: off)",
189+
    "filter.maxage": "time decay in seconds (default: 24h)"
190+
}
191+
```

0 commit comments

Comments
 (0)