forked from klauer/caproto_ios
-
Notifications
You must be signed in to change notification settings - Fork 0
/
location_ioc.py
executable file
·48 lines (43 loc) · 1.97 KB
/
location_ioc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#!/usr/bin/env python3
import caproto
from caproto.server import pvproperty, PVGroup, ioc_arg_parser, run
import time
import location
class LocationIOC(PVGroup):
    """IOC that publishes data from the ``location`` module as PVs.

    Three PVs are exposed:

    * ``authorized`` -- ``[1]`` while ``location.is_authorized()`` is true,
      ``[0]`` otherwise.
    * ``coordinates`` -- ``[latitude, longitude]`` of the latest fix.
    * ``location`` -- ``str()`` of the ``location.reverse_geocode()`` result
      for those coordinates, stored as a character array.

    NOTE: inside this class body the name ``location`` is rebound to the
    pvproperty below.  Method bodies do not see class-scope names, so
    ``location`` inside the methods still refers to the imported module,
    while ``self.location`` is the PV.
    """

    authorized = pvproperty(value=[0],  # enum TODO
                            doc='Authorized access to location data')
    coordinates = pvproperty(value=[0., 0.],  # enum TODO
                             doc='Latitude, longitude')
    location = pvproperty(value=['*' * 1000],
                          doc='Street address information',
                          max_length=1000,
                          dtype=caproto.ChannelType.CHAR)

    @authorized.startup
    async def authorized(self, instance, async_lib):
        """Poll the location service every 5 seconds and update the PVs.

        Runs as the startup hook of the ``authorized`` PV.  Location updates
        are started before the loop and stopped again when the loop exits
        for any reason.

        Parameters
        ----------
        instance
            The channel data instance this hook is attached to (unused).
        async_lib
            Async library wrapper handed in by the server; its
            ``library.sleep`` is used to wait between polls.
        """
        location.start_updates()
        try:
            while True:
                timestamp = time.time()
                if not location.is_authorized():
                    await self.authorized.write(value=[0],
                                                timestamp=timestamp)
                else:
                    info = location.get_location()
                    await self.authorized.write(value=[1],
                                                timestamp=timestamp)
                    # get_location() may yield None while no fix has been
                    # received yet (e.g. right after start_updates()).
                    # Dereferencing it would raise AttributeError and end
                    # this polling task for good, so skip this cycle.
                    if info is not None:
                        await self._publish_fix(info, timestamp)
                await async_lib.library.sleep(5.0)
        finally:
            location.stop_updates()

    async def _publish_fix(self, info, fallback_timestamp):
        """Write one location fix to the ``coordinates`` and ``location`` PVs.

        Parameters
        ----------
        info : dict
            Location dictionary as returned by ``location.get_location()``.
            Missing 'latitude' / 'longitude' entries fall back to 0.0.
        fallback_timestamp : float
            Timestamp to use when ``info`` has no 'timestamp' entry.
        """
        latitude = info.get('latitude', 0.)
        longitude = info.get('longitude', 0.)
        timestamp = info.get('timestamp', fallback_timestamp)
        await self.coordinates.write(value=[latitude, longitude],
                                     timestamp=timestamp)

        address = location.reverse_geocode(dict(latitude=latitude,
                                                longitude=longitude))
        # The geocoder result is published verbatim via str().
        await self.location.write(value=[str(address)],
                                  timestamp=timestamp)
if __name__ == '__main__':
    # Script entry point: parse the IOC command-line options, build the
    # PV group and serve its PV database.
    parsed_options = ioc_arg_parser(desc="iOS location data IOC",
                                    default_prefix='location:')
    group_kwargs, server_kwargs = parsed_options
    location_ioc = LocationIOC(**group_kwargs)
    run(location_ioc.pvdb, **server_kwargs)