Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A trigger to read WITS0 data from a serial port #69

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions trigger/wits0/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# WITS0 trigger
This trigger provides your folog application the ability to connect to a local serial port and read WITS0 data. The data can be sent out as raw packets or in parsed JSON format

## Installation
```bash
flogo install github.com/swarwick/Flogo/trigger/wits0
```
## Schema
Settings, Outputs and Endpoint:

```json
"output": [
{
"name": "data",
"type": "string"
}
],
"handler": {
"settings": [
{
"name": "SerialPort",
"type": "string"
},
{
"name": "BaudRate",
"type": "number"
},
{
"name": "DataBits",
"type": "number"
},
{
"name": "StopBits",
"type": "number"
},
{
"name": "Parity",
"type": "number"
},
{
"name": "ReadTimeoutSeconds",
"type": "number"
},
{
"name": "HeartBeatInterval",
"type": "number"
},
{
"name": "HeartBeatValue",
"type": "string"
},
{
"name": "PacketHeader",
"type": "string"
},
{
"name": "PacketFooter",
"type": "string"
},
{
"name": "LineSeparator",
"type": "string"
},
{
"name": "OutputRaw",
"type": "boolean"
}
]
}
```

Triggers are configured via the triggers.json of your application. The following is an example configuration of the WITS0 trigger.

### Start a flow

```json
{
"id": "wits0",
"settings": {
},
"handlers": [{
"action": {
"ref": "github.com/swarwick/flogo/action/flow",
"data": {
"flowURI": "res://flow:query"
}
},
"settings": {
"SerialPort": "/dev/ttyUSB0",
"BaudRate": 9600,
"DataBits": 8,
"StopBits": 1,
"Parity": 0,
"ReadTimeoutSeconds": 1,
"HeartBeatInterval": 30,
"HeartBeatValue": "&&\n0111-9999\n!!",
"PacketHeader": "&&",
"PacketFooter": "!!",
"LineSeparator": "\r\n",
"OutputRaw": false
}
}]
}
```
47 changes: 47 additions & 0 deletions trigger/wits0/records.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package wits0

import (
"encoding/json"
"strings"
)

// Records the WITS0 packet structure
type Records struct {
Records []Record
}

// Record the WITS0 record structure
type Record struct {
Record string
Item string
Data string
}

func createJSON(packet string, settings *wits0Settings) string {
lines := strings.Split(packet, settings.lineEnding)
records := make([]Record, len(lines)-3)
parsingPackets := false
index := 0
for _, line := range lines {
line = strings.Replace(line, settings.lineEnding, "", -1)
if parsingPackets {
if line == settings.packetFooter {
parsingPackets = false
} else {
records[index].Record = line[0:2]
records[index].Item = line[2:4]
records[index].Data = line[4:len(line)]
index = index + 1
}
} else if line == settings.packetHeader {
parsingPackets = true
}
}
jsonRecord, err := json.Marshal(Records{records})
if err != nil {
log.Error("Error converting packet to JSON: ", err)
return ""
}
return string(jsonRecord)

}
128 changes: 128 additions & 0 deletions trigger/wits0/serialPort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package wits0

import (
"bytes"
"context"
"strings"
"time"

"github.com/TIBCOSoftware/flogo-lib/core/trigger"
"github.com/tarm/serial"
)

type serialPort struct {
settings *wits0Settings
stream *serial.Port
trigger *wits0Trigger
endpoint *trigger.Handler
}

func (serialPort *serialPort) Init(trigger *wits0Trigger, endpoint *trigger.Handler) {
serialPort.trigger = trigger
serialPort.endpoint = endpoint
serialPort.settings = &wits0Settings{}
serialPort.settings.Init(trigger, endpoint)
}

func (serialPort *serialPort) createSerialConnection() {
log.Info("Connecting to serial port: " + serialPort.settings.serialConfig.Name)
stream, err := serial.OpenPort(serialPort.settings.serialConfig)
if err != nil {
log.Error(err)
return
}
serialPort.stream = stream
log.Info("Connected to serial port")

serialPort.heartBeat()
serialPort.readSerialData()
}

func (serialPort *serialPort) heartBeat() {
if serialPort.settings.heartBeatInterval > 0 {
duration := time.Second * time.Duration(serialPort.settings.heartBeatInterval)

go func() {
start := time.Now()
writeLoop:
for {
time.Sleep(time.Millisecond * 100)
select {
case <-serialPort.trigger.stopCheck:
break writeLoop
default:
}

elapsed := time.Now().Sub(start)
if elapsed > duration {
log.Debug("Sending heartbeat")
serialPort.stream.Write([]byte(serialPort.settings.heartBeatValue))
start = time.Now()
}
}
}()
}
}

func (serialPort *serialPort) readSerialData() {
buf := make([]byte, 1024)
buffer := bytes.NewBufferString("")
readLoop:
for {
n, err := serialPort.stream.Read(buf)
if err != nil {
log.Error(err)
break
}
if n > 0 {
buffer.Write(buf[:n])
buffer = serialPort.parseBuffer(buffer)
}

select {
case <-serialPort.trigger.stopCheck:
break readLoop
default:
}
}
}

