Skip to content

Commit 528118b

Browse files
authored
Merge pull request #441 from splunk/json-resultreader
JSONResultsReader added and deprecated ResultsReader
2 parents 4a0e670 + 55d4603 commit 528118b

File tree

12 files changed

+151
-60
lines changed

12 files changed

+151
-60
lines changed

examples/follow.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def follow(job, count, items):
4242
job.refresh()
4343
continue
4444
stream = items(offset+1)
45-
for event in results.ResultsReader(stream):
45+
for event in results.JSONResultsReader(stream):
4646
pprint(event)
4747
offset = total
4848

@@ -72,10 +72,10 @@ def main():
7272

7373
if job['reportSearch'] is not None: # Is it a transforming search?
7474
count = lambda: int(job['numPreviews'])
75-
items = lambda _: job.preview()
75+
items = lambda _: job.preview(output_mode='json')
7676
else:
7777
count = lambda: int(job['eventCount'])
78-
items = lambda offset: job.events(offset=offset)
78+
items = lambda offset: job.events(offset=offset, output_mode='json')
7979

8080
try:
8181
follow(job, count, items)

examples/oneshot.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
"(e.g., export PYTHONPATH=~/splunk-sdk-python.")
3333

3434
def pretty(response):
35-
reader = results.ResultsReader(response)
35+
reader = results.JSONResultsReader(response)
3636
for result in reader:
3737
if isinstance(result, dict):
3838
pprint(result)
@@ -46,7 +46,7 @@ def main():
4646
search = opts.args[0]
4747
service = connect(**opts.kwargs)
4848
socket.setdefaulttimeout(None)
49-
response = service.jobs.oneshot(search)
49+
response = service.jobs.oneshot(search, output_mode='json')
5050

5151
pretty(response)
5252

examples/results.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,21 @@
1717
"""A script that reads XML search results from stdin and pretty-prints them
1818
back to stdout. The script is designed to be used with the search.py
1919
example, eg: './search.py "search 404" | ./results.py'"""
20-
20+
2121
from __future__ import absolute_import
2222
from pprint import pprint
2323
import sys, os
24+
2425
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
2526

2627
import splunklib.results as results
2728

29+
2830
def pretty():
29-
reader = results.ResultsReader(sys.stdin)
31+
reader = results.JSONResultsReader(sys.stdin)
3032
for event in reader:
3133
pprint(event)
3234

35+
3336
if __name__ == "__main__":
3437
pretty()

examples/search_modes.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def modes(argv):
2424
while not job.is_ready():
2525
time.sleep(0.5)
2626
pass
27-
reader = results.ResultsReader(job.events())
27+
reader = results.JSONResultsReader(job.events(output_mode='json'))
2828
# Events found: 0
2929
print('Events found with adhoc_search_level="smart": %s' % len([e for e in reader]))
3030

@@ -33,7 +33,7 @@ def modes(argv):
3333
while not job.is_ready():
3434
time.sleep(0.5)
3535
pass
36-
reader = results.ResultsReader(job.events())
36+
reader = results.JSONResultsReader(job.events(output_mode='json'))
3737
# Events found: 10
3838
print('Events found with adhoc_search_level="verbose": %s' % len([e for e in reader]))
3939

examples/stail.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from pprint import pprint
2626

2727
from splunklib.client import connect
28-
from splunklib.results import ResultsReader
28+
from splunklib.results import JSONResultsReader
2929

3030
try:
3131
import utils
@@ -49,9 +49,10 @@ def main():
4949
search=search,
5050
earliest_time="rt",
5151
latest_time="rt",
52-
search_mode="realtime")
52+
search_mode="realtime",
53+
output_mode="json")
5354

54-
for result in ResultsReader(result.body):
55+
for result in JSONResultsReader(result.body):
5556
if result is not None:
5657
print(pprint(result))
5758

splunklib/client.py

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2767,9 +2767,8 @@ def pause(self):
27672767
return self
27682768

