Skip to content

Commit

Permalink
fix(plc4py/umas): PLC Status Message and Project CRC can now be received
Browse files Browse the repository at this point in the history
  • Loading branch information
hutcheb committed Jan 21, 2024
1 parent 02e2d43 commit a0045f0
Show file tree
Hide file tree
Showing 8 changed files with 426 additions and 5 deletions.
11 changes: 11 additions & 0 deletions protocols/umas/src/main/resources/protocols/umas/umas.mspec
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@
]
['0x02' UmasPDUPlcIdentRequest
]
['0x03' UmasPDUProjectInfoRequest
[simple uint 8 subcode]
]
['0x04' UmasPDUPlcStatusRequest
]

['0xFE', '0x01' UmasInitCommsResponse
[simple uint 16 maxFrameSize]
Expand All @@ -93,6 +98,12 @@
[simple uint 8 numberOfMemoryBanks
[array PlcMemoryBlockIdent memoryIdents count 'numberOfMemoryBanks']
]
['0xFE', '0x04' UmasPDUPlcStatusResponse
[simple uint 24 notUsed]
[simple uint 8 numberOfBlocks]
[array uint 32 blocks count 'numberOfBlocks']

]
]
]

Expand Down
44 changes: 42 additions & 2 deletions sandbox/plc4py/plc4py/drivers/umas/UmasDevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,24 @@
# under the License.
#
import asyncio
from asyncio import Transport
import logging
from asyncio import Transport, AbstractEventLoop
from dataclasses import dataclass, field
from typing import Dict

from plc4py.protocols.umas.readwrite.UmasPDUPlcStatusResponse import (
UmasPDUPlcStatusResponse,
)

from plc4py.protocols.umas.readwrite.UmasPDUPlcStatusRequest import (
UmasPDUPlcStatusRequestBuilder,
)

from plc4py.protocols.umas.readwrite.UmasPDUProjectInfoRequest import (
UmasPDUProjectInfoRequest,
UmasPDUProjectInfoRequestBuilder,
)

from plc4py.protocols.umas.readwrite.UmasInitCommsRequest import (
UmasInitCommsRequestBuilder,
)
Expand All @@ -40,10 +54,17 @@
class UmasDevice:
_configuration: UmasConfiguration
tags: Dict[str, PlcValue] = field(default_factory=lambda: {})
project_crc: int = -1

async def connect(self, transport: Transport):
# Create future to be returned when a value is returned
loop = asyncio.get_running_loop()
await self._send_plc_ident(transport, loop)
await self._send_init_comms(transport, loop)
await self._send_project_info(transport, loop)
pass

async def _send_plc_ident(self, transport: Transport, loop: AbstractEventLoop):
message_future = loop.create_future()

request_pdu = UmasPDUPlcIdentRequestBuilder().build(0)
Expand All @@ -58,6 +79,7 @@ async def connect(self, transport: Transport):
await message_future
ident_result = message_future.result()

async def _send_init_comms(self, transport: Transport, loop: AbstractEventLoop):
message_future = loop.create_future()

request_pdu = UmasInitCommsRequestBuilder(0).build(0)
Expand All @@ -71,7 +93,25 @@ async def connect(self, transport: Transport):

await message_future
init_result = message_future.result()
pass

async def _send_project_info(self, transport: Transport, loop: AbstractEventLoop):
message_future = loop.create_future()

request_pdu = UmasPDUPlcStatusRequestBuilder().build(0)

protocol = transport.protocol
protocol.write_wait_for_response(
request_pdu,
transport,
message_future,
)

await message_future
project_info_result: UmasPDUPlcStatusResponse = message_future.result()
if project_info_result.number_of_blocks > 3:
if project_info_result.blocks[3] == project_info_result.blocks[4]:
logging.debug("Received Valid Project CRC Response")
self.project_crc = project_info_result.blocks[3]

