Skip to content

Commit

Permalink
Pump controller now working with configurable telnet port
Browse files Browse the repository at this point in the history
  • Loading branch information
LeeHudsonDLS committed Oct 17, 2024
1 parent e5ab5d0 commit 538d105
Show file tree
Hide file tree
Showing 7 changed files with 522 additions and 165 deletions.
24 changes: 24 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,30 @@
"examples/configs/eiger/eiger.yaml"
]
},
{
"name": "digitelMPC",
"type": "debugpy",
"request": "launch",
"module": "tickit",
"justMyCode": false,
"console": "integratedTerminal",
"args": [
"all",
"examples/configs/digitelmpc/digitelmpc.yaml"
]
},
{
"name": "femto",
"type": "debugpy",
"request": "launch",
"module": "tickit",
"justMyCode": false,
"console": "integratedTerminal",
"args": [
"all",
"examples/configs/femto/current-monitor.yaml"
]
},
{
"name": "Debug Unit Test",
"type": "debugpy",
Expand Down
1 change: 1 addition & 0 deletions examples/configs/digitelmpc/digitelmpc.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- type: tickit_devices.digitelmpc.DigitelMpc
name: digitelMpc
inputs: {}
port: 25566
10 changes: 4 additions & 6 deletions src/tickit_devices/digitelmpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,18 @@ class DigitelMpc(ComponentConfig):

host: str = "localhost"
port: int = 25565
separator: str = "\r"

def __call__(self) -> Component: # noqa: D102
device = DigitelMpcDevice()
device = DigitelMpcDevice(port=self.port)
adapters = [
AdapterContainer(
DigitelMpcAdapter(device),
TcpIo(
self.host,
self.port,
),
TcpIo(self.host, self.port, separator=self.separator),
)
]
return DeviceComponent(
name=self.name,
device=device,
adapters=adapters,
)
)
151 changes: 66 additions & 85 deletions src/tickit_devices/digitelmpc/base.py
Original file line number Diff line number Diff line change
@@ -1,108 +1,89 @@
import logging
import asyncio
from ctypes import c_short, c_ubyte, c_ushort
from typing import Union

from tickit_devices.digitelmpc.states import Status, Error, SpRelay
from tickit_devices.digitelmpc.ionPump import IonPump
from tickit_devices.digitelmpc.states import IonPumpState

LOGGER = logging.getLogger(__name__)


class DigitelMpcBase:
"""A base class for digitelMpc logic."""

low_pressure: float = 8.2e-11
high_pressure: float = 800
set_point_on: float = 1.0e-08
set_point_off: float = 2.0e-08
max_pump_size: int = 1000
min_pump_size: int = 100
min_voltage: int = 0
max_voltage: int = 5000

def __init__(self) -> None:
"""A digitelMpcBase contructor which assigns initial values"""
self.current: int = 97e-9
self.pressure: int = 0
self.voltage: int = 80
self.status: int = 0
self.error_code: int = 0
self.sp_relay: int = SpRelay.ON.value
self.pump_size: int = 0
self.calibration: int = 0

async def start(self) -> None:
"""Starts the digitelMpc controller"""
self.status = Status.STARTING.value
self.error_code = Error.OK.value
self.pressure = 8.0e-11
self.status = Status.RUNNING.value

async def reset(self) -> None:
"""Resets the digitelMpc controller"""
self.__init__()
self.status = Status.STARTING.value
self.status = Status.RUNNING.value

async def stop(self) -> None:
"""Stops the digitelMpc Controller """
self.status = Status.STANDBY.value

async def get_pressure(self) -> None:
self.model = "DIGITEL MPCe"
self.units = "TORR"
self.firmware_version = "05.07.02"
self.pumps = [None, IonPump(1), IonPump(2)]
self.sps = {
"spon": [
None,
float("1.0E-07"),
float("2.0E-07"),
float("2.0E-07"),
float("2.0E-07"),
],
"spoff": [
None,
float("1.2E-07"),
float("2.2E-07"),
float("2.2E-07"),
float("2.2E-07"),
],
"spstate": [None, 1, 1, 1, 1],
}

async def start(self, pump: int) -> None:
"""Starts ion pump"""
await self.pumps[pump].start()

async def stop(self, pump: int) -> None:
"""Starts ion pump"""
await self.pumps[pump].stop()

async def get_pressure(self, pump: int) -> None:
"""returns the current pressure of the system"""
return self.pressure
async def get_current(self) -> None:
return self.pumps[pump].pressure

async def get_current(self, pump: int) -> None:
"""returns the current of the system"""
return self.current
async def get_voltage(self) -> None:
return self.pumps[pump].current

async def get_voltage(self, pump: int) -> None:
"""returns the current voltage of the system"""
return self.voltage

async def set_pump_size(self, pump_size: int ) -> None:
"""Sets the pump size"""
if pump_size < self.min_pump_size or pump_size > self.max_pump_size:
self.status = Status.ERROR.value
else:
self.pump_size = pump_size

async def get_pump_size(self) -> None:
"""Returns the pump size"""
return self.pump_size

async def set_voltage(self, voltage_value: int) -> None:
"""Sets the voltage"""
if voltage_value < self.min_voltage and voltage_value > self.max_voltage:
self.status = Status.ERROR.value
else:
self.voltage = voltage_value
return self.pumps[pump].voltage

if voltage_value < self.min_voltage:
self.error_code = Error.LOW_VOLTAGE.value
async def set_size(self, pump: int, size: int) -> None:
"""Sets the pump size"""
await self.pumps[pump].setSize(size)

self.pressure = ((voltage_value)*self.high_pressure)/self.max_voltage
async def get_size(self, pump: int) -> None:
"""Returns the pump size"""
return self.pumps[pump].size

async def set_text(self, pump: int, text: str) -> None:
"""Returns the pump size"""
await self.pumps[pump].set_text(text)

async def get_pump_status(self) -> None:
async def get_status(self, pump: int) -> None:
"""Gets the current status of the pump controller"""
return self.status
return IonPumpState(self.pumps[pump].status).name

async def get_sp_relay(self) -> None:
async def set_sp_relay(self, relay: int, spon: float, spoff: float) -> None:
"""Gets the sp relay status"""
return self.sp_relay


async def set_calibration(self, calibration_value: int) -> int:

self.calibration = calibration_value

async def auto_restart(self, autorestart: int) -> int:

pass




self.sps["spon"][relay] = spon
self.sps["spoff"][relay] = spoff

async def get_sp_relay(self, relay: int) -> list:
"""Gets the sp relay status"""
return [
self.sps["spon"][relay],
self.sps["spoff"][relay],
self.sps["spstate"][relay],
]

async def get_cal(self, pump: int) -> float:
return self.pumps[pump].cal

async def get_strapV(self, pump: int) -> int:
return self.pumps[pump].strapV
Loading

0 comments on commit 538d105

Please sign in to comment.