Skip to content
This repository has been archived by the owner on Jan 21, 2020. It is now read-only.

[WIP] Add Application plugin and Event repeater #474

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 216 additions & 0 deletions cmd/infrakit/util/eventrepeater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package util

import (
"bytes"
"context"
"fmt"
log "github.com/Sirupsen/logrus"
eventrepeater "github.com/docker/infrakit/pkg/application/eventrepeater"
"github.com/docker/infrakit/pkg/cli"
"github.com/docker/infrakit/pkg/discovery"
"github.com/docker/infrakit/pkg/discovery/local"
"github.com/docker/infrakit/pkg/plugin"
"github.com/spf13/cobra"
"io/ioutil"
"net"
"net/http"
"os"
"path"
"strings"
"time"
)

func eventrepeaterCommand(plugins func() discovery.Plugins) *cobra.Command {
cmd := &cobra.Command{
Use: "event-repeater",
Short: "Event Repeater service",
}
cmd.AddCommand(errunCommand(plugins), ermanageCommand(plugins))
return cmd
}

func errunCommand(plugins func() discovery.Plugins) *cobra.Command {
cmd := &cobra.Command{
Use: "run",
Short: "Run Event Repeater service",
}

name := cmd.Flags().String("name", "app-event-repeater", "Application name to advertise for discovery")
logLevel := cmd.Flags().Int("log", cli.DefaultLogLevel, "Logging level. 0 is least verbose. Max is 5")
source := cmd.Flags().String("source", "event-plugin", "Event sourve address.")
sink := cmd.Flags().String("sink", "localhost:1883", "Event sink address. default: localhost:1883")
sinkProtocol := cmd.Flags().String("sinkprotocol", "mqtt", "Event sink protocol. Now only mqtt and stderr is implemented.")
allowall := cmd.Flags().Bool("allowall", false, "Allow all event from source and repeat the event to sink as same topic name. default: false")
listen := cmd.Flags().String("listen", "", "Application listen host:port")

cmd.RunE = func(c *cobra.Command, args []string) error {
cli.SetLogLevel(*logLevel)
dir := local.Dir()
os.MkdirAll(dir, 0700)
discoverPath := path.Join(dir, *name)
if *listen != "" {
discoverPath += ".listen"
}
pidPath := path.Join(dir, *name+".pid")
e := eventrepeater.NewEventRepeater(*source, *sink, *sinkProtocol, *allowall)
s, err := e.Serve(discoverPath, *listen)
if err != nil {
return err
}
err = ioutil.WriteFile(pidPath, []byte(fmt.Sprintf("%v", os.Getpid())), 0644)
if err != nil {
return err
}
log.Infoln("PID file at", pidPath)
if s != nil {
s.AwaitStopped()
}
// clean up
os.Remove(pidPath)
log.Infoln("Removed PID file at", pidPath)
os.Remove(discoverPath)
log.Infoln("Removed discover file at", discoverPath)

return nil
}
return cmd
}

func ermanageCommand(plugins func() discovery.Plugins) *cobra.Command {
cmd := &cobra.Command{
Use: "manage",
Short: "management Event Repeater service",
}
name := cmd.PersistentFlags().String("name", "", "Name of plugin")
path := cmd.PersistentFlags().String("path", "/events", "URL path of events default /events")
if !strings.HasPrefix(*path, "/") {
fmt.Printf("Path must start from \"/\" : %s ", *path)
return nil
}
var addr string
var protocol string
cmd.PersistentPreRunE = func(c *cobra.Command, args []string) error {
if err := cli.EnsurePersistentPreRunE(c); err != nil {
return err
}
endpoint, err := plugins().Find(plugin.Name(*name))
if err != nil {
return err
}
addr = endpoint.Address
protocol = endpoint.Protocol
return nil
}

value := ""

send := func(method string, body string) error {
switch protocol {
case "tcp":
url := strings.Replace(addr, "tcp:", "http:", 1)
req, err := http.NewRequest(
method,
url+*path,
bytes.NewBuffer([]byte(body)),
)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{Timeout: time.Duration(10) * time.Second}
resp, err := client.Do(req)
if err != nil {
return err
}
respbody, _ := ioutil.ReadAll(resp.Body)
logger.Info("Send Request", "URL", url+*path, "Request Body", body, "Respence Status", resp.Status, "Respence Body", string(respbody))
defer resp.Body.Close()
case "unix":
req, err := http.NewRequest(
method,
"http://unix"+*path,
bytes.NewBuffer([]byte(body)),
)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
client := http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", addr)
},
},
Timeout: time.Duration(10) * time.Second,
}
resp, err := client.Do(req)
if err != nil {
return err
}
respbody, _ := ioutil.ReadAll(resp.Body)
logger.Info("Send Request", "URL", addr+*path, "Request Body", body, "Respence Status", resp.Status, "Respence Body", string(respbody))
defer resp.Body.Close()
}
return nil
}
post := &cobra.Command{
Use: "post",
Short: "Post request to event repeater.",
RunE: func(c *cobra.Command, args []string) error {
err := send("POST", value)
if err != nil {
return err
}
return nil
},
}
post.Flags().StringVar(&value, "value", value, "update value")

