-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpeakdetect.py
More file actions
111 lines (92 loc) · 4.16 KB
/
Copy pathpeakdetect.py
File metadata and controls
111 lines (92 loc) · 4.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/python3
import sys
from multiprocessing import Queue
from ctypes import POINTER, c_ubyte, c_void_p, c_ulong, cast
# From https://github.com/Valodim/python-pulseaudio
from pulseaudio.lib_pulseaudio import *
# Name of the PulseAudio sink to meter -- edit to match your sink.
SINK_NAME = 'alsa_output.platform-soc_audio.analog-stereo'

# Sample rate requested for the peak stream (handed to PeakMonitor as `rate`).
METER_RATE = 344

# Largest peak value produced once the 128 offset has been subtracted from
# an unsigned 8-bit sample.
MAX_SAMPLE_VALUE = 127

# Number of bits each sample is shifted right before it is drawn.
DISPLAY_SCALE = 2

# Width, in characters, of the widest bar that can be drawn.
MAX_SPACES = MAX_SAMPLE_VALUE // (2 ** DISPLAY_SCALE)
class PeakMonitor(object):
    """Stream peak-level samples from the monitor source of a PulseAudio sink.

    Creating an instance starts a PulseAudio threaded mainloop and connects a
    context to the server. When the context becomes ready the available
    sinks are listed (and printed); if one of them is named ``sink_name`` a
    peak-detect record stream is opened on its monitor source. Iterating
    over the instance yields the peak samples as they arrive, blocking until
    the next one is available.
    """

    def __init__(self, sink_name, rate):
        """Start the PulseAudio mainloop and begin connecting to the server.

        Args:
            sink_name: Name of the sink to monitor, as ``str`` or ``bytes``.
            rate: Sample rate requested for the peak-detect stream.
        """
        self.sink_name = sink_name
        self.rate = rate

        # Wrap callback methods in appropriate ctypes function instances so
        # that the PulseAudio C API can call them. They are stored on self
        # so the wrappers stay referenced for as long as this object lives.
        self._context_notify_cb = pa_context_notify_cb_t(self.context_notify_cb)
        self._sink_info_cb = pa_sink_info_cb_t(self.sink_info_cb)
        self._stream_read_cb = pa_stream_request_cb_t(self.stream_read_cb)

        # stream_read_cb() puts peak samples into this Queue instance
        self._samples = Queue()

        # Create the mainloop thread and set our context_notify_cb method to
        # be called when there are updates relating to the connection to
        # PulseAudio. The client name is a bytes literal because the C API
        # takes a char* and ctypes does not accept str for that on Python 3.
        _mainloop = pa_threaded_mainloop_new()
        _mainloop_api = pa_threaded_mainloop_get_api(_mainloop)
        context = pa_context_new(_mainloop_api, b'peak_demo')
        pa_context_set_state_callback(context, self._context_notify_cb, None)
        pa_context_connect(context, None, 0, None)
        pa_threaded_mainloop_start(_mainloop)

    @staticmethod
    def _as_bytes(value):
        """Return ``value`` as bytes, UTF-8 encoding it when it is text."""
        if isinstance(value, bytes):
            return value
        return value.encode('utf-8')

    @staticmethod
    def _as_text(value):
        """Return ``value`` decoded for display when it is bytes, else unchanged."""
        if isinstance(value, bytes):
            return value.decode('utf-8', 'replace')
        return value

    def __iter__(self):
        """Yield peak samples forever, blocking until each one is available."""
        while True:
            yield self._samples.get()

    def context_notify_cb(self, context, _):
        """Handle a context state change reported by PulseAudio.

        On PA_CONTEXT_READY the sink list is requested, with sink_info_cb
        receiving the entries. Failure and termination are only reported.
        """
        state = pa_context_get_state(context)

        if state == PA_CONTEXT_READY:
            print("Pulseaudio connection ready...")
            # Connected to Pulseaudio. Now request that sink_info_cb
            # be called with information about the available sinks.
            o = pa_context_get_sink_info_list(context, self._sink_info_cb, None)
            pa_operation_unref(o)
        elif state == PA_CONTEXT_FAILED:
            print("Connection failed")
        elif state == PA_CONTEXT_TERMINATED:
            print("Connection terminated")

    def sink_info_cb(self, context, sink_info_p, _, __):
        """Print one sink's details and start recording if it is the target.

        Called with a null ``sink_info_p`` as well, in which case nothing is
        done.
        """
        if not sink_info_p:
            return

        sink_info = sink_info_p.contents
        print('-' * 60)
        print('index:', sink_info.index)
        print('name:', self._as_text(sink_info.name))
        print('description:', self._as_text(sink_info.description))

        # Names read from the C struct are bytes on Python 3, so compare in
        # bytes; a str sink name would otherwise never match.
        if sink_info.name == self._as_bytes(self.sink_name):
            # Found the sink we want to monitor for peak levels.
            # Tell PA to call stream_read_cb with peak samples.
            print()
            print('setting up peak recording using',
                  self._as_text(sink_info.monitor_source_name))
            print()
            samplespec = pa_sample_spec()
            samplespec.channels = 1
            samplespec.format = PA_SAMPLE_U8
            samplespec.rate = self.rate

            pa_stream = pa_stream_new(context, b"peak detect demo",
                                      samplespec, None)
            pa_stream_set_read_callback(pa_stream,
                                        self._stream_read_cb,
                                        sink_info.index)
            pa_stream_connect_record(pa_stream,
                                     sink_info.monitor_source_name,
                                     None,
                                     PA_STREAM_PEAK_DETECT)

    def stream_read_cb(self, stream, length, index_incr):
        """Queue the peak samples currently readable from ``stream``.

        Args:
            stream: The record stream that has data available.
            length: Number of readable bytes reported by PulseAudio.
            index_incr: Value registered with the read callback (unused).
        """
        data = c_void_p()
        nbytes = c_ulong(length)
        pa_stream_peek(stream, data, nbytes)
        # pa_stream_peek writes back the size of the fragment it returned,
        # which is what bounds the buffer -- not the callback's `length`.
        count = nbytes.value

        # A null data pointer means an empty buffer (count == 0) or a hole
        # in the stream (count > 0); there is nothing to read in either case.
        if data.value:
            samples = cast(data, POINTER(c_ubyte))
            for i in range(count):
                # When PA_SAMPLE_U8 is used, samples values range from 128
                # to 255 because the underlying audio data is signed but
                # it doesn't make sense to return signed peaks.
                self._samples.put(samples[i] - 128)

        # Only drop when something (data or a hole) was actually peeked.
        if count:
            pa_stream_drop(stream)
def main():
    """Draw a live text peak meter for SINK_NAME on standard output.

    Each sample from the monitor is shifted right by DISPLAY_SCALE bits and
    drawn as a run of '>' characters, padded with spaces up to MAX_SPACES so
    that a shorter bar overwrites the remains of a longer one. The loop only
    ends when the process is interrupted.
    """
    monitor = PeakMonitor(SINK_NAME, METER_RATE)
    for sample in monitor:
        sample = sample >> DISPLAY_SCALE
        bar = '>' * sample
        spaces = ' ' * (MAX_SPACES - sample)
        # Write without a trailing newline: the '\r' moves the cursor back to
        # the start of the line so the next sample redraws the meter in
        # place. print() would add '\n' and scroll the terminal instead.
        sys.stdout.write(' %3d %s%s\r' % (sample, bar, spaces))
        sys.stdout.flush()


# Run the meter only when executed as a script, not when imported.
if __name__ == '__main__':
    main()