27692769
def results(self, **query_params):
2770-
"""Returns a streaming handle to this job's search results. To get a
2771-
nice, Pythonic iterator, pass the handle to :class:`splunklib.results.ResultsReader`,
2772-
as in::
2770+
"""Returns a streaming handle to this job's search results. To get a nice, Pythonic iterator, pass the handle
2771+
to :class:`splunklib.results.JSONResultsReader` along with the query param "output_mode='json'", as in::
27732772
27742773
import splunklib.client as client
27752774
import splunklib.results as results
@@ -2778,7 +2777,7 @@ def results(self, **query_params):
27782777
job = service.jobs.create("search * | head 5")
27792778
while not job.is_done():
27802779
sleep(.2)
2781-
rr = results.ResultsReader(job.results())
2780+
rr = results.JSONResultsReader(job.results(output_mode='json'))
27822781
for result in rr:
27832782
if isinstance(result, results.Message):
27842783
# Diagnostic messages may be returned in the results
@@ -2808,19 +2807,17 @@ def results(self, **query_params):
28082807
def preview(self, **query_params):
28092808
"""Returns a streaming handle to this job's preview search results.
28102809
2811-
Unlike :class:`splunklib.results.ResultsReader`, which requires a job to
2812-
be finished to
2813-
return any results, the ``preview`` method returns any results that have
2814-
been generated so far, whether the job is running or not. The
2815-
returned search results are the raw data from the server. Pass
2816-
the handle returned to :class:`splunklib.results.ResultsReader` to get a
2817-
nice, Pythonic iterator over objects, as in::
2810+
Unlike :class:`splunklib.results.JSONResultsReader` along with the query param "output_mode='json'",
2811+
which requires a job to be finished to return any results, the ``preview`` method returns any results that
2812+
have been generated so far, whether the job is running or not. The returned search results are the raw data
2813+
from the server. Pass the handle returned to :class:`splunklib.results.JSONResultsReader` to get a nice,
2814+
Pythonic iterator over objects, as in::
28182815
28192816
import splunklib.client as client
28202817
import splunklib.results as results
28212818
service = client.connect(...)
28222819
job = service.jobs.create("search * | head 5")
2823-
rr = results.ResultsReader(job.preview())
2820+
rr = results.JSONResultsReader(job.preview(output_mode='json'))
28242821
for result in rr:
28252822
if isinstance(result, results.Message):
28262823
# Diagnostic messages may be returned in the results
@@ -2975,15 +2972,15 @@ def create(self, query, **kwargs):
29752972
return Job(self.service, sid)
29762973

29772974
def export(self, query, **params):
2978-
"""Runs a search and immediately starts streaming preview events.
2979-
This method returns a streaming handle to this job's events as an XML
2980-
document from the server. To parse this stream into usable Python objects,
2981-
pass the handle to :class:`splunklib.results.ResultsReader`::
2975+
"""Runs a search and immediately starts streaming preview events. This method returns a streaming handle to
2976+
this job's events from the server (an XML document by default, JSON when "output_mode='json'" is passed). To parse this stream into usable Python objects,
2977+
pass the handle to :class:`splunklib.results.JSONResultsReader` along with the query param
2978+
"output_mode='json'"::
29822979
29832980
import splunklib.client as client
29842981
import splunklib.results as results
29852982
service = client.connect(...)
2986-
rr = results.ResultsReader(service.jobs.export("search * | head 5"))
2983+
rr = results.JSONResultsReader(service.jobs.export("search * | head 5",output_mode='json'))
29872984
for result in rr:
29882985
if isinstance(result, results.Message):
29892986
# Diagnostic messages may be returned in the results
@@ -3032,14 +3029,14 @@ def itemmeta(self):
30323029
def oneshot(self, query, **params):
30333030
"""Run a oneshot search and returns a streaming handle to the results.
30343031
3035-
The ``InputStream`` object streams XML fragments from the server. To
3036-
parse this stream into usable Python objects,
3037-
pass the handle to :class:`splunklib.results.ResultsReader`::
3032+
The ``InputStream`` object streams XML fragments from the server. To parse this stream into usable Python
3033+
objects, pass the handle to :class:`splunklib.results.JSONResultsReader` along with the query param
3034+
"output_mode='json'" ::
30383035
30393036
import splunklib.client as client
30403037
import splunklib.results as results
30413038
service = client.connect(...)
3042-
rr = results.ResultsReader(service.jobs.oneshot("search * | head 5"))
3039+
rr = results.JSONResultsReader(service.jobs.oneshot("search * | head 5",output_mode='json'))
30433040
for result in rr:
30443041
if isinstance(result, results.Message):
30453042
# Diagnostic messages may be returned in the results