delete := &cobra.Command{
Use: "delete",
Short: "Delete request to event repeater.",
RunE: func(c *cobra.Command, args []string) error {
err := send("DELETE", value)
if err != nil {
return err
}

return nil
},
}
delete.Flags().StringVar(&value, "value", value, "update value")

put := &cobra.Command{
Use: "put",
Short: "Put request to event repeater.",
RunE: func(c *cobra.Command, args []string) error {
err := send("PUT", value)
if err != nil {
return err
}

return nil
},
}
put.Flags().StringVar(&value, "value", value, "update value")

get := &cobra.Command{
Use: "get",
Short: "Get request to event repeater.",
RunE: func(c *cobra.Command, args []string) error {
err := send("GET", value)
if err != nil {
return err
}
return nil
},
}

cmd.AddCommand(post)
cmd.AddCommand(delete)
cmd.AddCommand(get)
cmd.AddCommand(put)

return cmd

}
2 changes: 1 addition & 1 deletion cmd/infrakit/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func Command(plugins func() discovery.Plugins) *cobra.Command {
Short: "Utilties",
}

util.AddCommand(muxCommand(plugins), fileServerCommand(plugins), trackCommand(plugins))
util.AddCommand(muxCommand(plugins), fileServerCommand(plugins), trackCommand(plugins), eventrepeaterCommand(plugins))

return util
}
138 changes: 138 additions & 0 deletions pkg/application/eventrepeater/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# Infrakit Event Repeater Application
This is a sample of Infrakit Application.
It enable to repeat events from event plugin to MQTT brocker.

## Get Start

### Prepare
Start event plugin and mqtt broker
```
$ ./build/infrakit-event-time
$ docker run -it --rm -p 1883:1883 eclipse-mosquitto
```

## Run event repeater

```


___ ________ ________ ________ ________ ___ __ ___ _________
|\ \|\ ___ \|\ _____\\ __ \|\ __ \|\ \|\ \ |\ \|\___ ___\
\ \ \ \ \\ \ \ \ \__/\ \ \|\ \ \ \|\ \ \ \/ /|\ \ \|___ \ \_|
\ \ \ \ \\ \ \ \ __\\ \ _ _\ \ __ \ \ ___ \ \ \ \ \ \
\ \ \ \ \\ \ \ \ \_| \ \ \\ \\ \ \ \ \ \ \\ \ \ \ \ \ \ \
\ \__\ \__\\ \__\ \__\ \ \__\\ _\\ \__\ \__\ \__\\ \__\ \__\ \ \__\
\|__|\|__| \|__|\|__| \|__|\|__|\|__|\|__|\|__| \|__|\|__| \|__|


Event Repeater service

Usage:
infrakit util event-repeater [flags]

Flags:
--allowall Allow all event from source and repeat the event to sink as same topic name. default: false
--listen string Application listen host:port
--log int Logging level. 0 is least verbose. Max is 5 (default 4)
--name string Application name to advertise for discovery (default "app-event-repeater")
--sink string Event sink address. default: localhost:1883 (default "localhost:1883")
--sinkprotocol string Event sink protocol. Now only mqtt and stderr is implemented. (default "mqtt")
--source string Event sourve address. (default "event-plugin")

Global Flags:
-H, --host stringSlice host list. Default is local sockets
--httptest.serve string if non-empty, httptest.NewServer serves on this address and blocks
--log-caller include caller function (default true)
--log-format string log format: logfmt|term|json (default "term")
--log-stack include caller stack
--log-stdout log to stdout


$ infrakit util event-repeater run --source ~/.infrakit/plugins/event-time --sink tcp://localhost:1883
```