func (serialPort *serialPort) parseBuffer(buffer *bytes.Buffer) *bytes.Buffer {
for buffer.Len() > 0 {
check := buffer.String()
indexStart := strings.Index(check, serialPort.settings.packetHeader)
indexEnd := strings.Index(check, serialPort.settings.packetFooterWithLineSeparator)
if indexEnd >= 0 && indexStart >= 0 && indexEnd < indexStart {
log.Info("Dropping initial bad packet")
return bytes.NewBufferString(check[indexStart:len(check)])
}
if indexStart >= 0 {
startRemoved := string(check[indexStart:len(check)])
indexStartSecond := indexStart + strings.Index(startRemoved, serialPort.settings.packetHeader)
if indexStartSecond > 0 && indexStartSecond > indexStart && indexStartSecond < indexEnd {
log.Info("Dropping bad packet")
return bytes.NewBufferString(check[indexStartSecond+len(serialPort.settings.packetHeader) : len(check)])
}
}
if indexStart >= 0 && indexEnd > indexStart {
indexEndIncludeStopPacket := indexEnd + len(serialPort.settings.packetFooterWithLineSeparator)
packet := check[indexStart:indexEndIncludeStopPacket]
outputData := packet
if !serialPort.settings.outputRaw {
outputData = createJSON(packet, serialPort.settings)
}
if len(outputData) > 0 {
trgData := make(map[string]interface{})
trgData["data"] = outputData
_, err := serialPort.endpoint.Handle(context.Background(), trgData)
if err != nil {
log.Error("Error starting action: ", err.Error())
}
}
buffer = bytes.NewBufferString(check[indexEndIncludeStopPacket:len(check)])
} else {
return buffer
}
}
return buffer
}
78 changes: 78 additions & 0 deletions trigger/wits0/settings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package wits0

import (
"strconv"
"time"

"github.com/TIBCOSoftware/flogo-lib/core/trigger"
"github.com/tarm/serial"
)

// wits0Record the WITS0 record structure
type wits0Settings struct {
serialConfig *serial.Config
packetHeader string
packetFooter string
lineEnding string
heartBeatValue string
heartBeatInterval int
packetFooterWithLineSeparator string
outputRaw bool
}

func (settings *wits0Settings) Init(t *wits0Trigger, endpoint *trigger.Handler) {
settings.serialConfig = &serial.Config{
Name: GetSettingSafe(endpoint, "SerialPort", ""),
Baud: GetSafeNumber(endpoint, "BaudRate", 9600),
ReadTimeout: time.Duration(GetSafeNumber(endpoint, "ReadTimeoutSeconds", 1)),
Size: byte(GetSafeNumber(endpoint, "DataBits", 8)),
Parity: serial.Parity(GetSafeNumber(endpoint, "Parity", 0)),
StopBits: serial.StopBits(GetSafeNumber(endpoint, "StopBits", 1)),
}

settings.packetHeader = GetSettingSafe(endpoint, "PacketHeader", "&&")
settings.packetFooter = GetSettingSafe(endpoint, "PacketFooter", "!!")
settings.lineEnding = GetSettingSafe(endpoint, "LineSeparator", "\r\n")
settings.heartBeatValue = GetSettingSafe(endpoint, "HeartBeatValue", "&&\r\n0111-9999\r\n!!\r\n")
settings.heartBeatInterval = GetSafeNumber(endpoint, "HeartBeatInterval", 30)
settings.packetFooterWithLineSeparator = settings.packetFooter + settings.lineEnding
settings.outputRaw = GetSafeBool(endpoint, "OutputRaw", false)

log.Debug("Serial Config: ", settings.serialConfig)
log.Debug("packetHeader: ", settings.packetHeader)
log.Debug("packetFooter: ", settings.packetFooter)
log.Debug("lineEnding: ", settings.lineEnding)
log.Debug("heartBeatValue: ", settings.heartBeatValue)
log.Debug("heartBeatInterval: ", settings.heartBeatInterval)
log.Debug("outputRaw: ", settings.outputRaw)
}

// GetSettingSafe get a setting and returns default if not found
func GetSettingSafe(endpoint *trigger.Handler, setting string, defaultValue string) string {
var retString string
defer func() {
if r := recover(); r != nil {
retString = defaultValue
}
}()
retString = endpoint.GetStringSetting(setting)
return retString
}

// GetSafeNumber gets the number from the config checking for empty and using default
func GetSafeNumber(endpoint *trigger.Handler, setting string, defaultValue int) int {
if settingString := GetSettingSafe(endpoint, setting, ""); settingString != "" {
value, _ := strconv.Atoi(settingString)
return value
}
return defaultValue
}

// GetSafeBool gets the bool from the config checking for empty and using default
func GetSafeBool(endpoint *trigger.Handler, setting string, defaultValue bool) bool {
if settingString := GetSettingSafe(endpoint, setting, ""); settingString != "" {
value, _ := strconv.ParseBool(settingString)
return value
}
return defaultValue
}
Loading