diff --git a/trigger/wits0/README.md b/trigger/wits0/README.md new file mode 100644 index 00000000..ca46fd2a --- /dev/null +++ b/trigger/wits0/README.md @@ -0,0 +1,104 @@ +# WITS0 trigger +This trigger provides your folog application the ability to connect to a local serial port and read WITS0 data. The data can be sent out as raw packets or in parsed JSON format + +## Installation +```bash +flogo install github.com/swarwick/Flogo/trigger/wits0 +``` +## Schema +Settings, Outputs and Endpoint: + +```json + "output": [ + { + "name": "data", + "type": "string" + } + ], + "handler": { + "settings": [ + { + "name": "SerialPort", + "type": "string" + }, + { + "name": "BaudRate", + "type": "number" + }, + { + "name": "DataBits", + "type": "number" + }, + { + "name": "StopBits", + "type": "number" + }, + { + "name": "Parity", + "type": "number" + }, + { + "name": "ReadTimeoutSeconds", + "type": "number" + }, + { + "name": "HeartBeatInterval", + "type": "number" + }, + { + "name": "HeartBeatValue", + "type": "string" + }, + { + "name": "PacketHeader", + "type": "string" + }, + { + "name": "PacketFooter", + "type": "string" + }, + { + "name": "LineSeparator", + "type": "string" + }, + { + "name": "OutputRaw", + "type": "boolean" + } + ] + } +``` + +Triggers are configured via the triggers.json of your application. The following is an example configuration of the WITS0 trigger. + +### Start a flow + +```json +{ + "id": "wits0", + "settings": { + }, + "handlers": [{ + "action": { + "ref": "github.com/swarwick/flogo/action/flow", + "data": { + "flowURI": "res://flow:query" + } + }, + "settings": { + "SerialPort": "/dev/ttyUSB0", + "BaudRate": 9600, + "DataBits": 8, + "StopBits": 1, + "Parity": 0, + "ReadTimeoutSeconds": 1, + "HeartBeatInterval": 30, + "HeartBeatValue": "&&\n0111-9999\n!!", + "PacketHeader": "&&", + "PacketFooter": "!!", + "LineSeparator": "\r\n", + "OutputRaw": false + } + }] +} +``` \ No newline at end of file diff --git a/trigger/wits0/records.go b/trigger/wits0/records.go new file mode 100644 index 00000000..c6fa224a --- /dev/null +++ b/trigger/wits0/records.go @@ -0,0 +1,47 @@ +package wits0 + +import ( + "encoding/json" + "strings" +) + +// Records the WITS0 packet structure +type Records struct { + Records []Record +} + +// Record the WITS0 record structure +type Record struct { + Record string + Item string + Data string +} + +func createJSON(packet string, settings *wits0Settings) string { + lines := strings.Split(packet, settings.lineEnding) + records := make([]Record, len(lines)-3) + parsingPackets := false + index := 0 + for _, line := range lines { + line = strings.Replace(line, settings.lineEnding, "", -1) + if parsingPackets { + if line == settings.packetFooter { + parsingPackets = false + } else { + records[index].Record = line[0:2] + records[index].Item = line[2:4] + records[index].Data = line[4:len(line)] + index = index + 1 + } + } else if line == settings.packetHeader { + parsingPackets = true + } + } + jsonRecord, err := json.Marshal(Records{records}) + if err != nil { + log.Error("Error converting packet to JSON: ", err) + return "" + } + return string(jsonRecord) + +} diff --git a/trigger/wits0/serialPort.go b/trigger/wits0/serialPort.go new file mode 100644 index 00000000..6344ff5c --- /dev/null +++ b/trigger/wits0/serialPort.go @@ -0,0 +1,128 @@ +package wits0 + +import ( + "bytes" + "context" + "strings" + "time" + + "github.com/TIBCOSoftware/flogo-lib/core/trigger" + "github.com/tarm/serial" +) + +type serialPort struct { + settings *wits0Settings + stream *serial.Port + trigger *wits0Trigger + endpoint *trigger.Handler +} + +func (serialPort *serialPort) Init(trigger *wits0Trigger, endpoint *trigger.Handler) { + serialPort.trigger = trigger + serialPort.endpoint = endpoint + serialPort.settings = &wits0Settings{} + serialPort.settings.Init(trigger, endpoint) +} + +func (serialPort *serialPort) createSerialConnection() { + log.Info("Connecting to serial port: " + serialPort.settings.serialConfig.Name) + stream, err := serial.OpenPort(serialPort.settings.serialConfig) + if err != nil { + log.Error(err) + return + } + serialPort.stream = stream + log.Info("Connected to serial port") + + serialPort.heartBeat() + serialPort.readSerialData() +} + +func (serialPort *serialPort) heartBeat() { + if serialPort.settings.heartBeatInterval > 0 { + duration := time.Second * time.Duration(serialPort.settings.heartBeatInterval) + + go func() { + start := time.Now() + writeLoop: + for { + time.Sleep(time.Millisecond * 100) + select { + case <-serialPort.trigger.stopCheck: + break writeLoop + default: + } + + elapsed := time.Now().Sub(start) + if elapsed > duration { + log.Debug("Sending heartbeat") + serialPort.stream.Write([]byte(serialPort.settings.heartBeatValue)) + start = time.Now() + } + } + }() + } +} + +func (serialPort *serialPort) readSerialData() { + buf := make([]byte, 1024) + buffer := bytes.NewBufferString("") +readLoop: + for { + n, err := serialPort.stream.Read(buf) + if err != nil { + log.Error(err) + break + } + if n > 0 { + buffer.Write(buf[:n]) + buffer = serialPort.parseBuffer(buffer) + } + + select { + case <-serialPort.trigger.stopCheck: + break readLoop + default: + } + } +} + +func (serialPort *serialPort) parseBuffer(buffer *bytes.Buffer) *bytes.Buffer { + for buffer.Len() > 0 { + check := buffer.String() + indexStart := strings.Index(check, serialPort.settings.packetHeader) + indexEnd := strings.Index(check, serialPort.settings.packetFooterWithLineSeparator) + if indexEnd >= 0 && indexStart >= 0 && indexEnd < indexStart { + log.Info("Dropping initial bad packet") + return bytes.NewBufferString(check[indexStart:len(check)]) + } + if indexStart >= 0 { + startRemoved := string(check[indexStart:len(check)]) + indexStartSecond := indexStart + strings.Index(startRemoved, serialPort.settings.packetHeader) + if indexStartSecond > 0 && indexStartSecond > indexStart && indexStartSecond < indexEnd { + log.Info("Dropping bad packet") + return bytes.NewBufferString(check[indexStartSecond+len(serialPort.settings.packetHeader) : len(check)]) + } + } + if indexStart >= 0 && indexEnd > indexStart { + indexEndIncludeStopPacket := indexEnd + len(serialPort.settings.packetFooterWithLineSeparator) + packet := check[indexStart:indexEndIncludeStopPacket] + outputData := packet + if !serialPort.settings.outputRaw { + outputData = createJSON(packet, serialPort.settings) + } + if len(outputData) > 0 { + trgData := make(map[string]interface{}) + trgData["data"] = outputData + _, err := serialPort.endpoint.Handle(context.Background(), trgData) + if err != nil { + log.Error("Error starting action: ", err.Error()) + } + } + buffer = bytes.NewBufferString(check[indexEndIncludeStopPacket:len(check)]) + } else { + return buffer + } + } + return buffer +} diff --git a/trigger/wits0/settings.go b/trigger/wits0/settings.go new file mode 100644 index 00000000..6efbb42a --- /dev/null +++ b/trigger/wits0/settings.go @@ -0,0 +1,78 @@ +package wits0 + +import ( + "strconv" + "time" + + "github.com/TIBCOSoftware/flogo-lib/core/trigger" + "github.com/tarm/serial" +) + +// wits0Record the WITS0 record structure +type wits0Settings struct { + serialConfig *serial.Config + packetHeader string + packetFooter string + lineEnding string + heartBeatValue string + heartBeatInterval int + packetFooterWithLineSeparator string + outputRaw bool +} + +func (settings *wits0Settings) Init(t *wits0Trigger, endpoint *trigger.Handler) { + settings.serialConfig = &serial.Config{ + Name: GetSettingSafe(endpoint, "SerialPort", ""), + Baud: GetSafeNumber(endpoint, "BaudRate", 9600), + ReadTimeout: time.Duration(GetSafeNumber(endpoint, "ReadTimeoutSeconds", 1)), + Size: byte(GetSafeNumber(endpoint, "DataBits", 8)), + Parity: serial.Parity(GetSafeNumber(endpoint, "Parity", 0)), + StopBits: serial.StopBits(GetSafeNumber(endpoint, "StopBits", 1)), + } + + settings.packetHeader = GetSettingSafe(endpoint, "PacketHeader", "&&") + settings.packetFooter = GetSettingSafe(endpoint, "PacketFooter", "!!") + settings.lineEnding = GetSettingSafe(endpoint, "LineSeparator", "\r\n") + settings.heartBeatValue = GetSettingSafe(endpoint, "HeartBeatValue", "&&\r\n0111-9999\r\n!!\r\n") + settings.heartBeatInterval = GetSafeNumber(endpoint, "HeartBeatInterval", 30) + settings.packetFooterWithLineSeparator = settings.packetFooter + settings.lineEnding + settings.outputRaw = GetSafeBool(endpoint, "OutputRaw", false) + + log.Debug("Serial Config: ", settings.serialConfig) + log.Debug("packetHeader: ", settings.packetHeader) + log.Debug("packetFooter: ", settings.packetFooter) + log.Debug("lineEnding: ", settings.lineEnding) + log.Debug("heartBeatValue: ", settings.heartBeatValue) + log.Debug("heartBeatInterval: ", settings.heartBeatInterval) + log.Debug("outputRaw: ", settings.outputRaw) +} + +// GetSettingSafe get a setting and returns default if not found +func GetSettingSafe(endpoint *trigger.Handler, setting string, defaultValue string) string { + var retString string + defer func() { + if r := recover(); r != nil { + retString = defaultValue + } + }() + retString = endpoint.GetStringSetting(setting) + return retString +} + +// GetSafeNumber gets the number from the config checking for empty and using default +func GetSafeNumber(endpoint *trigger.Handler, setting string, defaultValue int) int { + if settingString := GetSettingSafe(endpoint, setting, ""); settingString != "" { + value, _ := strconv.Atoi(settingString) + return value + } + return defaultValue +} + +// GetSafeBool gets the bool from the config checking for empty and using default +func GetSafeBool(endpoint *trigger.Handler, setting string, defaultValue bool) bool { + if settingString := GetSettingSafe(endpoint, setting, ""); settingString != "" { + value, _ := strconv.ParseBool(settingString) + return value + } + return defaultValue +} diff --git a/trigger/wits0/trigger.go b/trigger/wits0/trigger.go new file mode 100644 index 00000000..34770a11 --- /dev/null +++ b/trigger/wits0/trigger.go @@ -0,0 +1,64 @@ +package wits0 + +import ( + "github.com/TIBCOSoftware/flogo-lib/core/action" + "github.com/TIBCOSoftware/flogo-lib/core/trigger" + "github.com/TIBCOSoftware/flogo-lib/logger" +) + +// log is the default package logger +var log = logger.GetLogger("trigger-wits0") + +// wits0TriggerFactory My wits0Trigger factory +type wits0TriggerFactory struct { + metadata *trigger.Metadata +} + +// NewFactory create a new wits0Trigger factory +func NewFactory(md *trigger.Metadata) trigger.Factory { + return &wits0TriggerFactory{metadata: md} +} + +// New Creates a new trigger instance for a given id +func (trigger *wits0TriggerFactory) New(config *trigger.Config) trigger.Trigger { + return &wits0Trigger{metadata: trigger.metadata, config: config} +} + +// wits0Trigger is the wits0Trigger implementation +type wits0Trigger struct { + metadata *trigger.Metadata + runner action.Runner + config *trigger.Config + handlers []*trigger.Handler + stopCheck chan bool +} + +func (trigger *wits0Trigger) Initialize(ctx trigger.InitContext) error { + log.Debug("Initialize") + trigger.handlers = ctx.GetHandlers() + return nil +} + +func (trigger *wits0Trigger) Metadata() *trigger.Metadata { + return trigger.metadata +} + +// Start implements trigger.wits0Trigger.Start +func (trigger *wits0Trigger) Start() error { + log.Debug("Start") + trigger.stopCheck = make(chan bool) + handlers := trigger.handlers + for _, handler := range handlers { + serial := &serialPort{} + serial.Init(trigger, handler) + serial.createSerialConnection() + } + return nil +} + +// Stop implements trigger.wits0Trigger.Start +func (trigger *wits0Trigger) Stop() error { + // stop the trigger + close(trigger.stopCheck) + return nil +} diff --git a/trigger/wits0/trigger.json b/trigger/wits0/trigger.json new file mode 100644 index 00000000..77387fc3 --- /dev/null +++ b/trigger/wits0/trigger.json @@ -0,0 +1,80 @@ +{ + "name": "wits0", + "version": "1.0.0", + "type": "flogo:trigger", + "ref": "github.com/TIBCOSoftware/flogo-contrib/trigger/wits0", + "description": "Read WITS0 data from a serial port and trigger output as either raw string data or parsed into JSON", + "author": "swarwick@tibco.com", + "settings":[ + ], + "output": [ + { + "name": "data", + "type": "string" + } + ], + "handler": { + "settings": [ + { + "name": "SerialPort", + "type": "string", + "required" : true + }, + { + "name": "BaudRate", + "type": "number", + "value": 9600 + }, + { + "name": "DataBits", + "type": "number", + "value": 8 + }, + { + "name": "StopBits", + "type": "number", + "value": 1 + }, + { + "name": "Parity", + "type": "number", + "value": 0 + }, + { + "name": "ReadTimeoutSeconds", + "type": "number", + "value": 1 + }, + { + "name": "HeartBeatInterval", + "type": "number", + "value": 30 + }, + { + "name": "HeartBeatValue", + "type": "string", + "value": "&&\r\n0111-9999\r\n!!\r\n" + }, + { + "name": "PacketHeader", + "type": "string", + "value": "&&" + }, + { + "name": "PacketFooter", + "type": "string", + "value": "!!" + }, + { + "name": "LineSeparator", + "type": "string", + "value": "\r\n" + }, + { + "name": "OutputRaw", + "type": "boolean", + "value": false + } + ] + } +} \ No newline at end of file diff --git a/trigger/wits0/trigger_test.go b/trigger/wits0/trigger_test.go new file mode 100644 index 00000000..fe6768f4 --- /dev/null +++ b/trigger/wits0/trigger_test.go @@ -0,0 +1,310 @@ +package wits0 + +import ( + "bytes" + "context" + "encoding/json" + "io/ioutil" + "strings" + "testing" + "time" + + "github.com/TIBCOSoftware/flogo-lib/core/action" + "github.com/TIBCOSoftware/flogo-lib/core/data" + "github.com/TIBCOSoftware/flogo-lib/core/trigger" + "github.com/TIBCOSoftware/flogo-lib/logger" +) + +func getJSONMetadata() string { + jsonMetadataBytes, err := ioutil.ReadFile("trigger.json") + if err != nil { + panic("No Json Metadata found for trigger.json path") + } + return string(jsonMetadataBytes) +} + +const testConfig string = `{ + "id": "wits0", + "settings": { + + }, + "handlers": [{ + "action": { + "ref": "github.com/TIBCOSoftware/flogo-contrib/action/flow", + "data": { + "flowURI": "res://flow:query" + } + }, + "settings": { + "SerialPort": "/dev/ttyUSB0", + "HeartBeatValue": "&&\n0111-9999\n!!", + "PacketHeader": "&&", + "PacketFooter": "!!", + "LineSeparator":"\r\n", + "HeartBeatInterval": 1 + } + }] +}` + +const testConfigRaw string = `{ + "id": "wits0", + "settings": { + + }, + "handlers": [{ + "action": { + "ref": "github.com/TIBCOSoftware/flogo-contrib/action/flow", + "data": { + "flowURI": "res://flow:query" + } + }, + "settings": { + "SerialPort": "/dev/ttyUSB0", + "HeartBeatValue": "&&\n0111-9999\n!!", + "PacketHeader": "&&", + "PacketFooter": "!!", + "LineSeparator":"\r\n", + "OutputRaw": true + } + }] +}` + +const testConfigBaseSerialPort string = `{ + "id": "wits0", + "settings": { + }, + "handlers": [{ + "action": { + "ref": "github.com/TIBCOSoftware/flogo-contrib/action/flow", + "data": { + "flowURI": "res://flow:query" + } + }, + "settings": { + "SerialPort": "/dev/dummy", + "HeartBeatValue": "&&\n0111-9999\n!!", + "PacketHeader": "&&", + "PacketFooter": "!!", + "LineSeparator":"\r\n", + "HeartBeatInterval": 2 + } + }] +}` + +const testData string = ` +&& +1984PASON/EDR +01085871.95 +01105893.00 +0112110.00 +01130.00 +011544.38 +01170.00 +01190.00 +01200.00 +01210.00 +01220.00 +01230.00 +01240.00 +0125-9999.00 +0126718.42 +012717.50 +01281.06 +01300.00 +01374483.00 +0139-8888.00 +0140-9999.00 +0141-9999.00 +0142313557.96 +01430.00 +01444483.24 +01450.00 +01695896.69 +017023.30 +0171-4066.40 +0172-9999.00 +0173-9999.00 +!! + +&& +1984PASON/EDR +01085871.95 +01105893.00 +0112108.46 +01130.00 +011544.79 +01170.00 +01190.01 +01200.00 +01210.00 +01220.00 +01230.00 +01240.00 +0125-9999.00 +0126718.03 +012717.12 +01281.06 +01300.00 +01374483.00 +0139-8888.00 +0140-9999.00 +0141-9999.00 +0142313557.96 +01430.00 +01444483.24 +01450.00 +01695896.69 +017023.37 +0171-4066.40 +0172-9999.00 +0173-9999.00 +!! + +&& +1984PASON/EDR +631070.53 +631170.53 +633970.53 +634070.53 +!! + +` + +type initContext struct { + handlers []*trigger.Handler +} + +func (ctx initContext) GetHandlers() []*trigger.Handler { + return ctx.handlers +} + +type TestRunner struct { +} + +var Test action.Runner + +// Run implements action.Runner.Run +func (tr *TestRunner) Run(context context.Context, action action.Action, uri string, options interface{}) (code int, data interface{}, err error) { + //log.Infof("Ran Action (Run): %v", uri) + return 0, nil, nil +} + +func (tr *TestRunner) RunAction(ctx context.Context, act action.Action, options map[string]interface{}) (results map[string]*data.Attribute, err error) { + //log.Infof("Ran Action (RunAction): %v", act) + return nil, nil +} + +func (tr *TestRunner) Execute(ctx context.Context, act action.Action, inputs map[string]*data.Attribute) (results map[string]*data.Attribute, err error) { + //log.Infof("Ran Action (Execute): %v", act) + value := inputs["data"].Value().(string) + log.Info(value) + return nil, nil +} + +type TestAction struct { +} + +func (tr *TestAction) Metadata() *action.Metadata { + //log.Infof("Metadata") + return nil +} + +func (tr *TestAction) IOMetadata() *data.IOMetadata { + //log.Infof("IOMetadata") + return nil +} + +func TestParse(t *testing.T) { + trg, config := createTrigger(t, testConfig) + initializeTrigger(t, trg, config) + serialPort := &serialPort{} + trgWits0 := trg.(*wits0Trigger) + serialPort.Init(trgWits0, trgWits0.handlers[0]) + replaceData := strings.ReplaceAll(testData, "\n", "\r\n") + data := bytes.NewBufferString(replaceData) + outputBuffer := serialPort.parseBuffer(data) + log.Debug(outputBuffer) +} + +func TestParseRaw(t *testing.T) { + trg, config := createTrigger(t, testConfigRaw) + initializeTrigger(t, trg, config) + serialPort := &serialPort{} + trgWits0 := trg.(*wits0Trigger) + serialPort.Init(trgWits0, trgWits0.handlers[0]) + replaceData := strings.ReplaceAll(testData, "\n", "\r\n") + data := bytes.NewBufferString(replaceData) + outputBuffer := serialPort.parseBuffer(data) + log.Debug(outputBuffer) +} + +/* +func TestConnect(t *testing.T) { + trg, config := createTrigger(t, testConfig) + initializeTrigger(t, trg, config) + runTrigger(5, trg) +} + +func TestConnectRaw(t *testing.T) { + trg, config := createTrigger(t, testConfigRaw) + initializeTrigger(t, trg, config) + runTrigger(5, trg) +} +*/ +func TestBadSerialPort(t *testing.T) { + trg, config := createTrigger(t, testConfigBaseSerialPort) + initializeTrigger(t, trg, config) + runTrigger(5, trg) +} + +func runTrigger(timeout int, trg trigger.Trigger) { + go func() { + time.Sleep(time.Second * time.Duration(timeout)) + trg.Stop() + }() + + trg.Start() +} + +func createTrigger(t *testing.T, conf string) (trigger.Trigger, trigger.Config) { + log.SetLogLevel(logger.DebugLevel) + md := trigger.NewMetadata(getJSONMetadata()) + f := NewFactory(md) + config := trigger.Config{} + if f == nil { + t.Fail() + return nil, config + } + + jsonErr := json.Unmarshal([]byte(conf), &config) + if jsonErr != nil { + log.Error(jsonErr) + t.Fail() + return nil, config + } + trg := f.New(&config) + return trg, config +} + +func initializeTrigger(t *testing.T, trg trigger.Trigger, config trigger.Config) trigger.Initializable { + if trg == nil { + t.Fail() + return nil + } + newTrg, _ := trg.(trigger.Initializable) + + initCtx := &initContext{handlers: make([]*trigger.Handler, 0, len(config.Handlers))} + runner := &TestRunner{} + action := &TestAction{} + //create handlers for that trigger and init + for _, hConfig := range config.Handlers { + log.Infof("hConfig: %v", hConfig) + log.Infof("trg.Metadata().Output: %v", trg.Metadata().Output) + log.Infof("trg.Metadata().Reply: %v", trg.Metadata().Reply) + handler := trigger.NewHandler(hConfig, action, trg.Metadata().Output, trg.Metadata().Reply, runner) + initCtx.handlers = append(initCtx.handlers, handler) + } + + newTrg.Initialize(initCtx) + return newTrg +}