Skip to content

Commit 4f9f0e2

Browse files
committed
toy implementation of a DS
1 parent e059a87 commit 4f9f0e2

File tree

3 files changed

+1234
-0
lines changed

3 files changed

+1234
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
#!/usr/bin/env python3
2+
# THIS IS A TOY IMPLEMENTATION OF THE DEPLOYMENT SERVER
3+
# IT ONLY PROCESSES WHITELISTS.
4+
# THERE NO MACHINE TYPES / BLACKLIST SUPPORT
5+
#
6+
# generate large serverclass.conf from resources/conf/gendsconf
7+
#
8+
# test performance with load.py
9+
#
10+
# performance bottleneck is regex at scale.
11+
# i.e. for a given client, which serverclasses should i include.
12+
# if you have 1000 serverclasses with 1000 regexes each,
13+
# thats 1M*num_properties regexes PER client
14+
#
15+
# the 1st time we see a client, and need to calculate with serverclasses are eligible
16+
# this implementation is ~ 10x-40x faster dependant on serverclasses count
17+
# subsequent connections from a known client are about 150x-1000x faster due to caching
18+
#
19+
# To go further,
20+
# 1. DONE have a routine for generating 'bundles' from the apps, storing checksums + whether it causes a restart
21+
# 2. DONE include these details in the serverclasses dict
22+
# 3. HALFBAKED use a real webserver to handle /deployemnt/streams (the downloads)
23+
# 4. implement cache invalidation if the serverclasses / apps change
24+
# 5. implement cache removal for agent checksums not seen in a long time and/or
25+
# if the properties change for a UUID
26+
# 6. don't use python. Use something that can use more than 1 CPU.
27+
# 7. HALFBAKED figure out what the IP is in the real connect string.
28+
# are we looking up the ip of the other end of the connection,
29+
# or using x-forwarded-for ?
30+
# This IP is used for matching
31+
# 8. DONE Figure out what the xml properties are in the phonehome responses (responses.py)
32+
33+
from aiohttp import web
34+
import hashlib, json, re, configparser, os, tarfile, gzip, socket, logging
35+
from responses import format_getapps_response, format_handshake_response
36+
from io import BytesIO
37+
38+
logging.basicConfig(level=logging.INFO)
39+
40+
connect_properties = ["client_id","dns_name","build","os","port","version","uuid","type","hostname"]
41+
check_properties = ["client_id","hostname","dns_name"]
42+
apps_dir = '/opt/splunk/etc/deployment-apps'
43+
bundle_dir = './var/run/tmp'
44+
45+
cache = {}
46+
clients = {}
47+
apps = {}
48+
49+
async def handle_d_cl(r): return web.Response(text=json.dumps(clients,indent=2))
50+
async def handle_d_sc(r): return web.Response(text=json.dumps(serverclasses,default=lambda o: o.pattern, indent=2)) # hack to display compiled regex
51+
async def handle_d_ap(r): return web.Response(text=json.dumps(apps,indent=2))
52+
53+
def get_serverclasses():
54+
config = configparser.ConfigParser()
55+
config.read('serverclass.conf')
56+
57+
serverclasses = {}
58+
for section in config.sections():
59+
sp = section.split(":")
60+
if len(sp) == 2 and sp[0] == "serverClass":
61+
whitelists = sorted([ f"(?:{v.replace('*','.*')})" for (k,v) in config.items(section) if k.startswith('whitelist.') ])
62+
serverclasses[sp[1]] = { "apps": [], "whitelists": whitelists }
63+
# The optimization here is to compile a single regex for the entire serverclass
64+
# compiling a regex from a string every time is slow.
65+
serverclasses[sp[1]]["whitelist"] = re.compile("|".join(whitelists))
66+
# We could do better than this.
67+
# For the static entries, compile a trie - matching that will be extremely fast.
68+
# then fallback to a regex engine for the remaining whitelist entries with wildcards.
69+
70+
71+
for section in config.sections():
72+
sp = section.split(":")
73+
if len(sp) == 4 and sp[0] == "serverClass" and sp[2] == "app":
74+
serverclasses[sp[1]]["apps"].append(sp[3])
75+
76+
return serverclasses
77+
78+
def should_include_serverclass(sc,cksum):
79+
props = list(set([ clients[cksum][p] for p in check_properties ]))
80+
for prop in props:
81+
if serverclasses[sc]["whitelist"].match(prop): # match against the precompiled serverclass whitelist regex
82+
return True
83+
return False
84+
85+
def phonehome_topic(connection):
86+
# for a given connect string, construct an object with matching scs and the apps.
87+
payload = {}
88+
89+
for sc, obj in serverclasses.items(): # iterate over all serverclasses
90+
include = cache.get((connection,sc),None) # have we checked this client agaist the sc before ?
91+
92+
if include == None: # Nope - populate the cache
93+
include = should_include_serverclass(sc,connection)
94+
cache[(connection,sc)] = include
95+
96+
if include == True: # this sc matches the client
97+
payload[sc] = {}
98+
for a in obj["apps"]: # grab all the apps in the serverclass
99+
if a in apps: # check this app exists in the apps we've created bundles for
100+
payload[sc][a] = {"checksum": apps[a]["cksum"], "restart": False}
101+
# if you want to do whitelisting at the app level too
102+
# this is where you would do it.
103+
# you need a new cache and 'should_include_app' function
104+
105+
return format_getapps_response(clients[connection]["hostname"],clients[connection]["client_id"],payload)
106+
107+
async def handle_connect(r):
108+
parts = r.path.split('/')[4:]
109+
remote_ip = r.remote
110+
111+
try:
112+
remote_host = str(socket.gethostbyaddr(remote_ip)[0])
113+
except:
114+
remote_host = remote_ip
115+
116+
properties = dict(map(lambda i,j : (i,j) , connect_properties, parts)) # create a dict with keys defined in connect_properties
117+
# This is the connection string sent to the client, and will be part of every request the client sends from now on.
118+
conn=f"connnection_{remote_ip}_{properties['port']}_{remote_host}_{properties['hostname']}_{properties['client_id']}"
119+
120+
# cache the client properties using the conenction string as a a key
121+
clients[conn] = properties
122+
logging.info(f"connection: remote={remote_host} connection_string={conn}")
123+
resp = web.Response(text=f'<?xml version="1.0" encoding="UTF-8"?><msg status="ok">{conn}</msg>')
124+
resp.headers['Content-Type'] = 'text/xml; charset=UTF-8'
125+
return resp
126+
127+
async def handle_phonehome(r):
128+
# an existing connection string should exist in the cache, we should check for this.
129+
body = await r.read()
130+
body = body.decode()
131+
conn = r.path.split('/')[4]
132+
133+
if not conn in clients:
134+
text = '<messages status="not_connected"/>'
135+
action = "not_connected"
136+
elif '<publish channel="deploymentServer' in body:
137+
text = phonehome_topic(conn)
138+
action = "get_apps"
139+
elif '<publish channel="tenantService/handshake' in body:
140+
text = format_handshake_response(clients[conn]["hostname"],clients[conn]["client_id"])
141+
action = "handshake"
142+
elif '<messages/>' in body:
143+
text = '<messages status="ok"/>'
144+
action = "ping"
145+
else:
146+
text = '<messages status="ok"/>'
147+
action = "unknown"
148+
149+
logging.info(f"phonehome: connection={conn} action={action}")
150+
resp = web.Response(text=text)
151+
resp.headers['Content-Type'] = 'text/xml; charset=UTF-8'
152+
return resp
153+
154+
async def handle_subscribe(r):
155+
conn = r.path.split('/')[5]
156+
157+
if not conn in clients:
158+
text = '<?xml version="1.0" encoding="UTF-8"?><msg status="not_connected" reason="not_connected"/>'
159+
action = "not_connected"
160+
elif 'tenantService' in r.path:
161+
text = '<?xml version="1.0" encoding="UTF-8"?><msg status="ok"/>'
162+
action = "tenantService"
163+
elif 'deploymentServer' in r.path:
164+
text = '<?xml version="1.0" encoding="UTF-8"?><msg status="ok"/>'
165+
action = "deploymentServer"
166+
else:
167+
text = '<?xml version="1.0" encoding="UTF-8"?><msg status="not_connected" reason="not_connected"/>'
168+
action = "unknown"
169+
170+
logging.info(f"subscribe: connection={conn} action={action}")
171+
resp = web.Response(text=text)
172+
resp.headers['Content-Type'] = 'text/xml; charset=UTF-8'
173+
return resp
174+
175+
async def handle_stream(r):
176+
name = r.rel_url.query.get('name',None)
177+
await r.read()
178+
if name is None:
179+
return web.Response(text='')
180+
181+
# we should really see if the client accepts gzip, and if not, return the uncompressed bundle
182+
# is this actually async ?
183+
_, _, app = name.split(':') # tenant , serverclass, app
184+
with open(apps[app]['loc'],'rb') as cf:
185+
content=cf.read() # in the real world you'd chunk this
186+
187+
response = web.StreamResponse()
188+
response.enable_compression(force=False) # files are already compressed
189+
response.headers['Content-Type'] = 'octet-stream'
190+
response.headers['File-Name'] = f"{app}.bundle"
191+
response.headers['Content-Encoding'] = 'gzip'
192+
response.headers['Content-Length'] = str(len(content))
193+
194+
await response.prepare(r)
195+
await response.write(content)
196+
await response.write_eof()
197+
198+
return response
199+
200+
def prep_apps():
201+
202+
if not os.path.exists(bundle_dir):
203+
os.makedirs(bundle_dir)
204+
205+
# tarfiles contain the dates of the files.
206+
# the checksum of the tar file will change if any of the files
207+
# in it have their dates changed.
208+
# set the time of all the files to 0
209+
def notimes(tarinfo):
210+
tarinfo.mtime = 0
211+
return tarinfo
212+
213+
# create a tgz for each app
214+
# if you look in a real bundle created by a real ds, you get
215+
# default/app.conf
216+
# i.e. no leading path elements, and no directories, only the files
217+
for app in [ f.path for f in os.scandir(apps_dir) if f.is_dir() ]:
218+
appname = os.path.basename(app)
219+
dest = f"{bundle_dir}/{appname}.bundle.gz"
220+
221+
has_local_app = False
222+
with tarfile.open(dest,"w:gz") as tf:
223+
224+
for root, _, files in os.walk(app):
225+
for file in files:
226+
fpath=f"{root}/{file}"
227+
arcname = fpath[len(app)+1:]
228+
if arcname == 'local/app.conf':
229+
has_local_app = True
230+
tf.add(fpath,arcname=arcname,filter=notimes)
231+
232+
if not has_local_app: # create a local/app.conf if it doesnt exists
233+
tarinfo = tarfile.TarInfo(name='local/app.conf')
234+
tarinfo.uid = os.getuid()
235+
tarinfo.gid = os.getgid()
236+
tf.addfile(tarinfo=tarinfo, fileobj=BytesIO('# Autogenerated File'.encode()))
237+
238+
# now we calculate the checksum of the tarfile, not the gz
239+
# the checksum advertised by the DS is the 1st 64 bits if the digest, converted to decimal
240+
# why ? who knows. maybe to fit in a LONG
241+
with gzip.open(dest,'rb') as gf:
242+
content = gf.read()
243+
md5sum = hashlib.md5(content).hexdigest()
244+
# and now for something completely different
245+
246+
cksum = int(md5sum[0:16],16)
247+
248+
# cache the location of the gz and the checksum
249+
apps[appname] = { "loc": dest, "cksum": cksum}
250+
251+
app = web.Application()
252+
253+
app.add_routes([
254+
web.post('/services/broker/connect/{tail:.*}', handle_connect),
255+
web.post('/services/broker/phonehome/{tail:.*}', handle_phonehome),
256+
web.post('/services/broker/channel/subscribe/{tail:.*}', handle_subscribe),
257+
web.post('/services/streams/deployment',handle_stream),
258+
web.get('/debug/clients', handle_d_cl),
259+
web.get('/debug/serverclasses', handle_d_sc),
260+
web.get('/debug/apps', handle_d_ap)
261+
])
262+
263+
if __name__ == '__main__':
264+
logging.info("Prepping apps")
265+
prep_apps()
266+
global serverclasses
267+
serverclasses = get_serverclasses()
268+
web.run_app(app, host='0.0.0.0', port=8080)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import socket
2+
3+
def format_getapps_response(hostname,client_id,items={}):
4+
5+
rbit = []
6+
for sc in items:
7+
rbit.append( f'<serverClass name="{sc}">' )
8+
for app in items[sc]:
9+
cksum = items[sc][app]["checksum"]
10+
restart = "true" if items[sc][app]["restart"] else "false"
11+
rbit.append( f'<app name="{app}" checksum="{cksum}" restartSplunkd="{restart}"/>')
12+
rbit.append( f'</serverClass>' )
13+
14+
text_parts = [
15+
'<messages status="ok">',
16+
f'<message connectionId="connection_127.0.0.1_8089_{socket.gethostname()}_direct_ds_default" hostname="direct" ipAddress="127.0.0.1" connName="ds_default" channel="deploymentServer/phoneHome/default/reply/{hostname}/{client_id}">',
17+
"".join([
18+
'<?xml version="1.0" encoding="UTF-8"?>',
19+
'<deployResponse restartSplunkd="false" restartSplunkWeb="false" stateOnClient="enabled" issueReload="false" repositoryLocation="$SPLUNK_HOME/etc/apps" endpoint="$deploymentServerUri$/services/streams/deployment?name=$tenantName$:$serverClassName$:$appName$">',
20+
"".join(rbit),
21+
'</deployResponse>',
22+
]).replace("<","&lt;").replace(">","&gt;"),
23+
'</message>',
24+
'</messages>'
25+
]
26+
27+
text = "".join(text_parts)
28+
return text
29+
30+
def format_handshake_response(hostname,client_id):
31+
32+
text_parts = [
33+
'<messages status="ok">',
34+
f'<message connectionId="connection_127.0.0.1_8089_{socket.gethostname()}_direct_tenantService" hostname="direct" ipAddress="127.0.0.1" connName="tenantService" channel="tenantService/handshake/reply/{hostname}/{client_id}">',
35+
"".join([
36+
'<?xml version="1.0" encoding="UTF-8"?>',
37+
'<tenancy>',
38+
'<status>ok</status>',
39+
'<tenantId>default</tenantId>',
40+
'<phoneHomeTopic>deploymentServer/phoneHome/default</phoneHomeTopic>',
41+
'<token>default</token>',
42+
'</tenancy>'
43+
]).replace("<","&lt;").replace(">","&gt;"),
44+
'</message>',
45+
'</messages>'
46+
]
47+
48+
text = "".join(text_parts)
49+
return text

0 commit comments

Comments
 (0)