Skip to content

Commit

Permalink
Merge pull request #735 from emqx/dev/1.1.2
Browse files Browse the repository at this point in the history
Dev/1.1.2
  • Loading branch information
ngjaying authored Mar 3, 2021
2 parents 24892eb + b708bc5 commit af7e6cf
Show file tree
Hide file tree
Showing 117 changed files with 3,936 additions and 421 deletions.
10 changes: 7 additions & 3 deletions .ci/Dockerfile-plugins
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ RUN set -e -u -x \
&& for lib in $(cat etc/$PLUGIN_TYPE/$PLUGIN_NAME.json | jq -r ".libs[]"); do go get $lib; done \
&& case $PLUGIN_NAME in \
influxdb ) \
go build --buildmode=plugin -tags plugins -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go \
go build -trimpath --buildmode=plugin -tags plugins -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go \
;; \
tdengine ) \
if [ "$(uname -m)" = "x86_64" ]; then \
Expand All @@ -28,10 +28,14 @@ RUN set -e -u -x \
fi; \
tar -zxvf /tmp/TDengine-client-2.0.6.0.tar.gz \
&& cd TDengine-client-2.0.6.0 && ./install_client.sh && cd - \
&& go build --buildmode=plugin -tags plugins -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go \
&& go build -trimpath --buildmode=plugin -tags plugins -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go \
;; \
labelImage ) \
git clone -b v2.2.0-rc3 --depth 1 https://github.com/tensorflow/tensorflow.git /tensorflow; \
CGO_CFLAGS=-I/tensorflow CGO_LDFLAGS=-L/go/kuiper/plugins/functions/labelImage/lib go build -trimpath --buildmode=plugin -o plugins/functions/labelImage/labelImage.so plugins/functions/labelImage/*.go \
;; \
* ) \
go build --buildmode=plugin -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go \
go build -trimpath --buildmode=plugin -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/*.go \
;; \
esac \
&& if [ -f "etc/$PLUGIN_TYPE/$PLUGIN_NAME.yaml" ]; then cp etc/$PLUGIN_TYPE/$PLUGIN_NAME.yaml plugins/$PLUGIN_TYPE/$PLUGIN_NAME; fi \
Expand Down
26 changes: 14 additions & 12 deletions .github/workflows/build_packages.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,12 @@ jobs:
- functions/accumulateWordCount
- functions/countPlusOne
- functions/echo
- functions/thumbnail
- functions/resize
- functions/image
- functions/geohash
- functions/labelImage

steps:
- uses: actions/checkout@v1
- name: install docker
run: |
sudo apt-get remove docker docker-engine docker.io containerd runc
sudo apt-get update
sudo apt-get install apt-transport-https ca-certificates curl gnupg-agent software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
sudo apt-get update
sudo apt-get install docker-ce docker-ce-cli containerd.io
- name: prepare docker
run: |
mkdir -p $HOME/.docker
Expand All @@ -93,7 +85,7 @@ jobs:
docker run --rm --privileged tonistiigi/binfmt --install all
- name: build docker image
run: make docker -j4
- name: buiild debian plugins
- name: build debian plugins
env:
PLUGIN: ${{ matrix.plugin }}
run: make ${PLUGIN}
Expand All @@ -117,6 +109,16 @@ jobs:
${ip_address}:9081/plugins/${plugin_type} \
-X POST \
-d "{\"name\":\"${plugin_name}\", \"file\":\"file:///var/plugins/${os}/${plugin_type}/${plugin_name}_amd64.zip\", \"shellParas\": [\"2.0.3.1\"]}"
elif [ "${plugin_name}" = "image" ]; then
curl \
${ip_address}:9081/plugins/${plugin_type} \
-X POST \
-d "{\"name\":\"${plugin_name}\", \"file\":\"file:///var/plugins/${os}/${plugin_type}/${plugin_name}_amd64.zip\", \"functions\": [\"resize\",\"thumbnail\"]}"
elif [ "${plugin_name}" = "geohash" ]; then
curl \
${ip_address}:9081/plugins/${plugin_type} \
-X POST \
-d "{\"name\":\"${plugin_name}\", \"file\":\"file:///var/plugins/${os}/${plugin_type}/${plugin_name}_amd64.zip\", \"functions\": [\"geohashEncode\", \"geohashEncodeInt\", \"geohashDecode\", \"geohashDecodeInt\", \"geohashBoundingBox\", \"geohashBoundingBoxInt\", \"geohashNeighbor\", \"geohashNeighborInt\", \"geohashNeighbors\", \"geohashNeighborsInt\"]}"
else
curl \
${ip_address}:9081/plugins/${plugin_type} \
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/run_fvt_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ jobs:
run: |
sudo apt update && sudo apt install pkg-config libczmq-dev -y
make build_with_edgex
go build --buildmode=plugin -o plugins/sources/Zmq.so plugins/sources/zmq/zmq.go
go build -trimpath --buildmode=plugin -o plugins/sources/Zmq.so plugins/sources/zmq/zmq.go
go build -trimpath --buildmode=plugin -o plugins/functions/Image.so plugins/functions/image/*.go
- name: run edgex && emqx && kuiper
run: |
sudo ./fvt_scripts/setup_env.sh
Expand Down
29 changes: 22 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ build_prepare:
build_without_edgex: build_prepare
@if [ ! -z $(GOOS) ] && [ ! -z $(GOARCH) ] && [ $(CGO_ENABLED) == 0 ];then \
GO111MODULE=on GOOS=$(GOOS) GOARCH=$(GOARCH) CGO_ENABLED=0 go build -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiper xstream/cli/main.go; \
GO111MODULE=on GOOS=$(GOOS) GOARCH=$(GOARCH) CGO_ENABLED=0 go build -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiperd xstream/server/main.go; \
GO111MODULE=on GOOS=$(GOOS) GOARCH=$(GOARCH) CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiperd xstream/server/main.go; \
else \
GO111MODULE=on CGO_ENABLED=1 go build -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiper xstream/cli/main.go; \
GO111MODULE=on CGO_ENABLED=1 go build -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiperd xstream/server/main.go; \
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiperd xstream/server/main.go; \
fi
@if [ ! -z $$(which upx) ] && [ "$$(uname -m)" != "aarch64" ]; then upx ./kuiper; upx ./kuiperd; fi
@mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
Expand All @@ -66,10 +66,10 @@ pkg_without_edgex: build_without_edgex
build_with_edgex: build_prepare
@if [ ! -z $(GOOS) ] && [ ! -z $(GOARCH) ] && [ $(CGO_ENABLED) == 0 ];then \
GO111MODULE=on GOOS=$(GOOS) GOARCH=$(GOARCH) CGO_ENABLED=0 go build -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags edgex -o kuiper xstream/cli/main.go; \
GO111MODULE=on GOOS=$(GOOS) GOARCH=$(GOARCH) CGO_ENABLED=0 go build -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags edgex -o kuiperd xstream/server/main.go; \
GO111MODULE=on GOOS=$(GOOS) GOARCH=$(GOARCH) CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags edgex -o kuiperd xstream/server/main.go; \
else \
GO111MODULE=on CGO_ENABLED=1 go build -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags edgex -o kuiper xstream/cli/main.go; \
GO111MODULE=on CGO_ENABLED=1 go build -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags edgex -o kuiperd xstream/server/main.go; \
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags edgex -o kuiperd xstream/server/main.go; \
fi
@if [ ! -z $$(which upx) ] && [ "$$(uname -m)" != "aarch64" ]; then upx ./kuiper; upx ./kuiperd; fi
@mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
Expand Down Expand Up @@ -177,12 +177,12 @@ PLUGINS := sinks/file \
sources/zmq \
functions/accumulateWordCount \
functions/countPlusOne \
functions/thumbnail\
functions/resize\
functions/image\
functions/geohash\
functions/echo

.PHONY: plugins sinks/tdengine $(PLUGINS)
plugins: cross_prepare sinks/tdengine $(PLUGINS)
plugins: cross_prepare sinks/tdengine functions/labelImage $(PLUGINS)
sinks/tdengine:
@docker buildx build --no-cache \
--platform=linux/amd64,linux/arm64 \
Expand All @@ -200,6 +200,21 @@ sinks/tdengine:
done
@rm -f /tmp/cross_build_plugins_sinks_tdengine.tar

functions/labelImage:
@docker buildx build --no-cache \
--platform=linux/amd64 \
-t cross_build \
--build-arg VERSION=$(VERSION) \
--build-arg PLUGIN_TYPE=functions \
--build-arg PLUGIN_NAME=labelImage \
--output type=tar,dest=/tmp/cross_build_plugins_functions_labelImage.tar \
-f .ci/Dockerfile-plugins .

@mkdir -p _plugins/debian/functions
@tar -xvf /tmp/cross_build_plugins_functions_labelImage.tar --wildcards "go/kuiper/plugins/functions/labelImage/labelImage_amd64.zip"
@mv $$(ls go/kuiper/plugins/functions/labelImage/labelImage_amd64.zip) _plugins/debian/functions
@rm -f /tmp/cross_build_plugins_functions_labelImage.tar

$(PLUGINS): PLUGIN_TYPE = $(word 1, $(subst /, , $@))
$(PLUGINS): PLUGIN_NAME = $(word 2, $(subst /, , $@))
$(PLUGINS):
Expand Down
19 changes: 19 additions & 0 deletions common/kv/kv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package kv

type KeyValue interface {
Open() error
Close() error
// Set key to hold string value if key does not exist otherwise return an error
Setnx(key string, value interface{}) error
// Set key to hold the string value. If key already holds a value, it is overwritten
Set(key string, value interface{}) error
Get(key string, val interface{}) (bool, error)
//Must return *common.Error with NOT_FOUND error
Delete(key string) error
Keys() (keys []string, err error)
Clean() error
}

func GetDefaultKVStore(fpath string) (ret KeyValue) {
return GetSqliteKVStore(fpath)
}
25 changes: 6 additions & 19 deletions common/kv.go → common/kv/sqliteKV.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,29 @@
package common
package kv

import (
"bytes"
"database/sql"
"encoding/gob"
"fmt"
"github.com/emqx/kuiper/common"
_ "github.com/mattn/go-sqlite3"
"os"
"path"
"path/filepath"
"strings"
)

type KeyValue interface {
Open() error
Close() error
// Set key to hold string value if key does not exist otherwise return an error
Setnx(key string, value interface{}) error
// Set key to hold the string value. If key already holds a value, it is overwritten
Set(key string, value interface{}) error
Get(key string, val interface{}) (bool, error)
//Must return *common.Error with NOT_FOUND error
Delete(key string) error
Keys() (keys []string, err error)
Clean() error
}

type SqliteKVStore struct {
db *sql.DB
table string
path string
}

func GetSqliteKVStore(fpath string) (ret *SqliteKVStore) {
if _, err := os.Stat(fpath); os.IsNotExist(err) {
os.MkdirAll(fpath, os.ModePerm)
}
dir, file := filepath.Split(fpath)
if _, err := os.Stat(dir); os.IsNotExist(err) {
os.MkdirAll(dir, os.ModePerm)
}
ret = new(SqliteKVStore)
ret.path = path.Join(dir, "sqliteKV.db")
ret.table = file
Expand Down Expand Up @@ -120,7 +107,7 @@ func (m *SqliteKVStore) Delete(key string) error {
var tmp []byte
err := row.Scan(&tmp)
if nil != err || 0 == len(tmp) {
return NewErrorWithCode(NOT_FOUND, fmt.Sprintf("%s is not found", key))
return common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("%s is not found", key))
}
sql = fmt.Sprintf("DELETE FROM %s WHERE key='%s';", m.table, key)
_, err = m.db.Exec(sql)
Expand Down
101 changes: 101 additions & 0 deletions common/kv/sqliteKV_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package kv

import (
"os"
"path"
"path/filepath"
"reflect"
"testing"
)

func TestSqliteKVStore_Funcs(t *testing.T) {
abs, _ := filepath.Abs("test")
if f, _ := os.Stat(abs); f != nil {
os.Remove(abs)
}

ks := GetSqliteKVStore(abs)
if e := ks.Open(); e != nil {
t.Errorf("Failed to open data %s.", e)
}

if err := ks.Setnx("foo", "bar"); nil != err {
t.Error(err)
}

var v string
if ok, _ := ks.Get("foo", &v); ok {
if !reflect.DeepEqual("bar", v) {
t.Error("expect:bar", "get:", v)
}
} else {
t.Errorf("Should not find the foo key.")
}

if err := ks.Setnx("foo1", "bar1"); nil != err {
t.Error(err)
}

if err := ks.Set("foo1", "bar2"); nil != err {
t.Error(err)
}

var v1 string
if ok, _ := ks.Get("foo1", &v1); ok {
if !reflect.DeepEqual("bar2", v1) {
t.Error("expect:bar2", "get:", v1)
}
} else {
t.Errorf("Should not find the foo1 key.")
}

if keys, e1 := ks.Keys(); e1 != nil {
t.Errorf("Failed to get value: %s.", e1)
} else {
if !reflect.DeepEqual(2, len(keys)) {
t.Error("expect:2", "get:", len(keys))
}
}

if e2 := ks.Close(); e2 != nil {
t.Errorf("Failed to close data: %s.", e2)
}

if err := ks.Open(); nil != err {
t.Error(err)
}

var v2 string
if ok, _ := ks.Get("foo", &v2); ok {
if !reflect.DeepEqual("bar", v2) {
t.Error("expect:bar", "get:", v)
}
} else {
t.Errorf("Should not find the foo key.")
}

if err := ks.Delete("foo1"); nil != err {
t.Error(err)
}

if keys, e1 := ks.Keys(); e1 != nil {
t.Errorf("Failed to get value: %s.", e1)
} else {
reflect.DeepEqual(1, len(keys))
}

if err := ks.Clean(); nil != err {
t.Error(err)
}

if keys, e1 := ks.Keys(); e1 != nil {
t.Errorf("Failed to get value: %s.", e1)
} else {
reflect.DeepEqual(0, len(keys))
}

dir, _ := filepath.Split(abs)
abs = path.Join(dir, "sqliteKV.db")
os.Remove(abs)

}
1 change: 1 addition & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func InitConf() {
Concurrency: 1,
BufferLength: 1024,
CheckpointInterval: 300000, //5 minutes
SendError: true,
},
}
if err := yaml.Unmarshal(b, &kc); err != nil {
Expand Down
Loading

0 comments on commit af7e6cf

Please sign in to comment.