forked from ibm-watson-iot/gateway-hue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gateway-hue.py
125 lines (93 loc) · 3.95 KB
/
gateway-hue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import sys
import json
import time
import ibmiotf.application
from ibmiotf import APIException
import requests
import signal
import argparse
import logging
from logging.handlers import RotatingFileHandler
class Server():
def __init__(self, args):
# Setup logging - Generate a default rotating file log handler and stream handler
fhFormatter = logging.Formatter('%(asctime)-25s %(levelname)-7s %(message)s')
sh = logging.StreamHandler()
sh.setFormatter(fhFormatter)
self.logger = logging.getLogger("server")
self.logger.addHandler(sh)
self.logger.setLevel(logging.DEBUG)
self.options = ibmiotf.application.ParseConfigFile(args.config)
# Init IOTF client
self.client = ibmiotf.application.Client(self.options, logHandlers=[sh])
# Internal State
self.knownDeviceTypes = {}
self.knownDevices = {}
# Init Hue properties
self.hueAddress = args.ip
self.hueUsername = args.username
def _poll(self):
# See: http://www.developers.meethue.com/documentation/lights-api
r = requests.get("http://%s/api/%s/lights" % (self.hueAddress, self.hueUsername))
lights = r.json();
if isinstance(lights, list) and "error" in lights[0]:
self.logger.critical("Error querying Hue Hub (%s) with username %s: %s" % (self.hueAddress, self.hueUsername, lights[0]['error']['description']))
else:
for id, light in lights.items():
# Properties of the IoTF Device Type
typeId = light['modelid'].replace(" ", "_")
typeDescription = light['type']
typeManufacturer = light['manufacturername']
typeModel = light['modelid']
# Properties of the IoTF Device
deviceId = light['uniqueid'].replace(":", "-")
deviceSwVersion = light['swversion']
deviceManufacturer = light['manufacturername']
deviceModel = light['modelid']
state = light['state']
# Register the device type if we need to
if typeId not in self.knownDeviceTypes:
try:
deviceType = self.client.api.getDeviceType(typeId)
self.knownDeviceTypes[typeId] = deviceType
except APIException as e:
self.logger.debug("ERROR [" + str(e.httpCode) + "] " + e.message)
self.logger.info("Registering new device type: %s" % (typeId))
deviceTypeInfo = { "manufacturer": typeManufacturer, "model": typeModel, "description": typeDescription, }
deviceType = self.client.api.addDeviceType(typeId=typeId, description=typeDescription, deviceInfo=deviceTypeInfo)
self.knownDeviceTypes[typeId] = deviceType
# Register the device if we need to
if deviceId not in self.knownDevices:
try:
device = self.client.api.getDevice(typeId, deviceId)
self.knownDevices[deviceId] = device
except APIException as e:
self.logger.debug("ERROR [" + str(e.httpCode) + "] " + e.message)
self.logger.info("Registering new device: %s:%s" % (typeId, deviceId))
deviceMetadata = {"customField1": "customValue1", "customField2": "customValue2"}
deviceInfo = {"manufacturer": deviceManufacturer, "model": deviceModel, "fwVersion" : deviceSwVersion}
device = self.client.api.registerDevice(typeId, deviceId, authToken=None, deviceInfo=deviceInfo, location=None, metadata=deviceMetadata)
self.knownDevices[deviceId] = device
# Publish the current state of the light
self.client.publishEvent(typeId, deviceId, "state", "json", state)
def start(self):
self.client.connect()
while True:
self._poll()
time.sleep(60)
def stop(self):
self.client.disconnect()
def interruptHandler(signal, frame):
server.stop()
sys.exit(0)
if __name__ == "__main__":
signal.signal(signal.SIGINT, interruptHandler)
# Initialize the properties we need
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config', required=False)
parser.add_argument('-i', '--ip', required=True)
parser.add_argument('-u', '--username', required=True)
args, unknown = parser.parse_known_args()
print("(Press Ctrl+C to disconnect)")
server = Server(args)
server.start()