Skip to content
This repository has been archived by the owner on Feb 19, 2024. It is now read-only.

Commit

Permalink
Adding functionality also for encoding.
Browse files Browse the repository at this point in the history
  • Loading branch information
freol35241 committed Nov 21, 2023
1 parent f1c86ca commit 245d334
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 312 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Binaries for working with payloads:

### 3rd-party tools

This extension is built on top of [`porla-zenoh`](https://github.com/MO-RISE/porla-zenoh) and thus bundles [`zenoh-cli`](https://github.com/MO-RISE/zenoh-cli).
This extension bundles [`zenoh-cli`](https://github.com/MO-RISE/zenoh-cli).

## Usage

Expand All @@ -29,5 +29,6 @@ services:
image: ghcr.io/mo-rise/porla-keelson
network_mode: host
restart: always
command: ["from_bus 3 | base64 --encode | enclose | zenoh put --base64 --key my/key/expression --line '{message}'"]
command: ["from_bus 3 | base64 --encode | brefv encode '{payload_b64}' '{envelope}' | zenoh put --base64 --key my/key/expression --line '{message}'"]

```
101 changes: 81 additions & 20 deletions bin/brefv
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@ import argparse
from base64 import b64encode, b64decode

import parse
import brefv
from google.protobuf.json_format import Parse, MessageToDict
import brefv # pylint: disable=import-self
from google.protobuf.json_format import ParseDict, MessageToDict

logger = logging.getLogger("brefv")


def decode(args: argparse.Namespace): # pylint: disable=redefined-outer-name
"""Decode topics/envelopes from stdin to stdout"""

# Compile pattern
input_pattern = parse.compile(args.input_specification)

def decode(args: argparse.Namespace):
for line in sys.stdin:
logger.debug("Handling line with content: %s", line)
res = input_pattern.parse(line.rstrip())
Expand All @@ -31,7 +38,8 @@ def decode(args: argparse.Namespace):
)
continue

output = {}
# Make input available as output (for passthrough operations)
output = res.named

if topic := res.named.get("topic"):
output.update(brefv.parse_pub_sub_topic(topic))
Expand All @@ -52,23 +60,81 @@ def decode(args: argparse.Namespace):
tag = output.get("tag")
payload_raw = output.get("payload_raw")

if (
tag
and payload_raw
and brefv.get_tag_encoding(tag) == "protobuf"
):
if tag and payload_raw and brefv.get_tag_encoding(tag) == "protobuf":
type_name = brefv.get_tag_description(tag)
message = brefv.decode_protobuf_payload_from_type_name(payload_raw, type_name)
output.update({
"payload_pb2js": json.dumps(MessageToDict(message))
})
message = brefv.decode_protobuf_payload_from_type_name(
payload_raw, type_name
)
output.update({"payload_pb2js": json.dumps(MessageToDict(message))})

sys.stdout.write(args.output_specification.format(**output) + "\n")
sys.stdout.flush()


def encode(args: argparse.Namespace):
pass
def encode(args: argparse.Namespace): # pylint: disable=redefined-outer-name
"""Encode payloads from stdin to stdout"""

# Compile pattern
input_pattern = parse.compile(args.input_specification)

for line in sys.stdin:
logger.debug("Handling line with content: %s", line)
res = input_pattern.parse(line.rstrip())

if not res:
logger.error(
"Could not parse line: %s according to the input_specification: %s",
line,
args.input_specification,
)
continue

logger.debug("Parsed input: %s", res.named)

# Make input available as output (for passthrough operations)
output = res.named

if payload_raw := res.named.get("payload_raw"):
logger.debug("Found 'payload_raw' in input!")
payload = payload_raw.encode()
envelope = brefv.enclose(payload)
output["envelope"] = b64encode(envelope).decode()
elif payload_b64 := res.named.get("payload_b64"):
logger.debug("Found 'payload_b64' in input!")
payload = b64decode(payload_b64.encode())
envelope = brefv.enclose(payload)
output["envelope"] = b64encode(envelope).decode()
elif payload_pb2js := res.named.get("payload_pb2js"):
logger.debug("Found 'payload_pb2js' in input!")
if not (topic := res.named.get("topic")):
raise RuntimeError("payload_pb2js must be accompanied with a topic!")

parts = brefv.parse_pub_sub_topic(topic)
tag = parts["tag"]

if (
not brefv.is_tag_well_known(tag)
or not brefv.get_tag_encoding(tag) == "protobuf"
):
warnings.warn(
"payload_pb2js requires a well-known tag using protobuf encoding"
)
continue

type_name = brefv.get_tag_description(tag)
payload = brefv._PROTOBUF_INSTANCES[ # pylint: disable=protected-access
type_name
]()

payload_pb2js = json.loads(payload_pb2js)

payload = ParseDict(payload_pb2js, payload)
envelope = brefv.enclose(payload.SerializeToString())
output["envelope"] = b64encode(envelope).decode()

# Write to stdout
sys.stdout.write(args.output_specification.format(**output) + "\n")
sys.stdout.flush()


if __name__ == "__main__":
Expand Down Expand Up @@ -110,10 +176,5 @@ if __name__ == "__main__":
logging.captureWarnings(True)
warnings.filterwarnings("once")

logger = logging.getLogger("brefv")

# Compile pattern
input_pattern = parse.compile(args.input_specification)

# Dispatch
args.func(args)
89 changes: 0 additions & 89 deletions bin/enclose

This file was deleted.

92 changes: 0 additions & 92 deletions bin/uncover

This file was deleted.

2 changes: 1 addition & 1 deletion requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
black==23.11.0
pylint==3.0.2
pytest==7.4.3
zenoh-cli
zenoh-cli==0.2.0
Loading

0 comments on commit 245d334

Please sign in to comment.