Skip to content

Commit

Permalink
Merge pull request #24 from NTIA/analog_inputs
Browse files Browse the repository at this point in the history
Analog inputs and add sensors, digital inputs, and analog inputs to configuration
  • Loading branch information
dboulware authored Nov 30, 2023
2 parents e530ed3 + 46ad1a3 commit 977c4e4
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ repos:
- id: pyupgrade
args: ["--py3-plus"]
- repo: https://github.com/pycqa/isort
rev: 5.11.4
rev: 5.12.0
hooks:
- id: isort
name: isort (python)
Expand Down
47 changes: 37 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ python3 –m pip install .
### `WebRelayPreselector` Configuration

The `WebRelayPreselector` requires a [SigMF metadata file](https://Github.com/NTIA/sigmf-ns-ntia)
that describes the sensor preselector and a config file to describe the x310 settings for
the RF paths specified in the metadata and for any other desired sources. Below is an
example config file for the `WebRelayPreselector` to describe how it works:
that describes the sensor preselector and a config file to describe the WebRelay
settings for the RF paths specified in the metadata and for any other desired sources.
Below is an example config file for the `WebRelayPreselector` to describe how it works:

```json
{
Expand All @@ -71,23 +71,50 @@ example config file for the `WebRelayPreselector` to describe how it works:
"noise diode powered" : "relay2=1",
"antenna path enabled": "relay1=0",
"noise diode path enabled": "relay1=1"
},
"sensors": {
"internal_temp": 1,
"internal_humidity": 2,
"tec_intake_temp": 3,
"tec_exhaust_temp": 4
},
"digital_inputs": {
"ups_power": 1,
"ups_battery_level": 2,
"ups_trouble": 3,
"ups_battery_replace": 4
},
"analog_inputs": {
"door_sensor": 1,
"5vdc_monitor": 2,
"28vdc_monitor": 3,
"15vdc_monitor": 4,
"24vdc_monitor": 5
}
}
```

Note, the config above is specifically for a prelector with a `ControlByWebWebRelay.`
Other Preselectors and WebRelays may require a different configuration.
The `base_url` and `name` keys are the only required keys for the `WebRelayPreselector`.
The `base_url` should map to the base URL to interact with the WebRelay
(see [https://www.controlbyweb.com/x310](https://www.controlbyweb.com/x310)
for more info). The keys within the control_states key should correspond to RF paths
documented in the SigMF metadata. The keys within the status_states should map to the
for more info). The keys within the `control_states` key should correspond to RF paths
documented in the SigMF metadata. The keys within the `status_states` should map to the
RF paths documented in the SigMF metadata, or to understandable states of the
preselector for which it is desired to determine whether they are enabled or disabled.
The status method of the preselector will provide each of the keys specified in the
status_states entry mapped to a boolean indicating whether the preselector states match
The `get_status` method of the preselector will provide each of the keys specified in the
`status_states` entry mapped to a boolean indicating whether the preselector states match
those specified in the mapping. Each of the entries in the config provide mappings to the
associated web relay input states and every RFPath defined in the sensor definition json
file should have an entry in the preselector config. The keys in the dictionary may use the
name of the RFPath or the index of the RFPath in the RFPaths array.
associated web relay input states and every `RFPath` defined in the sensor definition json
file should have an entry in the preselector config.
The `sensors`, `digital_inputs`, and `analog_inputs` keys define the sensors,
digital inputs and analog inputs configured on the device. Within each of the sections,
each key provides the name of the sensor or input and the value specifies the assigned
sensor or input number. The `get_satus` method will provide each sensor/input value with
the specified label. Every status_state, sensor, and input must have a unique name.
Attempting to create a`ControlByWebWebRelay` with duplicate status_states,
sensors, or inputs will cause a `ConfigurationException.`

In this example, there are `noise_diode_on` and `noise_diode_off` keys to correspond to the
preselector paths to turn the noise diode on and off, and an antenna key to indicate the
Expand Down
19 changes: 19 additions & 0 deletions config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,25 @@
"noise diode powered" : "relay2=1",
"antenna path enabled": "relay1=0",
"noise diode path enabled": "relay1=1"
},
"sensors": {
"internal_temp": 1,
"internal_humidity": 2,
"tec_intake_temp": 3,
"tec_exhaust_temp": 4
},
"digital_inputs": {
"ups_power": 1,
"ups_battery_level": 2,
"ups_trouble": 3,
"ups_battery_replace": 4
},
"analog_inputs": {
"door_sensor": 1,
"5vdc_monitor": 2,
"28vdc_monitor": 3,
"15vdc_monitor": 4,
"24vdc_monitor": 5
}

}
2 changes: 1 addition & 1 deletion src/its_preselector/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "3.0.2"
__version__ = "3.1.0"
59 changes: 57 additions & 2 deletions src/its_preselector/controlbyweb_web_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,24 @@ def __init__(self, config: dict, timeout: int = 1, retries=3):
elif config["name"] == "":
raise ConfigurationException("name cannot be blank.")
self.retries = retries
self.check_for_unique_names(config)