Now your app connected to event plugin and mqtt broker.
If you set `—-allowall`, your app subscribe ‘.’ Topic from event and publish all events to broker with original topic.
You can specify repeat topics with infrakit command like below.
Infrakit app: event-repeater serve REST API.
At default, it listen with unix socket.
If you want to use tcp socket instead of unix socket, set option like below.

```
infrakit util event-repeater run --listen localhost:8080 --source ~/.infrakit/plugins/event-time --sink tcp://localhost:1883
```
## Manage event repeater

You can manipurate Infrakit application with `infrakit util application` command.

```
$ infrakit util event-repeater manage -h

___ ________ ________ ________ ________ ___ __ ___ _________
|\ \|\ ___ \|\ _____\\ __ \|\ __ \|\ \|\ \ |\ \|\___ ___\
\ \ \ \ \\ \ \ \ \__/\ \ \|\ \ \ \|\ \ \ \/ /|\ \ \|___ \ \_|
\ \ \ \ \\ \ \ \ __\\ \ _ _\ \ __ \ \ ___ \ \ \ \ \ \
\ \ \ \ \\ \ \ \ \_| \ \ \\ \\ \ \ \ \ \ \\ \ \ \ \ \ \ \
\ \__\ \__\\ \__\ \__\ \ \__\\ _\\ \__\ \__\ \__\\ \__\ \__\ \ \__\
\|__|\|__| \|__|\|__| \|__|\|__|\|__|\|__|\|__| \|__|\|__| \|__|


Access application plugins

Usage:
infrakit util event-repeater manage [command]

Available Commands:
delete Delete request to application.
get Get request to application.
post Post request to application.
put Put request to application.

Flags:
--name string Name of plugin
--path string URL path of resource e.g. /resources/resourceID/ (default "/")

Global Flags:
-H, --host stringSlice host list. Default is local sockets
--httptest.serve string if non-empty, httptest.NewServer serves on this address and blocks
--log int log level (default 4)
--log-caller include caller function (default true)
--log-format string log format: logfmt|term|json (default "term")
--log-stack include caller stack
--log-stdout log to stdout

Use "infrakit util event-repeater manage [command] --help" for more information about a command.

```
As you see, you can send REST request with `get, post, put, delete` commands.
You do not have to consious about whether your application is listening on UNIX sockets or TCP ports.
Only specify your application name.
And with `--path` specify the target resource of the application.
For example, in event-repeater you should set `--path /events`
Except for `get` command, you can set json quary by `--value` option.
In the example below, you specify the event that is the target of repeate and the topic when publishing the event as mqtt.

```
infrakit util event-repeater manaeg --name app-event-repeater --path /events post --value '[{"sourcetopic":"timer/sec/1","sinktopic":"/time/1s"},{"sourcetopic":"timer/msec/500","sinktopic":"/time/500m"}]'
```

Ofcource, you can same operation with other tool e.g. `curl`.

```
TCP Port: curl -v -H "Accept: application/json" -H "Content-type: application/json" -X POST -d '[{"sourcetopic":"timer/sec/1","sinktopic":"/time/1s"},{"sourcetopic":"timer/msec/500","sinktopic":"/time/500m"}]' http://localhost:8080/events
Unix Socket : curl -v -H "Accept: application/json" -H "Content-type: application/json" -X POST -d '[{"sourcetopic":"timer/sec/1","sinktopic":"/time/1s"},{"sourcetopic":"timer/msec/500","sinktopic":"/time/500m"}]' --unix-socket /home/ubuntu/.infrakit/plugins/app-event-repeater.listen http:/events
```
Target events are described json style.
Then you can delete registerd event.

```
$ infrakit util event-repeater manage --name app-event-repeater delete --value '[{"sourcetopic":"timer/sec/1”}]’
```
Repeated events are encoded with byte.
You can decode it like below.

```
any := types.AnyBytes(SUBSCRIVED_MESSAGE.Payload())
subevent := event.Event{}
err := any.Decode(&subevent)
```
Loading