async def read(
self, request: PlcReadRequest, transport: Transport
Expand Down
24 changes: 24 additions & 0 deletions sandbox/plc4py/plc4py/protocols/umas/readwrite/UmasPDUItem.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,22 @@ def static_parse_context(read_buffer: ReadBuffer, umas_request_function_key: int
builder = UmasPDUPlcIdentRequest.static_parse_builder(
read_buffer, umas_request_function_key
)
from plc4py.protocols.umas.readwrite.UmasPDUProjectInfoRequest import (
UmasPDUProjectInfoRequest,
)

if umas_function_key == int(0x03):
builder = UmasPDUProjectInfoRequest.static_parse_builder(
read_buffer, umas_request_function_key
)
from plc4py.protocols.umas.readwrite.UmasPDUPlcStatusRequest import (
UmasPDUPlcStatusRequest,
)

if umas_function_key == int(0x04):
builder = UmasPDUPlcStatusRequest.static_parse_builder(
read_buffer, umas_request_function_key
)
from plc4py.protocols.umas.readwrite.UmasInitCommsResponse import (
UmasInitCommsResponse,
)
Expand All @@ -154,6 +170,14 @@ def static_parse_context(read_buffer: ReadBuffer, umas_request_function_key: int
builder = UmasPDUPlcIdentResponse.static_parse_builder(
read_buffer, umas_request_function_key
)
from plc4py.protocols.umas.readwrite.UmasPDUPlcStatusResponse import (
UmasPDUPlcStatusResponse,
)

if umas_function_key == int(0xFE) and umas_request_function_key == int(0x04):
builder = UmasPDUPlcStatusResponse.static_parse_builder(
read_buffer, umas_request_function_key
)
if builder is None:
raise ParseException(
"Unsupported case for discriminated type"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from dataclasses import dataclass

from plc4py.api.exceptions.exceptions import PlcRuntimeException
from plc4py.api.exceptions.exceptions import SerializationException
from plc4py.api.messages.PlcMessage import PlcMessage
from plc4py.protocols.umas.readwrite.UmasPDUItem import UmasPDUItem
from plc4py.spi.generation.ReadBuffer import ReadBuffer
from plc4py.spi.generation.WriteBuffer import WriteBuffer
from typing import ClassVar
import math


@dataclass
class UmasPDUPlcStatusRequest(UmasPDUItem):
# Accessors for discriminator values.
umas_function_key: ClassVar[int] = 0x04
umas_request_function_key: ClassVar[int] = 0

def serialize_umas_pdu_item_child(self, write_buffer: WriteBuffer):
write_buffer.push_context("UmasPDUPlcStatusRequest")

write_buffer.pop_context("UmasPDUPlcStatusRequest")

def length_in_bytes(self) -> int:
return int(math.ceil(float(self.length_in_bits() / 8.0)))

def length_in_bits(self) -> int:
length_in_bits: int = super().length_in_bits()
_value: UmasPDUPlcStatusRequest = self

return length_in_bits

@staticmethod
def static_parse_builder(read_buffer: ReadBuffer, umas_request_function_key: int):
read_buffer.push_context("UmasPDUPlcStatusRequest")

read_buffer.pop_context("UmasPDUPlcStatusRequest")
# Create the instance
return UmasPDUPlcStatusRequestBuilder()

def equals(self, o: object) -> bool:
if self == o:
return True

if not isinstance(o, UmasPDUPlcStatusRequest):
return False

that: UmasPDUPlcStatusRequest = UmasPDUPlcStatusRequest(o)
return super().equals(that) and True

def hash_code(self) -> int:
return hash(self)

def __str__(self) -> str:
pass
# write_buffer_box_based: WriteBufferBoxBased = WriteBufferBoxBased(True, True)
# try:
# write_buffer_box_based.writeSerializable(self)
# except SerializationException as e:
# raise PlcRuntimeException(e)

# return "\n" + str(write_buffer_box_based.get_box()) + "\n"


@dataclass
class UmasPDUPlcStatusRequestBuilder:
def build(self, pairing_key) -> UmasPDUPlcStatusRequest:
umas_pdu_plc_status_request: UmasPDUPlcStatusRequest = UmasPDUPlcStatusRequest(
pairing_key,
)
return umas_pdu_plc_status_request
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from dataclasses import dataclass

from plc4py.api.exceptions.exceptions import PlcRuntimeException
from plc4py.api.exceptions.exceptions import SerializationException
from plc4py.api.messages.PlcMessage import PlcMessage
from plc4py.protocols.umas.readwrite.UmasPDUItem import UmasPDUItem
from plc4py.spi.generation.ReadBuffer import ReadBuffer
from plc4py.spi.generation.WriteBuffer import WriteBuffer
from typing import Any
from typing import ClassVar
from typing import List
import math


@dataclass
class UmasPDUPlcStatusResponse(UmasPDUItem):
not_used: int
number_of_blocks: int
blocks: List[int]
# Accessors for discriminator values.
umas_function_key: ClassVar[int] = 0xFE
umas_request_function_key: ClassVar[int] = 0x04

def serialize_umas_pdu_item_child(self, write_buffer: WriteBuffer):
write_buffer.push_context("UmasPDUPlcStatusResponse")

# Simple Field (notUsed)
write_buffer.write_unsigned_int(self.not_used, logical_name="notUsed")

# Simple Field (numberOfBlocks)
write_buffer.write_unsigned_byte(
self.number_of_blocks, logical_name="numberOfBlocks"
)

# Array Field (blocks)
write_buffer.write_simple_array(
self.blocks, write_unsigned_int, logical_name="blocks"
)

write_buffer.pop_context("UmasPDUPlcStatusResponse")

def length_in_bytes(self) -> int:
return int(math.ceil(float(self.length_in_bits() / 8.0)))

def length_in_bits(self) -> int:
length_in_bits: int = super().length_in_bits()
_value: UmasPDUPlcStatusResponse = self

# Simple field (notUsed)
length_in_bits += 24

# Simple field (numberOfBlocks)
length_in_bits += 8

# Array field
if self.blocks is not None:
length_in_bits += 32 * len(self.blocks)

return length_in_bits

@staticmethod
def static_parse_builder(read_buffer: ReadBuffer, umas_request_function_key: int):
read_buffer.push_context("UmasPDUPlcStatusResponse")

not_used: int = read_buffer.read_unsigned_int(
logical_name="notUsed",
bit_length=24,
umas_request_function_key=umas_request_function_key,
)

number_of_blocks: int = read_buffer.read_unsigned_byte(
logical_name="numberOfBlocks",
bit_length=8,
umas_request_function_key=umas_request_function_key,
)

blocks: List[Any] = read_buffer.read_array_field(
logical_name="blocks",
read_function=read_buffer.read_unsigned_int,
count=number_of_blocks,
umas_request_function_key=umas_request_function_key,
)

read_buffer.pop_context("UmasPDUPlcStatusResponse")
# Create the instance
return UmasPDUPlcStatusResponseBuilder(not_used, number_of_blocks, blocks)

def equals(self, o: object) -> bool:
if self == o:
return True

if not isinstance(o, UmasPDUPlcStatusResponse):
return False

that: UmasPDUPlcStatusResponse = UmasPDUPlcStatusResponse(o)
return (
(self.not_used == that.not_used)
and (self.number_of_blocks == that.number_of_blocks)
and (self.blocks == that.blocks)
and super().equals(that)
and True
)

def hash_code(self) -> int:
return hash(self)

def __str__(self) -> str:
pass
# write_buffer_box_based: WriteBufferBoxBased = WriteBufferBoxBased(True, True)
# try:
# write_buffer_box_based.writeSerializable(self)
# except SerializationException as e:
# raise PlcRuntimeException(e)

# return "\n" + str(write_buffer_box_based.get_box()) + "\n"


@dataclass
class UmasPDUPlcStatusResponseBuilder:
not_used: int
number_of_blocks: int
blocks: List[int]

def build(self, pairing_key) -> UmasPDUPlcStatusResponse:
umas_pdu_plc_status_response: UmasPDUPlcStatusResponse = (
UmasPDUPlcStatusResponse(
pairing_key, self.not_used, self.number_of_blocks, self.blocks
)
)
return umas_pdu_plc_status_response
Loading

0 comments on commit a0045f0

Please sign in to comment.