forked from oldbridge/bluetooth-phone
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathringer.py
93 lines (83 loc) · 3.4 KB
/
ringer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import dbus
import config
from threading import Thread
import RPi.GPIO as GPIO
import dbus.mainloop.glib
from gi.repository import GLib
import time
class Ringer(Thread):
    """
    Thread to run the hardware ringer.

    Uses settings from config for the enable pin and the ringer frequency.
    The ring pattern is played for as long as ``is_ringing`` is True; other
    code starts and stops the ringing by setting that flag and ends the
    thread by setting ``finished``.
    """

    # Seconds to sleep between checks of ``is_ringing`` while idle, so the
    # thread does not spin at full CPU while waiting for a ring request.
    IDLE_POLL_SECONDS = 0.05

    def __init__(self, ringer_pin, sequence):
        """
        Configure the GPIO pins and the PWM channel. Does not start the thread.

        :param ringer_pin: BCM number of the pin that carries the PWM ring signal.
        :param sequence: durations in seconds (passed to time.sleep), one per
            step of the ring pattern; steps alternate pulse on / pulse off.
        """
        Thread.__init__(self)
        GPIO.setmode(GPIO.BCM)
        self.pin = ringer_pin
        GPIO.setup(self.pin, GPIO.OUT)
        # Set the PWM GPIO to control the ringer; a duty cycle of 0 keeps it silent.
        self.ringer = GPIO.PWM(self.pin, config.RINGER_FREQUENCY)
        self.ringer.start(0)
        # Configure the enable pin and leave it low.
        GPIO.setup(config.RINGER_ENABLE_PIN, GPIO.OUT)
        GPIO.output(config.RINGER_ENABLE_PIN, 0)
        self.is_enabled = False
        self.seq = sequence
        self.is_ringing = False  # Gettable/Settable flag to start/stop ringing
        self.finished = False  # Set to True to make run() return

    def run(self):
        """
        Perpetual loop. Because self.is_ringing can be changed asynchronously it is
        important to check that the status hasn't changed before continuing with the
        sequence of rings.
        """
        ringing = False  # Phase of the pattern: True after a "pulse on" step
        print("Ringer Thread Started")
        while not self.finished:
            if not self.is_ringing:
                if ringing:
                    # Ringing was stopped while the pattern was in its on
                    # phase: silence the PWM and restart the phase so the next
                    # ring begins with a pulse on.
                    self.ringer.ChangeDutyCycle(0)
                    ringing = False
                # Idle: wait a little instead of busy-looping.
                time.sleep(self.IDLE_POLL_SECONDS)
                continue
            # Iterating the sequence directly works for numpy arrays as well
            # as plain lists and tuples.
            for duration in self.seq:
                if ringing:
                    print("pulse off")
                    self.ringer.ChangeDutyCycle(0)
                    ringing = False
                else:
                    ringing = True
                    print("pulse on")
                    if self.is_ringing:
                        # Note there is a bug in the RPi.GPIO module that means
                        # you can not repeatedly turn the ringer on and off.
                        # Changing the duty cycle is the workaround.
                        self.ringer.ChangeDutyCycle(50)
                # Skip the wait once ringing has been cancelled so the rest of
                # the pattern is flushed without delay.
                if self.is_ringing:
                    time.sleep(duration)
class RingerManager(object):
    """
    Management object for controlling the ringer via the DBus.
    """

    def __init__(self):
        """
        Connect to the system bus, create the ringer thread and subscribe to
        the ring signal.
        """
        self.bus = dbus.SystemBus()
        self.finished = False
        self.is_ringing = False
        # Create the ringer before registering the signal handler so that
        # self._ringer is always in scope when _control_ringer is invoked.
        self._ringer = Ringer(config.RINGER_PIN, config.RINGER_PATTERN)
        self._setup_listeners()
        # Start the thread only once the listeners are connected, so a failure
        # while connecting to the bus does not leave a running thread behind.
        self._ringer.start()

    def _setup_listeners(self):
        """Subscribe _control_ringer to the 'ring' signal of the phone status interface."""
        print("create ringer listeners")
        self.ring_service_interface = dbus.Interface(self.bus.get_object('org.frank', '/'), "phone.status")
        self.ring_service_interface.connect_to_signal('ring', self._control_ringer)

    def _control_ringer(self, value):
        """
        Handler for the ring signal. Sets the flag (self._ringer.is_ringing) that
        starts or stops the loop in the Ringer thread and switches the ringer
        enable pin accordingly.

        :param value: signal argument; config.RING_START starts the ringing,
            any other value stops it.
        """
        print(f"Ringing Controller {value}")
        if value == config.RING_START:
            print("dbus ring start signal received")
            self._ringer.is_ringing = True
            print("Apply power to ringer")
            GPIO.output(config.RINGER_ENABLE_PIN, 1)
        else:
            print("dbus ringer stop signal received")
            self._ringer.is_ringing = False
            GPIO.output(config.RINGER_ENABLE_PIN, 0)
            print("Remove power from ringer")