splunklib/results.py

Lines changed: 88 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,19 @@
3434

3535
from __future__ import absolute_import
3636

37-
from io import BytesIO
37+
from io import BufferedReader, BytesIO
3838

3939
from splunklib import six
40+
41+
from splunklib.six import deprecated
42+
4043
try:
4144
import xml.etree.cElementTree as et
4245
except:
4346
import xml.etree.ElementTree as et
4447

4548
from collections import OrderedDict
49+
from json import loads as json_loads
4650

4751
try:
4852
from splunklib.six.moves import cStringIO as StringIO
@@ -54,6 +58,7 @@
5458
"Message"
5559
]
5660

61+
5762
class Message(object):
5863
"""This class represents informational messages that Splunk interleaves in the results stream.
5964
@@ -64,6 +69,7 @@ class Message(object):
6469
6570
m = Message("DEBUG", "There's something in that variable...")
6671
"""
72+
6773
def __init__(self, type_, message):
6874
self.type = type_
6975
self.message = message
@@ -77,6 +83,7 @@ def __eq__(self, other):
7783
def __hash__(self):
7884
return hash((self.type, self.message))
7985

86+
8087
class _ConcatenatedStream(object):
8188
"""Lazily concatenate zero or more streams into a stream.
8289
@@ -89,6 +96,7 @@ class _ConcatenatedStream(object):
8996
s = _ConcatenatedStream(StringIO("abc"), StringIO("def"))
9097
assert s.read() == "abcdef"
9198
"""
99+
92100
def __init__(self, *streams):
93101
self.streams = list(streams)
94102

@@ -107,6 +115,7 @@ def read(self, n=None):
107115
del self.streams[0]
108116
return response
109117

118+
110119
class _XMLDTDFilter(object):
111120
"""Lazily remove all XML DTDs from a stream.
112121
@@ -120,6 +129,7 @@ class _XMLDTDFilter(object):
120129
s = _XMLDTDFilter("<?xml abcd><element><?xml ...></element>")
121130
assert s.read() == "<element></element>"
122131
"""
132+
123133
def __init__(self, stream):
124134
self.stream = stream
125135

@@ -150,6 +160,8 @@ def read(self, n=None):
150160
n -= 1
151161
return response
152162

163+
164+
@deprecated("Use the JSONResultsReader function instead in conjuction with the 'output_mode' query param set to 'json'")
153165
class ResultsReader(object):
154166
"""This class returns dictionaries and Splunk messages from an XML results
155167
stream.
@@ -177,6 +189,7 @@ class ResultsReader(object):
177189
print "Message: %s" % result
178190
print "is_preview = %s " % reader.is_preview
179191
"""
192+
180193
# Be sure to update the docstrings of client.Jobs.oneshot,
181194
# client.Job.results_preview and client.Job.results to match any
182195
# changes made to ResultsReader.
@@ -257,16 +270,16 @@ def _parse_results(self, stream):
257270
# So we'll define it here
258271

259272
def __itertext(self):
260-
tag = self.tag
261-
if not isinstance(tag, six.string_types) and tag is not None:
262-
return
263-
if self.text:
264-
yield self.text
265-
for e in self:
266-
for s in __itertext(e):
267-
yield s
268-
if e.tail:
269-
yield e.tail
273+
tag = self.tag
274+
if not isinstance(tag, six.string_types) and tag is not None:
275+
return
276+
if self.text:
277+
yield self.text
278+
for e in self:
279+
for s in __itertext(e):
280+
yield s
281+
if e.tail:
282+
yield e.tail
270283

271284
text = "".join(__itertext(elem))
272285
values.append(text)
@@ -288,5 +301,69 @@ def __itertext(self):
288301
raise
289302

290303