def check_for_unique_names(self, config: dict):
names = {}
self.check_and_add_keys("status_states", config, names)
self.check_and_add_keys("sensors", config, names)
self.check_and_add_keys("analog_inputs", config, names)
self.check_and_add_keys("digital_inputs", config, names)

def check_and_add_keys(self, key_type: str, config: dict, unique_names: dict):
if key_type in config:
for key, value in config[key_type].items():
if key in unique_names:
raise ConfigurationException(
"All sensors and inputs must have unique names"
)
else:
unique_names[key] = key

def get_sensor_value(self, sensor_num: int) -> float:
"""
Expand All @@ -45,7 +63,7 @@ def get_sensor_value(self, sensor_num: int) -> float:
:return: The desired sensor value.
"""
sensor_num_string = str(sensor_num)
response = self.request_with_retry(self.base_url)
response = self.get_state_xml()
# Check for X310 xml format first.
sensor_tag = "sensor" + sensor_num_string
root = ET.fromstring(response.text)
Expand All @@ -70,7 +88,7 @@ def get_digital_input_value(self, input_num: int) -> bool:
:return: The boolean value of the desired digital input.
"""
input_num = int(input_num)
response = self.request_with_retry(self.base_url)
response = self.get_state_xml()
# Check for X310 format first
input_tag = f"input{input_num}state"
root = ET.fromstring(response.text)
Expand All @@ -83,6 +101,22 @@ def get_digital_input_value(self, input_num: int) -> bool:
raise ConfigurationException(f"Digital Input {input_num} does not exist.")
return bool(int(digital_input.text))

def get_analog_input_value(self, input_num: int) -> float:
"""
Read float value from an analog input of the WebRelay.
:param input_num: Configured index of the desired analog input.
:raises ConfigurationException: If the requested analog input cannot be read.
:return: The desired analog input value.
"""
response = self.get_state_xml()
input_tag = f"analogInput{input_num}"
root = ET.fromstring(response.text)
sensor = root.find(input_tag)
if sensor is None:
raise ConfigurationException(f"Analog input {input_tag} does not exist.")
return float(sensor.text)

def set_state(self, key):
"""
Set the state of the relay.
Expand Down Expand Up @@ -143,6 +177,27 @@ def get_status(self):
for relay_state in relay_states:
matches = matches and self.state_matches(relay_state, xml_root)
state[key] = matches

if "sensors" in self.config:
for key, value in self.config["sensors"].items():
try:
state[key] = self.get_sensor_value(value)
except:
logger.error(
f"Unable to get sensor value for sensor:{value}"
)
if "digital_inputs" in self.config:
for key, value in self.config["digital_inputs"].items():
try:
state[key] = self.get_digital_input_value(value)
except:
logger.error(f"Unable to read digital input:{value}")
if "analog_inputs" in self.config:
for key, value in self.config["analog_inputs"].items():
try:
state[key] = self.get_analog_input_value(value)
except:
logger.error(f"Unable to read analog input:{value}")
except:
logger.error("Unable to get status")
state["healthy"] = healthy
Expand Down
17 changes: 13 additions & 4 deletions src/its_preselector/web_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,33 @@ def __init__(self, config: dict, timeout: int = 1):
self.timeout = timeout

@abstractmethod
def get_sensor_value(sensor) -> float:
def get_sensor_value(self, sensor) -> float:
"""
Read the value from a 1-Wire sensor on the web relay.
:param sensor: The name or ID of the sensor.
:param sensor: The ID of the sensor.
:return: The float value read from the sensor, e.g. the temperature.
"""
pass

@abstractmethod
def get_digital_input_value(input) -> bool:
def get_digital_input_value(self, input_id) -> bool:
"""
Read the value from a digital input on the web relay.
:param input: The name or ID of the digital input
:param input_id: The ID of the digital input
:return: The boolean value read from the digital input.
"""

@abstractmethod
def get_analog_input_value(self, input_id) -> float:
"""
Read the value from an analog input on the web relay.
:param input_id: The ID of the analog input
:return: The float value read from the analog input.
"""

