Skip to content

Commit 14fc194

Browse files
Dávid Szakállasdszakallas
Dávid Szakállas
authored andcommitted
Proof of concept
1 parent 01c675a commit 14fc194

File tree

2 files changed

+163
-1
lines changed

2 files changed

+163
-1
lines changed

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
name="sumologic-sdk",
55
version="0.1.7",
66
packages=find_packages(),
7-
install_requires=['requests>=2.2.1'],
7+
install_requires=['requests>=2.2.1', 'aiohttp>=2.3.1'],
88
# PyPI metadata
99
author="Yoway Buorn, Melchi Salins",
1010

sumologic/async.py

+162
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
from copy import copy
2+
import json
3+
import logging
4+
import asyncio
5+
import aiohttp
6+
7+
class SumoLogic(object):
8+
9+
def __init__(self, accessId, accessKey, endpoint=None, cookieFile='cookies.txt'):
10+
self.session = aiohttp.ClientSession(auth=aiohttp.BasicAuth(accessId, accessKey),
11+
read_timeout=None,
12+
headers={'content-type': 'application/json', 'accept': 'application/json'})
13+
14+
self.endpoint = None
15+
16+
async def _guard_endpoint(self):
17+
"""
18+
SumoLogic REST API endpoint changes based on the geo location of the client.
19+
For example, If the client geolocation is Australia then the REST end point is
20+
https://api.au.sumologic.com/api/v1
21+
22+
When the default REST endpoint (https://api.sumologic.com/api/v1) is used the server
23+
responds with a 401 and causes the SumoLogic class instantiation to fail and this very
24+
unhelpful message is shown 'Full authentication is required to access this resource'
25+
26+
This method makes a request to the default REST endpoint and resolves the 401 to learn
27+
the right endpoint
28+
"""
29+
self.endpoint = 'https://api.sumologic.com/api/v1'
30+
response = await self.session.get('https://api.sumologic.com/api/v1/collectors') # Dummy call to get endpoint
31+
self.endpoint = str(response.url).replace('/collectors', '') # dirty hack to sanitise URI and retain domain
32+
33+
async def delete(self, method, params=None):
34+
await self._guard_endpoint()
35+
r = await self.session.delete(self.endpoint + method, params=params)
36+
r.raise_for_status()
37+
return r
38+
39+
async def get(self, method, params=None):
40+
await self._guard_endpoint()
41+
r = await self.session.get(self.endpoint + method, params=params)
42+
if 400 <= r.status < 600:
43+
r.reason = await r.text()
44+
r.raise_for_status()
45+
return r
46+
47+
async def post(self, method, params, headers=None):
48+
await self._guard_endpoint()
49+
r = await self.session.post(self.endpoint + method, data=json.dumps(params), headers=headers)
50+
r.raise_for_status()
51+
return r
52+
53+
async def put(self, method, params, headers=None):
54+
await self._guard_endpoint()
55+
r = await self.session.put(self.endpoint + method, data=json.dumps(params), headers=headers)
56+
r.raise_for_status()
57+
return r
58+
59+
async def search(self, query, fromTime=None, toTime=None, timeZone='UTC'):
60+
params = {'q': query, 'from': fromTime, 'to': toTime, 'tz': timeZone}
61+
r = await self.get('/logs/search', params)
62+
return json.loads(await r.text())
63+
64+
async def search_job(self, query, fromTime=None, toTime=None, timeZone='UTC'):
65+
params = {'query': query, 'from': fromTime, 'to': toTime, 'timeZone': timeZone}
66+
r = await self.post('/search/jobs', params)
67+
return json.loads(await r.text())
68+
69+
async def search_job_status(self, search_job):
70+
r = await self.get('/search/jobs/' + str(search_job['id']))
71+
return json.loads(await r.text())
72+
73+
async def search_job_messages(self, search_job, limit=None, offset=0):
74+
params = {'limit': limit, 'offset': offset}
75+
r = await self.get('/search/jobs/' + str(search_job['id']) + '/messages', params)
76+
return json.loads(await r.text())
77+
78+
async def search_job_records(self, search_job, limit=None, offset=0):
79+
params = {'limit': limit, 'offset': offset}
80+
r = await self.get('/search/jobs/' + str(search_job['id']) + '/records', params)
81+
return json.loads(await r.text())
82+
83+
async def delete_search_job(self, search_job):
84+
return await self.delete('/search/jobs/' + str(search_job['id']))
85+
86+
async def collectors(self, limit=None, offset=None):
87+
params = {'limit': limit, 'offset': offset}
88+
r = await self.get('/collectors', params)
89+
return json.loads(await r.text())['collectors']
90+
91+
async def collector(self, collector_id):
92+
r = await self.get('/collectors/' + str(collector_id))
93+
return json.loads(await r.text()), r.headers['etag']
94+
95+
async def update_collector(self, collector, etag):
96+
headers = {'If-Match': etag}
97+
return await self.put('/collectors/' + str(collector['collector']['id']), collector, headers)
98+
99+
async def delete_collector(self, collector):
100+
return await self.delete('/collectors/' + str(collector['id']))
101+
102+
async def sources(self, collector_id, limit=None, offset=None):
103+
params = {'limit': limit, 'offset': offset}
104+
r = await self.get('/collectors/' + str(collector_id) + '/sources', params)
105+
return json.loads(await r.text())['sources']
106+
107+
async def source(self, collector_id, source_id):
108+
r = await self.get('/collectors/' + str(collector_id) + '/sources/' + str(source_id))
109+
return json.loads(await r.text()), r.headers['etag']
110+
111+
async def create_source(self, collector_id, source):
112+
return await self.post('/collectors/' + str(collector_id) + '/sources', source)
113+
114+
async def update_source(self, collector_id, source, etag):
115+
headers = {'If-Match': etag}
116+
return await self.put('/collectors/' + str(collector_id) + '/sources/' + str(source['source']['id']), source, headers)
117+
118+
async def delete_source(self, collector_id, source):
119+
return await self.delete('/collectors/' + str(collector_id) + '/sources/' + str(source['source']['id']))
120+
121+
async def create_content(self, path, data):
122+
r = await self.post('/content/' + path, data)
123+
return await r.text()
124+
125+
async def get_content(self, path):
126+
r = await self.get('/content/' + path)
127+
return json.loads(await r.text())
128+
129+
async def delete_content(self):
130+
r = await self.delete('/content/' + path)
131+
return json.loads(await r.text())
132+
133+
async def dashboards(self, monitors=False):
134+
params = {'monitors': monitors}
135+
r = await self.get('/dashboards', params)
136+
return json.loads(await r.text())['dashboards']
137+
138+
async def dashboard(self, dashboard_id):
139+
r = await self.get('/dashboards/' + str(dashboard_id))
140+
return json.loads(await r.text())['dashboard']
141+
142+
async def dashboard_data(self, dashboard_id):
143+
r = await self.get('/dashboards/' + str(dashboard_id) + '/data')
144+
return json.loads(await r.text())['dashboardMonitorDatas']
145+
146+
async def search_metrics(self, query, fromTime=None, toTime=None, requestedDataPoints=600, maxDataPoints=800):
147+
'''Perform a single Sumo metrics query'''
148+
def millisectimestamp(ts):
149+
'''Convert UNIX timestamp to milliseconds'''
150+
if ts > 10**12:
151+
ts = ts/(10**(len(str(ts))-13))
152+
else:
153+
ts = ts*10**(12-len(str(ts)))
154+
return int(ts)
155+
156+
params = {'query': [{"query":query, "rowId":"A"}],
157+
'startTime': millisectimestamp(fromTime),
158+
'endTime': millisectimestamp(toTime),
159+
'requestedDataPoints': requestedDataPoints,
160+
'maxDataPoints': maxDataPoints}
161+
r = await self.post('/metrics/results', params)
162+
return json.loads(await r.text())

0 commit comments

Comments
 (0)