304+
class JSONResultsReader(object):
    """This class returns dictionaries and Splunk messages from a JSON results
    stream.

    ``JSONResultsReader`` is iterable, and returns a ``dict`` for results, or a
    :class:`Message` object for Splunk messages. This class has one field,
    ``is_preview``, which starts as ``None`` and is updated from the
    ``"preview"`` key of each JSON document as it is parsed.

    The stream is consumed lazily, one line at a time: each non-blank line is
    parsed as a separate JSON document, so results become available as soon
    as the line carrying them has been read.

    This class has no network activity other than what is implicit in the
    stream it operates on.

    :param `stream`: The stream to read from. It is wrapped in an
        ``io.BufferedReader``, so it must be acceptable to that constructor.

    **Example**::

        import results
        response = ... # the body of an HTTP response
        reader = results.JSONResultsReader(response)
        for result in reader:
            if isinstance(result, dict):
                print("Result: %s" % result)
            elif isinstance(result, results.Message):
                print("Message: %s" % result)
        print("is_preview = %s " % reader.is_preview)
    """

    # Be sure to update the docstrings of client.Jobs.oneshot,
    # client.Job.results_preview and client.Job.results to match any
    # changes made to JSONResultsReader.
    #
    # This wouldn't be a class, just the _parse_results function below,
    # except that you cannot get the current generator inside the
    # function creating that generator. Thus it's all wrapped up for
    # the sake of one field.
    def __init__(self, stream):
        # The search/jobs/exports endpoint, when run with
        # earliest_time=rt and latest_time=rt, output_mode=json, streams a
        # sequence of JSON documents, each containing a result, as opposed
        # to one results element containing lots of results.
        stream = BufferedReader(stream)
        self.is_preview = None
        self._gen = self._parse_results(stream)

    def __iter__(self):
        return self

    def next(self):
        """Return the next result ``dict`` or :class:`Message`."""
        return next(self._gen)

    # Python 3 iterator protocol alias.
    __next__ = next

    def _parse_results(self, stream):
        """Parse results and messages out of *stream*.

        Generator yielding, for every non-blank line of *stream*: one
        :class:`Message` per entry of a non-empty ``"messages"`` list, then
        the value of ``"result"`` if present, then each item of
        ``"results"`` if present.
        """
        # Iterate the stream directly instead of calling readlines():
        # readlines() blocks until EOF and buffers the whole response, which
        # never completes for real-time searches.
        for line in stream:
            stripped = line.strip()
            if not stripped:
                continue
            parsed = json_loads(stripped)
            if "preview" in parsed:
                self.is_preview = parsed["preview"]
            if "messages" in parsed and parsed["messages"]:
                for message in parsed["messages"]:
                    msg_type = message.get("type", "Unknown Message Type")
                    text = message.get("text")
                    yield Message(msg_type, text)
            if "result" in parsed:
                yield parsed["result"]
            if "results" in parsed:
                for result in parsed["results"]:
                    yield result

splunklib/six.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -978,3 +978,16 @@ def python_2_unicode_compatible(klass):
978978
del i, importer
979979
# Finally, add the importer to the meta path import hook.
980980
sys.meta_path.append(_importer)
981+
982+
import warnings
983+
984+
def deprecated(message):
    """Build a decorator that marks a callable as deprecated.

    Every call to the decorated callable emits a ``DeprecationWarning`` with
    the text ``"<name> is a deprecated function. <message>"`` and then
    forwards all arguments to the original callable, returning its result.

    :param message: Text appended to the warning (typically what to use
        instead).
    :return: A decorator that takes the callable to wrap.
    """
    # Local import so this block does not rely on a module-level import that
    # may not be present.
    import functools

    def deprecated_decorator(func):
        # wraps() keeps __name__/__doc__/__module__ of the original callable
        # so introspection and generated docs still describe it.
        @functools.wraps(func)
        def deprecated_func(*args, **kwargs):
            # Enable display of DeprecationWarning *before* warning; doing it
            # afterwards lets the default filters drop the first warning.
            warnings.simplefilter('default', DeprecationWarning)
            warnings.warn(
                "{} is a deprecated function. {}".format(func.__name__, message),
                category=DeprecationWarning,
                # stacklevel=2 attributes the warning to the caller.
                stacklevel=2)
            return func(*args, **kwargs)
        return deprecated_func
    return deprecated_decorator

0 commit comments

Comments
 (0)