This repository has been archived by the owner on Jul 29, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
musicsync.py
160 lines (134 loc) · 5.62 KB
/
musicsync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import sys
import time
from Queue import Queue
from ctypes import POINTER, c_ubyte, c_void_p, c_ulong, cast
from dotstar import Adafruit_DotStar
# Module-level LED strip setup. These statements run as soon as the module is
# loaded, so the strip is initialised before any function below is called.
numPixels = 300 # Number of LEDs in strip
strip = Adafruit_DotStar(numPixels)
strip.begin() # Initialize pins for output
# NOTE(review): this line was previously commented "Limit brightness to ~1/4
# duty cycle", which does not match a value of 240 if setBrightness() takes a
# 0-255 scale (presumably it does) -- confirm the intended brightness.
strip.setBrightness(240)
# From https://github.com/Valodim/python-pulseaudio
from pulseaudio.lib_pulseaudio import *
# Name of the PulseAudio sink to monitor; PeakMonitor.sink_info_cb compares it
# against each listed sink's name. Edit to match your sink.
SINK_NAME = 'alsa_output.0.analog-stereo'
# Passed to PeakMonitor as ``rate`` and used as the sample rate of the
# peak-detect record stream.
METER_RATE = 344
# Largest value PeakMonitor.stream_read_cb can queue: it subtracts 128 from
# each unsigned byte sample (255 - 128 = 127).
MAX_SAMPLE_VALUE = 127
# Number of bits main() shifts each sample to the right before scaling it.
DISPLAY_SCALE = 2
# 127 >> 2 == 31. Not referenced anywhere else in the visible code.
MAX_SPACES = MAX_SAMPLE_VALUE >> DISPLAY_SCALE
def rainbowCycle(wait):
    """Animate a rainbow spread across the whole strip.

    Performs 256 * 5 animation steps. On each step every pixel is given the
    colour returned by ``Wheel`` for its position along the strip shifted by
    the step number (wrapped to 0-255), the strip is refreshed, and the
    function sleeps for ``wait`` seconds.

    Args:
        wait: pause between animation steps, in seconds (passed to
            ``time.sleep``).
    """
    total_steps = 256 * 5
    for step in range(total_steps):
        for pixel in range(numPixels):
            # Integer position of this pixel on the 0-255 colour wheel,
            # rotated by the current step.
            wheel_pos = ((pixel * 256 // numPixels) + step) & 255
            strip.setPixelColor(pixel, Wheel(wheel_pos))
        strip.show()
        time.sleep(wait)
def Wheel(WheelPos):
    """Return the ``strip.Color`` value for a position on a colour wheel.

    The position is first mirrored (``255 - WheelPos``) and then falls into
    one of three bands of width 85. Within a band one colour component ramps
    up in steps of 3 while another ramps down from 255, and the third stays
    at 0.

    Args:
        WheelPos: wheel position; rainbowCycle passes values in 0-255.

    Returns:
        Whatever ``strip.Color`` returns for the three computed components.
    """
    mirrored = 255 - WheelPos
    if mirrored >= 170:
        ramp = (mirrored - 170) * 3
        return strip.Color(ramp, 255 - ramp, 0)
    if mirrored >= 85:
        ramp = (mirrored - 85) * 3
        return strip.Color(0, ramp, 255 - ramp)
    ramp = mirrored * 3
    return strip.Color(255 - ramp, 0, ramp)
class PeakMonitor(object):
    """Iterable source of peak audio levels from a PulseAudio sink monitor.

    Creating an instance starts a PulseAudio threaded mainloop and connects a
    context. When the context becomes ready the available sinks are listed;
    when the sink named ``sink_name`` is seen, a peak-detect record stream is
    attached to its monitor source. Samples delivered by that stream are put
    on an internal queue, and iterating over the instance yields them one at
    a time, blocking until the next one is available.
    """

    def __init__(self, sink_name, rate):
        """Start the PulseAudio mainloop and begin connecting.

        Args:
            sink_name: name of the sink whose monitor source is recorded.
            rate: sample rate requested for the peak-detect stream.
        """
        self.sink_name = sink_name
        self.rate = rate

        # Wrap callback methods in the appropriate ctypes function types so
        # that the PulseAudio C API can call them. They are stored on the
        # instance so the wrappers stay referenced while PulseAudio uses them.
        self._context_notify_cb = pa_context_notify_cb_t(self.context_notify_cb)
        self._sink_info_cb = pa_sink_info_cb_t(self.sink_info_cb)
        self._stream_read_cb = pa_stream_request_cb_t(self.stream_read_cb)

        # stream_read_cb() puts peak samples into this Queue instance.
        self._samples = Queue()

        # Create the mainloop thread and register context_notify_cb to be
        # called on every change of the connection state.
        _mainloop = pa_threaded_mainloop_new()
        _mainloop_api = pa_threaded_mainloop_get_api(_mainloop)
        context = pa_context_new(_mainloop_api, 'peak_demo')
        pa_context_set_state_callback(context, self._context_notify_cb, None)
        pa_context_connect(context, None, 0, None)
        pa_threaded_mainloop_start(_mainloop)

    def __iter__(self):
        """Yield queued peak samples forever, blocking while none is queued."""
        while True:
            yield self._samples.get()

    def context_notify_cb(self, context, _):
        """Context state callback: report the state and list sinks once ready.

        Args:
            context: the PulseAudio context whose state changed.
            _: user data pointer (unused).
        """
        state = pa_context_get_state(context)
        if state == PA_CONTEXT_READY:
            print("Pulseaudio connection ready...")
            # Connected to PulseAudio. Now request that sink_info_cb be
            # called with information about the available sinks.
            operation = pa_context_get_sink_info_list(
                context, self._sink_info_cb, None)
            pa_operation_unref(operation)
        elif state == PA_CONTEXT_FAILED:
            print("Connection failed")
        elif state == PA_CONTEXT_TERMINATED:
            print("Connection terminated")

    def sink_info_cb(self, context, sink_info_p, _, __):
        """Sink list callback: print each sink and hook up the wanted one.

        Args:
            context: the PulseAudio context the listing belongs to.
            sink_info_p: pointer to the sink description; a false (NULL)
                value is ignored.
            _: unused callback argument.
            __: unused callback argument.
        """
        if not sink_info_p:
            return

        sink_info = sink_info_p.contents
        # Single-argument print(...) produces the same output as the
        # Python 2 print statement and is also valid Python 3 syntax.
        print('-' * 60)
        print('index: %s' % (sink_info.index,))
        print('name: %s' % (sink_info.name,))
        print('description: %s' % (sink_info.description,))

        if sink_info.name != self.sink_name:
            return

        # Found the sink we want to monitor for peak levels.
        # Tell PulseAudio to call stream_read_cb with peak samples.
        print('')
        print('setting up peak recording using %s'
              % (sink_info.monitor_source_name,))
        print('')

        samplespec = pa_sample_spec()
        samplespec.channels = 1
        samplespec.format = PA_SAMPLE_U8
        samplespec.rate = self.rate

        stream = pa_stream_new(context, "peak detect demo", samplespec, None)
        pa_stream_set_read_callback(stream,
                                    self._stream_read_cb,
                                    sink_info.index)
        pa_stream_connect_record(stream,
                                 sink_info.monitor_source_name,
                                 None,
                                 PA_STREAM_PEAK_DETECT)

    def stream_read_cb(self, stream, length, index_incr):
        """Stream read callback: queue every peak sample that is available.

        Args:
            stream: the record stream that has data ready.
            length: number of readable bytes reported by PulseAudio.
            index_incr: user data registered with the callback (unused).
        """
        data = c_void_p()
        # Keep a handle on the size argument: pa_stream_peek() overwrites it
        # with the size of the fragment it actually returned, which is the
        # only number of bytes that is safe to read from ``data``.
        nbytes = c_ulong(length)
        pa_stream_peek(stream, data, nbytes)
        available = nbytes.value

        if not available:
            # Empty buffer: nothing to read, and pa_stream_drop() must not
            # be called in this case.
            return

        if data.value:
            samples = cast(data, POINTER(c_ubyte))
            for i in range(available):
                # When PA_SAMPLE_U8 is used, sample values range from 128
                # to 255 because the underlying audio data is signed but
                # it doesn't make sense to return signed peaks.
                self._samples.put(samples[i] - 128)
        # else: a NULL data pointer with a non-zero size is a hole in the
        # stream; there is nothing to read but it still has to be dropped.

        pa_stream_drop(stream)
def map(value, from_min, from_max, to_min, to_max):
    """Linearly rescale ``value`` from one range onto another.

    ``from_min`` maps to ``to_min`` and ``from_max`` maps to ``to_max``;
    values outside the source range are extrapolated, not clamped.

    Note: this function shadows the builtin ``map`` within this module; the
    name is kept so existing callers keep working.

    Args:
        value: the number to rescale.
        from_min: lower bound of the source range.
        from_max: upper bound of the source range.
        to_min: lower bound of the target range.
        to_max: upper bound of the target range.

    Returns:
        The rescaled value.

    Raises:
        ZeroDivisionError: if ``from_min`` equals ``from_max``.
    """
    # float() forces true division: with integer arguments under Python 2 a
    # plain "/" truncates the ratio (to 0 for any in-range value below
    # from_max).
    ratio = float(value - from_min) / (from_max - from_min)
    return (to_max - to_min) * ratio + to_min
def main():
    """Show the monitored sink's peak level as a mirrored bar on the strip.

    Runs until interrupted. For every sample produced by ``PeakMonitor`` the
    value is shifted right by ``DISPLAY_SCALE`` and scaled to a bar level;
    the strip is then redrawn symmetrically around its middle pixel using
    three colours:

    * offsets below ``level * 1.5`` get the bar colour,
    * offsets below ``level + 66`` get the margin colour,
    * all remaining offsets get the background colour.

    The level is also echoed to stdout on a single, continually rewritten
    line.
    """
    middle = numPixels // 2

    # The three colours never change, so build them once instead of once
    # per pixel per sample.
    bar_color = strip.Color(0, 255, 0)
    margin_color = strip.Color(255, 0, 0)
    background_color = strip.Color(0, 0, 255)

    monitor = PeakMonitor(SINK_NAME, METER_RATE)
    for sample in monitor:
        level = (float(sample >> DISPLAY_SCALE) / 120) * 280
        bar_limit = level * 1.5
        margin_limit = level + 66

        for offset in range(middle + 1):
            if offset < bar_limit:
                color = bar_color
            elif offset < margin_limit:
                color = margin_color
            else:
                color = background_color

            upper = middle + offset
            # With an even pixel count the last offset would address pixel
            # ``numPixels``, one past the end of the strip; skip it.
            if upper < numPixels:
                strip.setPixelColor(upper, color)
            strip.setPixelColor(middle - offset, color)

        strip.show()
        # Same output as the Python 2 statement ``print '%3d\r' % x,``: the
        # text ends in "\r", so no separator or newline is emitted.
        sys.stdout.write('%3d\r' % level)
        sys.stdout.flush()


# Start the visualiser only when the file is executed as a script.
if __name__ == '__main__':
    main()