@abstractmethod
def set_state(self, state_key: str) -> None:
"""
Expand Down
100 changes: 100 additions & 0 deletions tests/test_controlbyweb_web_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import defusedxml.ElementTree as ET
from requests import Response, codes

from its_preselector.configuration_exception import ConfigurationException
from its_preselector.controlbyweb_web_relay import ControlByWebWebRelay


Expand All @@ -22,6 +23,7 @@ def setUpClass(cls):
"<relay4>0</relay4>"
"<vin>27.6</vin>"
"<register1>0</register1>"
"<analogInput1>1.4</analogInput1>"
"<oneWireSensor1>102.3</oneWireSensor1>"
"<utcTime>9160590</utcTime>"
"<timezoneOffset>-25200</timezoneOffset>"
Expand All @@ -30,6 +32,27 @@ def setUpClass(cls):
""
)

def test_requires_unique_sensors_and_inputs(self):
with self.assertRaises(ConfigurationException):
relay = ControlByWebWebRelay(
{
"base_url": "127.0.0.1",
"name": "test_preselector",
"control_states": {
"noise_diode_off": "1State=1,2State=0,3State=0,4State=0"
},
"status_states": {
"noise diode powered": "relay2=1",
"antenna path enabled": "relay1=0",
"noise diode path enabled": "relay1=1",
"noise on": "relay2=1,relay1=1",
"measurements": "relay1=0,relay2=0,relay3=0,relay4=0",
},
"sensors": {"duplicate": 1},
"analog_inputs": {"duplicate": 1},
}
)

def test_is_enabled(self):
web_relay = ControlByWebWebRelay(
{"base_url": "127.0.0.1", "name": "test_switch"}
Expand Down Expand Up @@ -87,6 +110,83 @@ def test_get_state_from_config(self):
self.assertTrue(states["noise diode path enabled"])
self.assertTrue(states["noise on"])

def test_get_sensor_value(self):
root = ET.fromstring(self.state)
web_relay = ControlByWebWebRelay(
{
"base_url": "http://127.0.0.1",
"name": "test_preselector",
"control_states": {
"noise_diode_off": "1State=1,2State=0,3State=0,4State=0"
},
"status_states": {
"noise diode powered": "relay2=1",
"antenna path enabled": "relay1=0",
"noise diode path enabled": "relay1=1",
"noise on": "relay2=1,relay1=1",
"measurements": "relay1=0,relay2=0,relay3=0,relay4=0",
},
}
)
response = Response()
response.status_code = codes.ok
type(response).text = PropertyMock(return_value=self.state)
web_relay.get_state_xml = MagicMock(return_value=response)
sensor_value = web_relay.get_sensor_value(1)
self.assertEqual(102.3, sensor_value)

def test_get_digital_input(self):
root = ET.fromstring(self.state)
web_relay = ControlByWebWebRelay(
{
"base_url": "http://127.0.0.1",
"name": "test_preselector",
"control_states": {
"noise_diode_off": "1State=1,2State=0,3State=0,4State=0"
},
"status_states": {
"noise diode powered": "relay2=1",
"antenna path enabled": "relay1=0",
"noise diode path enabled": "relay1=1",
"noise on": "relay2=1,relay1=1",
"measurements": "relay1=0,relay2=0,relay3=0,relay4=0",
},
"analog_inputs": {"analogInputTest": 1},
}
)
response = Response()
response.status_code = codes.ok
type(response).text = PropertyMock(return_value=self.state)
web_relay.get_state_xml = MagicMock(return_value=response)
input_val = web_relay.get_digital_input_value(1)
self.assertEqual(False, input_val)

def test_get_analog_input(self):
root = ET.fromstring(self.state)
web_relay = ControlByWebWebRelay(
{
"base_url": "http://127.0.0.1",
"name": "test_preselector",
"control_states": {
"noise_diode_off": "1State=1,2State=0,3State=0,4State=0"
},
"status_states": {
"noise diode powered": "relay2=1",
"antenna path enabled": "relay1=0",
"noise diode path enabled": "relay1=1",
"noise on": "relay2=1,relay1=1",
"measurements": "relay1=0,relay2=0,relay3=0,relay4=0",
},
"analog_inputs": {"analogInputTest": 1},
}
)
response = Response()
response.status_code = codes.ok
type(response).text = PropertyMock(return_value=self.state)
web_relay.get_state_xml = MagicMock(return_value=response)
analogInputVal = web_relay.get_analog_input_value(1)
self.assertEqual(1.4, analogInputVal)

def test_get_status(self):
web_relay = ControlByWebWebRelay(
{
Expand Down

0 comments on commit 977c4e4

Please sign in to comment.