From de7e89134b654c76cde79eda6e5cf73c5445d48d Mon Sep 17 00:00:00 2001 From: ngjaying Date: Thu, 29 Oct 2020 11:44:37 +0800 Subject: [PATCH 01/15] fix(node): fix dynamic channel buffer error in arm --- xstream/nodes/dynamic_channel_buffer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xstream/nodes/dynamic_channel_buffer.go b/xstream/nodes/dynamic_channel_buffer.go index 0ff01f1186..e228d0406b 100644 --- a/xstream/nodes/dynamic_channel_buffer.go +++ b/xstream/nodes/dynamic_channel_buffer.go @@ -7,10 +7,10 @@ import ( ) type DynamicChannelBuffer struct { + limit int64 In chan api.SourceTuple Out chan api.SourceTuple buffer []api.SourceTuple - limit int64 } func NewDynamicChannelBuffer() *DynamicChannelBuffer { From 628d5db10010a357ebf32c1dea83f8a5b11f11ff Mon Sep 17 00:00:00 2001 From: EMqmyd Date: Fri, 30 Oct 2020 15:51:37 +0800 Subject: [PATCH 02/15] feat(rule):support for update streams --- xstream/server/server/rest.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/xstream/server/server/rest.go b/xstream/server/server/rest.go index 280c3ad6b0..17526de73d 100644 --- a/xstream/server/server/rest.go +++ b/xstream/server/server/rest.go @@ -76,7 +76,7 @@ func createRestServer(port int) *http.Server { r := mux.NewRouter() r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost) r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost) - r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete) + r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut) r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost) r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet) r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet) @@ -191,6 +191,20 @@ func streamHandler(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusOK) w.Write([]byte(content)) + case http.MethodPut: + v, err := decodeStatementDescriptor(r.Body) + if err != nil { + handleError(w, err, "Invalid body", logger) + return + } + streamProcessor.DropStream(name) + content, err := streamProcessor.ExecStreamSql(v.Sql) + if err != nil { + handleError(w, err, "Stream command error", logger) + return + } + w.WriteHeader(http.StatusOK) + w.Write([]byte(content)) } } From 799db6e79d3525bb0e542935f854a85be2fabca0 Mon Sep 17 00:00:00 2001 From: EMqmyd Date: Fri, 30 Oct 2020 10:22:10 +0800 Subject: [PATCH 03/15] feat(rule):support for update rules feat():update->updated --- xsql/processors/xsql_processor.go | 21 +++++++++++++++++++++ xstream/server/server/rest.go | 31 ++++++++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/xsql/processors/xsql_processor.go b/xsql/processors/xsql_processor.go index ea9731f39f..da0481231b 100644 --- a/xsql/processors/xsql_processor.go +++ b/xsql/processors/xsql_processor.go @@ -228,6 +228,27 @@ func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) { return rule, nil } +func (p *RuleProcessor) ExecUpdate(name, ruleJson string) (*api.Rule, error) { + rule, err := p.getRuleByJson(name, ruleJson) + if err != nil { + return nil, err + } + + err = p.db.Open() + if err != nil { + return nil, err + } + defer p.db.Close() + + err = p.db.Replace(rule.Id, ruleJson) + if err != nil { + return nil, err + } else { + log.Infof("Rule %s is update.", rule.Id) + } + + return rule, nil +} func (p *RuleProcessor) ExecReplaceRuleState(name string, triggered bool) (err error) { rule, err := p.GetRuleByName(name) diff --git a/xstream/server/server/rest.go b/xstream/server/server/rest.go index 17526de73d..ddbf701ed3 100644 --- a/xstream/server/server/rest.go +++ b/xstream/server/server/rest.go @@ -78,7 +78,7 @@ func createRestServer(port int) *http.Server { r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost) r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut) r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost) - r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet) + r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut) r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet) r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost) r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost) @@ -272,6 +272,35 @@ func ruleHandler(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusOK) w.Write([]byte(content)) + case http.MethodPut: + _, err := ruleProcessor.GetRuleByName(name) + if err != nil { + handleError(w, err, "not found this rule", logger) + return + } + + body, err := ioutil.ReadAll(r.Body) + if err != nil { + handleError(w, err, "Invalid body", logger) + return + } + + r, err := ruleProcessor.ExecUpdate(name, string(body)) + var result string + if err != nil { + handleError(w, err, "Update rule error", logger) + return + } else { + result = fmt.Sprintf("Rule %s was updated successfully.", r.Id) + } + + err = restartRule(name) + if err != nil { + handleError(w, err, "restart rule error", logger) + return + } + w.WriteHeader(http.StatusOK) + w.Write([]byte(result)) } } From 734558184ea937268bfb1346df43a2ba12549fad Mon Sep 17 00:00:00 2001 From: EMqmyd Date: Fri, 30 Oct 2020 14:00:12 +0800 Subject: [PATCH 04/15] feat():Change to a smaller time unit for elapsed time (rule status) feat():del test --- xsql/processors/common_test.go | 2 +- xsql/processors/extension_test.go | 12 +- xsql/processors/rule_test.go | 66 +++++----- xsql/processors/window_rule_test.go | 182 ++++++++++++++-------------- xstream/nodes/prometheus.go | 6 +- xstream/nodes/stats_manager.go | 4 +- 6 files changed, 136 insertions(+), 136 deletions(-) diff --git a/xsql/processors/common_test.go b/xsql/processors/common_test.go index 1bc1da5dc5..3a3c2fdda4 100644 --- a/xsql/processors/common_test.go +++ b/xsql/processors/common_test.go @@ -71,7 +71,7 @@ func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}) (err erro ) for index, key = range keys { if k == key { - if strings.HasSuffix(k, "process_latency_ms") { + if strings.HasSuffix(k, "process_latency_us") { if values[index].(int64) >= v.(int64) { matched = true continue diff --git a/xsql/processors/extension_test.go b/xsql/processors/extension_test.go index bd9afbe376..79484f1f0a 100644 --- a/xsql/processors/extension_test.go +++ b/xsql/processors/extension_test.go @@ -180,12 +180,12 @@ func TestFuncState(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_text_0_exceptions_total": int64(0), - "op_preprocessor_text_0_process_latency_ms": int64(0), + "op_preprocessor_text_0_process_latency_us": int64(0), "op_preprocessor_text_0_records_in_total": int64(8), "op_preprocessor_text_0_records_out_total": int64(8), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(8), "op_project_0_records_out_total": int64(8), @@ -245,12 +245,12 @@ func TestFuncStateCheckpoint(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_text_0_exceptions_total": int64(0), - "op_preprocessor_text_0_process_latency_ms": int64(0), + "op_preprocessor_text_0_process_latency_us": int64(0), "op_preprocessor_text_0_records_in_total": int64(6), "op_preprocessor_text_0_records_out_total": int64(6), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(6), "op_project_0_records_out_total": int64(6), @@ -267,12 +267,12 @@ func TestFuncStateCheckpoint(t *testing.T) { cc: 1, pauseMetric: map[string]interface{}{ "op_preprocessor_text_0_exceptions_total": int64(0), - "op_preprocessor_text_0_process_latency_ms": int64(0), + "op_preprocessor_text_0_process_latency_us": int64(0), "op_preprocessor_text_0_records_in_total": int64(3), "op_preprocessor_text_0_records_out_total": int64(3), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(3), "op_project_0_records_out_total": int64(3), diff --git a/xsql/processors/rule_test.go b/xsql/processors/rule_test.go index 03204ddf3d..c46c43057b 100644 --- a/xsql/processors/rule_test.go +++ b/xsql/processors/rule_test.go @@ -45,12 +45,12 @@ func TestSingleSQL(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(5), "op_project_0_records_out_total": int64(5), @@ -85,12 +85,12 @@ func TestSingleSQL(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(2), "op_project_0_records_out_total": int64(2), @@ -103,7 +103,7 @@ func TestSingleSQL(t *testing.T) { "source_demo_0_records_out_total": int64(5), "op_filter_0_exceptions_total": int64(0), - "op_filter_0_process_latency_ms": int64(0), + "op_filter_0_process_latency_us": int64(0), "op_filter_0_records_in_total": int64(5), "op_filter_0_records_out_total": int64(2), }, @@ -122,12 +122,12 @@ func TestSingleSQL(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(2), "op_project_0_records_out_total": int64(2), @@ -140,7 +140,7 @@ func TestSingleSQL(t *testing.T) { "source_demo_0_records_out_total": int64(5), "op_filter_0_exceptions_total": int64(0), - "op_filter_0_process_latency_ms": int64(0), + "op_filter_0_process_latency_us": int64(0), "op_filter_0_records_in_total": int64(5), "op_filter_0_records_out_total": int64(2), }, @@ -164,12 +164,12 @@ func TestSingleSQL(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demoError_0_exceptions_total": int64(3), - "op_preprocessor_demoError_0_process_latency_ms": int64(0), + "op_preprocessor_demoError_0_process_latency_us": int64(0), "op_preprocessor_demoError_0_records_in_total": int64(5), "op_preprocessor_demoError_0_records_out_total": int64(2), "op_project_0_exceptions_total": int64(3), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(4), "op_project_0_records_out_total": int64(1), @@ -182,7 +182,7 @@ func TestSingleSQL(t *testing.T) { "source_demoError_0_records_out_total": int64(5), "op_filter_0_exceptions_total": int64(3), - "op_filter_0_process_latency_ms": int64(0), + "op_filter_0_process_latency_us": int64(0), "op_filter_0_records_in_total": int64(5), "op_filter_0_records_out_total": int64(1), }, @@ -206,12 +206,12 @@ func TestSingleSQL(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demoError_0_exceptions_total": int64(3), - "op_preprocessor_demoError_0_process_latency_ms": int64(0), + "op_preprocessor_demoError_0_process_latency_us": int64(0), "op_preprocessor_demoError_0_records_in_total": int64(5), "op_preprocessor_demoError_0_records_out_total": int64(2), "op_project_0_exceptions_total": int64(3), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(4), "op_project_0_records_out_total": int64(1), @@ -224,7 +224,7 @@ func TestSingleSQL(t *testing.T) { "source_demoError_0_records_out_total": int64(5), "op_filter_0_exceptions_total": int64(3), - "op_filter_0_process_latency_ms": int64(0), + "op_filter_0_process_latency_us": int64(0), "op_filter_0_records_in_total": int64(5), "op_filter_0_records_out_total": int64(1), }, @@ -255,12 +255,12 @@ func TestSingleSQL(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(5), "op_project_0_records_out_total": int64(5), @@ -287,12 +287,12 @@ func TestSingleSQL(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(2), "op_project_0_records_out_total": int64(2), @@ -305,7 +305,7 @@ func TestSingleSQL(t *testing.T) { "source_demo_0_records_out_total": int64(5), "op_filter_0_exceptions_total": int64(0), - "op_filter_0_process_latency_ms": int64(0), + "op_filter_0_process_latency_us": int64(0), "op_filter_0_records_in_total": int64(5), "op_filter_0_records_out_total": int64(2), }, @@ -331,12 +331,12 @@ func TestSingleSQL(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo1_0_exceptions_total": int64(0), - "op_preprocessor_demo1_0_process_latency_ms": int64(0), + "op_preprocessor_demo1_0_process_latency_us": int64(0), "op_preprocessor_demo1_0_records_in_total": int64(5), "op_preprocessor_demo1_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(5), "op_project_0_records_out_total": int64(5), @@ -367,17 +367,17 @@ func TestSingleSQL(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo1_0_exceptions_total": int64(0), - "op_preprocessor_demo1_0_process_latency_ms": int64(0), + "op_preprocessor_demo1_0_process_latency_us": int64(0), "op_preprocessor_demo1_0_records_in_total": int64(5), "op_preprocessor_demo1_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(2), "op_project_0_records_out_total": int64(2), "op_filter_0_exceptions_total": int64(0), - "op_filter_0_process_latency_ms": int64(0), + "op_filter_0_process_latency_us": int64(0), "op_filter_0_records_in_total": int64(5), "op_filter_0_records_out_total": int64(2), @@ -433,12 +433,12 @@ func TestSingleSQLError(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_ldemo_0_exceptions_total": int64(0), - "op_preprocessor_ldemo_0_process_latency_ms": int64(0), + "op_preprocessor_ldemo_0_process_latency_us": int64(0), "op_preprocessor_ldemo_0_records_in_total": int64(5), "op_preprocessor_ldemo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(1), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(3), "op_project_0_records_out_total": int64(2), @@ -451,7 +451,7 @@ func TestSingleSQLError(t *testing.T) { "source_ldemo_0_records_out_total": int64(5), "op_filter_0_exceptions_total": int64(1), - "op_filter_0_process_latency_ms": int64(0), + "op_filter_0_process_latency_us": int64(0), "op_filter_0_records_in_total": int64(5), "op_filter_0_records_out_total": int64(2), }, @@ -475,12 +475,12 @@ func TestSingleSQLError(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_ldemo_0_exceptions_total": int64(0), - "op_preprocessor_ldemo_0_process_latency_ms": int64(0), + "op_preprocessor_ldemo_0_process_latency_us": int64(0), "op_preprocessor_ldemo_0_records_in_total": int64(5), "op_preprocessor_ldemo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(1), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(5), "op_project_0_records_out_total": int64(4), @@ -533,12 +533,12 @@ func TestSingleSQLTemplate(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(5), "op_project_0_records_out_total": int64(5), @@ -591,12 +591,12 @@ func TestNoneSingleSQLTemplate(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(5), "op_project_0_records_out_total": int64(5), diff --git a/xsql/processors/window_rule_test.go b/xsql/processors/window_rule_test.go index 6723a8e273..5df81d0a8e 100644 --- a/xsql/processors/window_rule_test.go +++ b/xsql/processors/window_rule_test.go @@ -58,12 +58,12 @@ func TestWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(4), "op_project_0_records_out_total": int64(4), @@ -76,7 +76,7 @@ func TestWindow(t *testing.T) { "source_demo_0_records_out_total": int64(5), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(5), "op_window_0_records_out_total": int64(4), }, @@ -98,12 +98,12 @@ func TestWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(2), "op_project_0_records_out_total": int64(2), @@ -116,12 +116,12 @@ func TestWindow(t *testing.T) { "source_demo_0_records_out_total": int64(5), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(5), "op_window_0_records_out_total": int64(4), "op_filter_0_exceptions_total": int64(0), - "op_filter_0_process_latency_ms": int64(0), + "op_filter_0_process_latency_us": int64(0), "op_filter_0_records_in_total": int64(4), "op_filter_0_records_out_total": int64(2), }, @@ -173,17 +173,17 @@ func TestWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_preprocessor_demo1_0_exceptions_total": int64(0), - "op_preprocessor_demo1_0_process_latency_ms": int64(0), + "op_preprocessor_demo1_0_process_latency_us": int64(0), "op_preprocessor_demo1_0_records_in_total": int64(5), "op_preprocessor_demo1_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(8), "op_project_0_records_out_total": int64(8), @@ -200,12 +200,12 @@ func TestWindow(t *testing.T) { "source_demo1_0_records_out_total": int64(5), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(10), "op_window_0_records_out_total": int64(10), "op_join_0_exceptions_total": int64(0), - "op_join_0_process_latency_ms": int64(0), + "op_join_0_process_latency_us": int64(0), "op_join_0_records_in_total": int64(10), "op_join_0_records_out_total": int64(8), }, @@ -249,12 +249,12 @@ func TestWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(5), "op_project_0_records_out_total": int64(5), @@ -267,17 +267,17 @@ func TestWindow(t *testing.T) { "source_demo_0_records_out_total": int64(5), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(5), "op_window_0_records_out_total": int64(5), "op_aggregate_0_exceptions_total": int64(0), - "op_aggregate_0_process_latency_ms": int64(0), + "op_aggregate_0_process_latency_us": int64(0), "op_aggregate_0_records_in_total": int64(5), "op_aggregate_0_records_out_total": int64(5), "op_order_0_exceptions_total": int64(0), - "op_order_0_process_latency_ms": int64(0), + "op_order_0_process_latency_us": int64(0), "op_order_0_records_in_total": int64(5), "op_order_0_records_out_total": int64(5), }, @@ -311,12 +311,12 @@ func TestWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_sessionDemo_0_exceptions_total": int64(0), - "op_preprocessor_sessionDemo_0_process_latency_ms": int64(0), + "op_preprocessor_sessionDemo_0_process_latency_us": int64(0), "op_preprocessor_sessionDemo_0_records_in_total": int64(11), "op_preprocessor_sessionDemo_0_records_out_total": int64(11), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(4), "op_project_0_records_out_total": int64(4), @@ -329,7 +329,7 @@ func TestWindow(t *testing.T) { "source_sessionDemo_0_records_out_total": int64(11), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(11), "op_window_0_records_out_total": int64(4), }, @@ -365,17 +365,17 @@ func TestWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_preprocessor_demo1_0_exceptions_total": int64(0), - "op_preprocessor_demo1_0_process_latency_ms": int64(0), + "op_preprocessor_demo1_0_process_latency_us": int64(0), "op_preprocessor_demo1_0_records_in_total": int64(5), "op_preprocessor_demo1_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(8), "op_project_0_records_out_total": int64(8), @@ -392,12 +392,12 @@ func TestWindow(t *testing.T) { "source_demo1_0_records_out_total": int64(5), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(10), "op_window_0_records_out_total": int64(10), "op_join_0_exceptions_total": int64(0), - "op_join_0_process_latency_ms": int64(0), + "op_join_0_process_latency_us": int64(0), "op_join_0_records_in_total": int64(10), "op_join_0_records_out_total": int64(8), }, @@ -436,12 +436,12 @@ func TestWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demoError_0_exceptions_total": int64(3), - "op_preprocessor_demoError_0_process_latency_ms": int64(0), + "op_preprocessor_demoError_0_process_latency_us": int64(0), "op_preprocessor_demoError_0_records_in_total": int64(5), "op_preprocessor_demoError_0_records_out_total": int64(2), "op_project_0_exceptions_total": int64(3), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(6), "op_project_0_records_out_total": int64(3), @@ -454,7 +454,7 @@ func TestWindow(t *testing.T) { "source_demoError_0_records_out_total": int64(5), "op_window_0_exceptions_total": int64(3), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(5), "op_window_0_records_out_total": int64(3), }, @@ -470,12 +470,12 @@ func TestWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(1), "op_project_0_records_out_total": int64(1), @@ -488,22 +488,22 @@ func TestWindow(t *testing.T) { "source_demo_0_records_out_total": int64(5), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(5), "op_window_0_records_out_total": int64(4), "op_filter_0_exceptions_total": int64(0), - "op_filter_0_process_latency_ms": int64(0), + "op_filter_0_process_latency_us": int64(0), "op_filter_0_records_in_total": int64(4), "op_filter_0_records_out_total": int64(2), "op_aggregate_0_exceptions_total": int64(0), - "op_aggregate_0_process_latency_ms": int64(0), + "op_aggregate_0_process_latency_us": int64(0), "op_aggregate_0_records_in_total": int64(2), "op_aggregate_0_records_out_total": int64(2), "op_having_0_exceptions_total": int64(0), - "op_having_0_process_latency_ms": int64(0), + "op_having_0_process_latency_us": int64(0), "op_having_0_records_in_total": int64(2), "op_having_0_records_out_total": int64(1), }, @@ -542,12 +542,12 @@ func TestWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(4), "op_project_0_records_out_total": int64(4), @@ -560,7 +560,7 @@ func TestWindow(t *testing.T) { "source_demo_0_records_out_total": int64(5), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(3), "op_window_0_records_out_total": int64(4), }, @@ -574,12 +574,12 @@ func TestWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(1), "op_project_0_records_out_total": int64(1), @@ -592,7 +592,7 @@ func TestWindow(t *testing.T) { "source_demo_0_records_out_total": int64(5), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(5), "op_window_0_records_out_total": int64(1), }, @@ -610,12 +610,12 @@ func TestWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demo_0_exceptions_total": int64(0), - "op_preprocessor_demo_0_process_latency_ms": int64(0), + "op_preprocessor_demo_0_process_latency_us": int64(0), "op_preprocessor_demo_0_records_in_total": int64(5), "op_preprocessor_demo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(5), "op_project_0_records_out_total": int64(5), @@ -628,7 +628,7 @@ func TestWindow(t *testing.T) { "source_demo_0_records_out_total": int64(5), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(5), "op_window_0_records_out_total": int64(5), }, @@ -700,12 +700,12 @@ func TestEventWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demoE_0_exceptions_total": int64(0), - "op_preprocessor_demoE_0_process_latency_ms": int64(0), + "op_preprocessor_demoE_0_process_latency_us": int64(0), "op_preprocessor_demoE_0_records_in_total": int64(6), "op_preprocessor_demoE_0_records_out_total": int64(6), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(5), "op_project_0_records_out_total": int64(5), @@ -718,7 +718,7 @@ func TestEventWindow(t *testing.T) { "source_demoE_0_records_out_total": int64(6), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(6), "op_window_0_records_out_total": int64(5), }, @@ -737,12 +737,12 @@ func TestEventWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demoE_0_exceptions_total": int64(0), - "op_preprocessor_demoE_0_process_latency_ms": int64(0), + "op_preprocessor_demoE_0_process_latency_us": int64(0), "op_preprocessor_demoE_0_records_in_total": int64(6), "op_preprocessor_demoE_0_records_out_total": int64(6), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(2), "op_project_0_records_out_total": int64(2), @@ -755,12 +755,12 @@ func TestEventWindow(t *testing.T) { "source_demoE_0_records_out_total": int64(6), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(6), "op_window_0_records_out_total": int64(4), "op_filter_0_exceptions_total": int64(0), - "op_filter_0_process_latency_ms": int64(0), + "op_filter_0_process_latency_us": int64(0), "op_filter_0_records_in_total": int64(4), "op_filter_0_records_out_total": int64(2), }, @@ -800,17 +800,17 @@ func TestEventWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demoE_0_exceptions_total": int64(0), - "op_preprocessor_demoE_0_process_latency_ms": int64(0), + "op_preprocessor_demoE_0_process_latency_us": int64(0), "op_preprocessor_demoE_0_records_in_total": int64(6), "op_preprocessor_demoE_0_records_out_total": int64(6), "op_preprocessor_demo1E_0_exceptions_total": int64(0), - "op_preprocessor_demo1E_0_process_latency_ms": int64(0), + "op_preprocessor_demo1E_0_process_latency_us": int64(0), "op_preprocessor_demo1E_0_records_in_total": int64(6), "op_preprocessor_demo1E_0_records_out_total": int64(6), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(5), "op_project_0_records_out_total": int64(5), @@ -827,12 +827,12 @@ func TestEventWindow(t *testing.T) { "source_demo1E_0_records_out_total": int64(6), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(12), "op_window_0_records_out_total": int64(5), "op_join_0_exceptions_total": int64(0), - "op_join_0_process_latency_ms": int64(0), + "op_join_0_process_latency_us": int64(0), "op_join_0_records_in_total": int64(5), "op_join_0_records_out_total": int64(5), }, @@ -860,12 +860,12 @@ func TestEventWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demoE_0_exceptions_total": int64(0), - "op_preprocessor_demoE_0_process_latency_ms": int64(0), + "op_preprocessor_demoE_0_process_latency_us": int64(0), "op_preprocessor_demoE_0_records_in_total": int64(6), "op_preprocessor_demoE_0_records_out_total": int64(6), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(4), "op_project_0_records_out_total": int64(4), @@ -878,17 +878,17 @@ func TestEventWindow(t *testing.T) { "source_demoE_0_records_out_total": int64(6), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(6), "op_window_0_records_out_total": int64(4), "op_aggregate_0_exceptions_total": int64(0), - "op_aggregate_0_process_latency_ms": int64(0), + "op_aggregate_0_process_latency_us": int64(0), "op_aggregate_0_records_in_total": int64(4), "op_aggregate_0_records_out_total": int64(4), "op_order_0_exceptions_total": int64(0), - "op_order_0_process_latency_ms": int64(0), + "op_order_0_process_latency_us": int64(0), "op_order_0_records_in_total": int64(4), "op_order_0_records_out_total": int64(4), }, @@ -920,12 +920,12 @@ func TestEventWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_sessionDemoE_0_exceptions_total": int64(0), - "op_preprocessor_sessionDemoE_0_process_latency_ms": int64(0), + "op_preprocessor_sessionDemoE_0_process_latency_us": int64(0), "op_preprocessor_sessionDemoE_0_records_in_total": int64(12), "op_preprocessor_sessionDemoE_0_records_out_total": int64(12), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(4), "op_project_0_records_out_total": int64(4), @@ -938,7 +938,7 @@ func TestEventWindow(t *testing.T) { "source_sessionDemoE_0_records_out_total": int64(12), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(12), "op_window_0_records_out_total": int64(4), }, @@ -965,17 +965,17 @@ func TestEventWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demoE_0_exceptions_total": int64(0), - "op_preprocessor_demoE_0_process_latency_ms": int64(0), + "op_preprocessor_demoE_0_process_latency_us": int64(0), "op_preprocessor_demoE_0_records_in_total": int64(6), "op_preprocessor_demoE_0_records_out_total": int64(6), "op_preprocessor_demo1E_0_exceptions_total": int64(0), - "op_preprocessor_demo1E_0_process_latency_ms": int64(0), + "op_preprocessor_demo1E_0_process_latency_us": int64(0), "op_preprocessor_demo1E_0_records_in_total": int64(6), "op_preprocessor_demo1E_0_records_out_total": int64(6), "op_project_0_exceptions_total": int64(0), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(5), "op_project_0_records_out_total": int64(5), @@ -996,7 +996,7 @@ func TestEventWindow(t *testing.T) { "op_window_0_records_out_total": int64(5), "op_join_0_exceptions_total": int64(0), - "op_join_0_process_latency_ms": int64(0), + "op_join_0_process_latency_us": int64(0), "op_join_0_records_in_total": int64(5), "op_join_0_records_out_total": int64(5), }, @@ -1037,12 +1037,12 @@ func TestEventWindow(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_demoErr_0_exceptions_total": int64(1), - "op_preprocessor_demoErr_0_process_latency_ms": int64(0), + "op_preprocessor_demoErr_0_process_latency_us": int64(0), "op_preprocessor_demoErr_0_records_in_total": int64(6), "op_preprocessor_demoErr_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(1), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(6), "op_project_0_records_out_total": int64(5), @@ -1055,7 +1055,7 @@ func TestEventWindow(t *testing.T) { "source_demoErr_0_records_out_total": int64(6), "op_window_0_exceptions_total": int64(1), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(6), "op_window_0_records_out_total": int64(5), }, @@ -1103,12 +1103,12 @@ func TestWindowError(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_ldemo_0_exceptions_total": int64(0), - "op_preprocessor_ldemo_0_process_latency_ms": int64(0), + "op_preprocessor_ldemo_0_process_latency_us": int64(0), "op_preprocessor_ldemo_0_records_in_total": int64(5), "op_preprocessor_ldemo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(1), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(2), "op_project_0_records_out_total": int64(1), @@ -1121,7 +1121,7 @@ func TestWindowError(t *testing.T) { "source_ldemo_0_records_out_total": int64(5), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(5), "op_window_0_records_out_total": int64(2), }, @@ -1137,12 +1137,12 @@ func TestWindowError(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_ldemo_0_exceptions_total": int64(0), - "op_preprocessor_ldemo_0_process_latency_ms": int64(0), + "op_preprocessor_ldemo_0_process_latency_us": int64(0), "op_preprocessor_ldemo_0_records_in_total": int64(5), "op_preprocessor_ldemo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(1), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(2), "op_project_0_records_out_total": int64(1), @@ -1155,12 +1155,12 @@ func TestWindowError(t *testing.T) { "source_ldemo_0_records_out_total": int64(5), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(5), "op_window_0_records_out_total": int64(4), "op_filter_0_exceptions_total": int64(1), - "op_filter_0_process_latency_ms": int64(0), + "op_filter_0_process_latency_us": int64(0), "op_filter_0_records_in_total": int64(4), "op_filter_0_records_out_total": int64(1), }, @@ -1196,17 +1196,17 @@ func TestWindowError(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_ldemo_0_exceptions_total": int64(0), - "op_preprocessor_ldemo_0_process_latency_ms": int64(0), + "op_preprocessor_ldemo_0_process_latency_us": int64(0), "op_preprocessor_ldemo_0_records_in_total": int64(5), "op_preprocessor_ldemo_0_records_out_total": int64(5), "op_preprocessor_ldemo1_0_exceptions_total": int64(0), - "op_preprocessor_ldemo1_0_process_latency_ms": int64(0), + "op_preprocessor_ldemo1_0_process_latency_us": int64(0), "op_preprocessor_ldemo1_0_records_in_total": int64(5), "op_preprocessor_ldemo1_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(3), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(8), "op_project_0_records_out_total": int64(5), @@ -1223,12 +1223,12 @@ func TestWindowError(t *testing.T) { "source_ldemo1_0_records_out_total": int64(5), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(10), "op_window_0_records_out_total": int64(10), "op_join_0_exceptions_total": int64(3), - "op_join_0_process_latency_ms": int64(0), + "op_join_0_process_latency_us": int64(0), "op_join_0_records_in_total": int64(10), "op_join_0_records_out_total": int64(5), }, @@ -1250,12 +1250,12 @@ func TestWindowError(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_ldemo_0_exceptions_total": int64(0), - "op_preprocessor_ldemo_0_process_latency_ms": int64(0), + "op_preprocessor_ldemo_0_process_latency_us": int64(0), "op_preprocessor_ldemo_0_records_in_total": int64(5), "op_preprocessor_ldemo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(3), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(5), "op_project_0_records_out_total": int64(2), @@ -1268,17 +1268,17 @@ func TestWindowError(t *testing.T) { "source_ldemo_0_records_out_total": int64(5), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(5), "op_window_0_records_out_total": int64(5), "op_aggregate_0_exceptions_total": int64(0), - "op_aggregate_0_process_latency_ms": int64(0), + "op_aggregate_0_process_latency_us": int64(0), "op_aggregate_0_records_in_total": int64(5), "op_aggregate_0_records_out_total": int64(5), "op_having_0_exceptions_total": int64(3), - "op_having_0_process_latency_ms": int64(0), + "op_having_0_process_latency_us": int64(0), "op_having_0_records_in_total": int64(5), "op_having_0_records_out_total": int64(2), }, @@ -1299,12 +1299,12 @@ func TestWindowError(t *testing.T) { }, m: map[string]interface{}{ "op_preprocessor_ldemo_0_exceptions_total": int64(0), - "op_preprocessor_ldemo_0_process_latency_ms": int64(0), + "op_preprocessor_ldemo_0_process_latency_us": int64(0), "op_preprocessor_ldemo_0_records_in_total": int64(5), "op_preprocessor_ldemo_0_records_out_total": int64(5), "op_project_0_exceptions_total": int64(1), - "op_project_0_process_latency_ms": int64(0), + "op_project_0_process_latency_us": int64(0), "op_project_0_records_in_total": int64(4), "op_project_0_records_out_total": int64(3), @@ -1317,12 +1317,12 @@ func TestWindowError(t *testing.T) { "source_ldemo_0_records_out_total": int64(5), "op_window_0_exceptions_total": int64(0), - "op_window_0_process_latency_ms": int64(0), + "op_window_0_process_latency_us": int64(0), "op_window_0_records_in_total": int64(5), "op_window_0_records_out_total": int64(4), "op_order_0_exceptions_total": int64(1), - "op_order_0_process_latency_ms": int64(0), + "op_order_0_process_latency_us": int64(0), "op_order_0_records_in_total": int64(4), "op_order_0_records_out_total": int64(3), }, diff --git a/xstream/nodes/prometheus.go b/xstream/nodes/prometheus.go index 39c0477323..ded5969cc7 100644 --- a/xstream/nodes/prometheus.go +++ b/xstream/nodes/prometheus.go @@ -8,12 +8,12 @@ import ( const RecordsInTotal = "records_in_total" const RecordsOutTotal = "records_out_total" const ExceptionsTotal = "exceptions_total" -const ProcessLatencyMs = "process_latency_ms" +const ProcessLatencyUs = "process_latency_us" const LastInvocation = "last_invocation" const BufferLength = "buffer_length" var ( - MetricNames = []string{RecordsInTotal, RecordsOutTotal, ExceptionsTotal, ProcessLatencyMs, BufferLength, LastInvocation} + MetricNames = []string{RecordsInTotal, RecordsOutTotal, ExceptionsTotal, ProcessLatencyUs, BufferLength, LastInvocation} prometheuseMetrics *PrometheusMetrics mutex sync.RWMutex ) @@ -60,7 +60,7 @@ func newPrometheusMetrics() *PrometheusMetrics { Help: "Total number of user exceptions of " + prefix, }, labelNames) processLatency := prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: prefix + "_" + ProcessLatencyMs, + Name: prefix + "_" + ProcessLatencyUs, Help: "Process latency in millisecond of " + prefix, }, labelNames) bufferLength := prometheus.NewGaugeVec(prometheus.GaugeOpts{ diff --git a/xstream/nodes/stats_manager.go b/xstream/nodes/stats_manager.go index d59ca9e9bc..62f78c46b5 100644 --- a/xstream/nodes/stats_manager.go +++ b/xstream/nodes/stats_manager.go @@ -111,7 +111,7 @@ func (sm *DefaultStatManager) ProcessTimeStart() { func (sm *DefaultStatManager) ProcessTimeEnd() { if !sm.processTimeStart.IsZero() { - sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Millisecond) + sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Microsecond) } } @@ -138,7 +138,7 @@ func (sm *PrometheusStatManager) IncTotalExceptions() { func (sm *PrometheusStatManager) ProcessTimeEnd() { if !sm.processTimeStart.IsZero() { - sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Millisecond) + sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Microsecond) sm.pProcessLatency.Set(float64(sm.processLatency)) } } From 0f8305bb7bd8d0eb1bdf55b83df0b71af69dc1c5 Mon Sep 17 00:00:00 2001 From: ngjaying Date: Mon, 2 Nov 2020 11:54:19 +0800 Subject: [PATCH 05/15] refactor(rest): return complex type as object instead of string --- xsql/ast.go | 4 ++-- xsql/ast_test.go | 40 +++++++++++++++++++++++++++++++++ xsql/util.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++- xsql/util_test.go | 46 +++++++++++++++++++++++++++++++++++++- 4 files changed, 142 insertions(+), 4 deletions(-) diff --git a/xsql/ast.go b/xsql/ast.go index 5ee815968e..a760d73bc0 100644 --- a/xsql/ast.go +++ b/xsql/ast.go @@ -314,10 +314,10 @@ type StreamField struct { func (u *StreamField) MarshalJSON() ([]byte, error) { return json.Marshal(&struct { - FieldType string + FieldType interface{} Name string }{ - FieldType: PrintFieldType(u.FieldType), + FieldType: PrintFieldTypeForJson(u.FieldType), Name: u.Name, }) } diff --git a/xsql/ast_test.go b/xsql/ast_test.go index 220e168047..2897741775 100644 --- a/xsql/ast_test.go +++ b/xsql/ast_test.go @@ -1,6 +1,7 @@ package xsql import ( + "encoding/json" "fmt" "reflect" "testing" @@ -108,3 +109,42 @@ func Test_MessageValTest(t *testing.T) { } } } + +func Test_StreamFieldsMarshall(t *testing.T) { + var tests = []struct { + sf StreamFields + r string + }{{ + sf: []StreamField{ + {Name: "USERID", FieldType: &BasicType{Type: BIGINT}}, + {Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}}, + {Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}}, + {Name: "NICKNAMES", FieldType: &ArrayType{Type: STRINGS}}, + {Name: "Gender", FieldType: &BasicType{Type: BOOLEAN}}, + {Name: "ADDRESS", FieldType: &RecType{ + StreamFields: []StreamField{ + {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}}, + {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}}, + }, + }}, + }, + r: `[{"FieldType":"bigint","Name":"USERID"},{"FieldType":"string","Name":"FIRST_NAME"},{"FieldType":"string","Name":"LAST_NAME"},{"FieldType":{"Type":"array","ElementType":"string"},"Name":"NICKNAMES"},{"FieldType":"boolean","Name":"Gender"},{"FieldType":{"Type":"struct","Fields":[{"FieldType":"string","Name":"STREET_NAME"},{"FieldType":"bigint","Name":"NUMBER"}]},"Name":"ADDRESS"}]`, + }, { + sf: []StreamField{ + {Name: "USERID", FieldType: &BasicType{Type: BIGINT}}, + }, + r: `[{"FieldType":"bigint","Name":"USERID"}]`, + }} + fmt.Printf("The test bucket size is %d.\n\n", len(tests)) + for i, tt := range tests { + r, err := json.Marshal(tt.sf) + if err != nil { + t.Errorf("%d. \nmarshall error: %v", i, err) + t.FailNow() + } + result := string(r) + if !reflect.DeepEqual(tt.r, result) { + t.Errorf("%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.r, result) + } + } +} diff --git a/xsql/util.go b/xsql/util.go index c37ef52d81..ffbc483c9e 100644 --- a/xsql/util.go +++ b/xsql/util.go @@ -1,6 +1,10 @@ package xsql -import "strings" +import ( + "encoding/json" + "fmt" + "strings" +) func PrintFieldType(ft FieldType) (result string) { switch t := ft.(type) { @@ -30,6 +34,56 @@ func PrintFieldType(ft FieldType) (result string) { return } +func PrintFieldTypeForJson(ft FieldType) (result interface{}) { + r, q := doPrintFieldTypeForJson(ft) + if q { + return r + } else { + return json.RawMessage(r) + } +} + +func doPrintFieldTypeForJson(ft FieldType) (result string, isLiteral bool) { + switch t := ft.(type) { + case *BasicType: + return t.Type.String(), true + case *ArrayType: + var ( + fieldType string + q bool + ) + if t.FieldType != nil { + fieldType, q = doPrintFieldTypeForJson(t.FieldType) + } else { + fieldType, q = t.Type.String(), true + } + if q { + result = fmt.Sprintf(`{"Type":"array","ElementType":"%s"}`, fieldType) + } else { + result = fmt.Sprintf(`{"Type":"array","ElementType":%s}`, fieldType) + } + + case *RecType: + result = `{"Type":"struct","Fields":[` + isFirst := true + for _, f := range t.StreamFields { + if isFirst { + isFirst = false + } else { + result += "," + } + fieldType, q := doPrintFieldTypeForJson(f.FieldType) + if q { + result = fmt.Sprintf(`%s{"FieldType":"%s","Name":"%s"}`, result, fieldType, f.Name) + } else { + result = fmt.Sprintf(`%s{"FieldType":"%s","Name":"%s"}`, result, fieldType, f.Name) + } + } + result += `]}` + } + return result, false +} + func GetStreams(stmt *SelectStatement) (result []string) { if stmt == nil { return nil diff --git a/xsql/util_test.go b/xsql/util_test.go index 4fcf487f46..77323d09de 100644 --- a/xsql/util_test.go +++ b/xsql/util_test.go @@ -61,10 +61,54 @@ func TestLowercaseKeyMap(t *testing.T) { fmt.Printf("The test bucket size is %d.\n\n", len(tests)) for i, tt := range tests { - //fmt.Printf("Parsing SQL %q.\n", tt.s) result := LowercaseKeyMap(tt.src) if !reflect.DeepEqual(tt.dest, result) { t.Errorf("%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.dest, result) } } } + +func TestPrintFieldType(t *testing.T) { + var tests = []struct { + ft FieldType + printed string + }{{ + ft: &RecType{ + StreamFields: []StreamField{ + {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}}, + {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}}, + }, + }, + printed: `{"Type":"struct","Fields":[{"FieldType":"string","Name":"STREET_NAME"},{"FieldType":"bigint","Name":"NUMBER"}]}`, + }, { + ft: &ArrayType{ + Type: STRUCT, + FieldType: &RecType{ + StreamFields: []StreamField{ + {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}}, + {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}}, + }, + }, + }, + printed: `{"Type":"array","ElementType":{"Type":"struct","Fields":[{"FieldType":"string","Name":"STREET_NAME"},{"FieldType":"bigint","Name":"NUMBER"}]}}`, + }, { + ft: &ArrayType{ + Type: STRUCT, + FieldType: &BasicType{Type: STRINGS}, + }, + printed: `{"Type":"array","ElementType":"string"}`, + }, { + ft: &BasicType{ + Type: STRINGS, + }, + printed: `string`, + }} + fmt.Printf("The test bucket size is %d.\n\n", len(tests)) + for i, tt := range tests { + //fmt.Printf("Parsing SQL %q.\n",tt.s) + result, _ := doPrintFieldTypeForJson(tt.ft) + if !reflect.DeepEqual(tt.printed, result) { + t.Errorf("%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.printed, result) + } + } +} From 8a49aa2e0034ef13fa407918bb4a979f139da102 Mon Sep 17 00:00:00 2001 From: EMQmyd <66768232+EMQmyd@users.noreply.github.com> Date: Fri, 6 Nov 2020 14:32:56 +0800 Subject: [PATCH 06/15] feat(log):Adaptation log: syslog (#586) --- common/util.go | 39 +++++++++++----------- docs/en_US/operation/configuration_file.md | 3 +- docs/zh_CN/operation/configuration_file.md | 2 ++ 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/common/util.go b/common/util.go index 9c07c40b01..1ba9f3f5c1 100644 --- a/common/util.go +++ b/common/util.go @@ -15,19 +15,22 @@ import ( "path" "path/filepath" //"runtime" + logrus_syslog "github.com/sirupsen/logrus/hooks/syslog" + "log/syslog" "sort" "strings" "sync" ) const ( - logFileName = "stream.log" - etc_dir = "/etc/" - data_dir = "/data/" - log_dir = "/log/" - StreamConf = "kuiper.yaml" - KuiperBaseKey = "KuiperBaseKey" - MetaKey = "__meta" + logFileName = "stream.log" + etc_dir = "/etc/" + data_dir = "/data/" + log_dir = "/log/" + StreamConf = "kuiper.yaml" + KuiperBaseKey = "KuiperBaseKey" + KuiperSyslogKey = "KuiperSyslogKey" + MetaKey = "__meta" ) var ( @@ -79,21 +82,19 @@ type KuiperConf struct { } func init() { + Log = logrus.New() + if "true" == os.Getenv(KuiperSyslogKey) { + if hook, err := logrus_syslog.NewSyslogHook("", "", syslog.LOG_INFO, ""); err != nil { + Log.Error("Unable to connect to local syslog daemon") + } else { + Log.AddHook(hook) + } + } + filenameHook := filename.NewHook() filenameHook.Field = "file" - Log = logrus.New() Log.AddHook(filenameHook) - /* - Log.SetReportCaller(true) - Log.SetFormatter(&logrus.TextFormatter{ - CallerPrettyfier: func(f *runtime.Frame) (string, string) { - filename := path.Base(f.File) - return "", fmt.Sprintf("%s:%d", filename, f.Line) - }, - DisableColors: true, - FullTimestamp: true, - }) - */ + Log.SetFormatter(&logrus.TextFormatter{ TimestampFormat: "2006-01-02 15:04:05", DisableColors: true, diff --git a/docs/en_US/operation/configuration_file.md b/docs/en_US/operation/configuration_file.md index 1d0789992b..9d0487304c 100644 --- a/docs/en_US/operation/configuration_file.md +++ b/docs/en_US/operation/configuration_file.md @@ -12,7 +12,8 @@ basic: # true|false, if it's set to true, then the log will be print to log file fileLog: true ``` - +## system log +When the user sets the value of the environment variable named KuiperSyslogKey to true, the log will be printed to the syslog. ## Cli Port ```yaml basic: diff --git a/docs/zh_CN/operation/configuration_file.md b/docs/zh_CN/operation/configuration_file.md index 012f39f70c..6b5b87689e 100755 --- a/docs/zh_CN/operation/configuration_file.md +++ b/docs/zh_CN/operation/configuration_file.md @@ -12,6 +12,8 @@ basic: # true|false, if it's set to true, then the log will be print to log file fileLog: true ``` +## 系统日志 +用户将名为 KuiperSyslogKey 的环境变量的值设置为 true 时,日志将打印到系统日志中。 ## Cli 端口 ```yaml basic: From 83526f28d2e8cfd6a570703704a2153e4dcb8d24 Mon Sep 17 00:00:00 2001 From: EMQmyd <66768232+EMQmyd@users.noreply.github.com> Date: Sat, 7 Nov 2020 09:47:17 +0800 Subject: [PATCH 07/15] bug(script):Unzip the file and rename (#589) Co-authored-by: EMqmyd --- plugins/sinks/tdengine/install.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/sinks/tdengine/install.sh b/plugins/sinks/tdengine/install.sh index 5185cda2c3..68a5776939 100644 --- a/plugins/sinks/tdengine/install.sh +++ b/plugins/sinks/tdengine/install.sh @@ -18,7 +18,8 @@ then fi dir="TDengine-client" -tar -zxvf "$zip" +mkdir "$dir" +tar -xzvf "$zip" -C ./"$dir" --strip-components 1 rm "$zip" if ! [ -e $dir ] From ee2f718b9a1adb3b39c82c55eab918ec6dddf4da Mon Sep 17 00:00:00 2001 From: EMqmyd Date: Mon, 9 Nov 2020 13:34:16 +0800 Subject: [PATCH 08/15] bug:reset API Header basic auth #573 --- etc/sinks/rest.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/etc/sinks/rest.json b/etc/sinks/rest.json index b20a2c6fcd..d82188f472 100644 --- a/etc/sinks/rest.json +++ b/etc/sinks/rest.json @@ -97,8 +97,8 @@ "name": "headers", "default": "", "optional": true, - "control": "text", - "type": "string", + "control": "list", + "type": "list_object", "hint": { "en_US": "The additional headers to be set for the HTTP request.", "zh_CN": "要为 HTTP 请求设置的其他标头" From 95fd985599352b3b6af0055592a9e5789f01a64c Mon Sep 17 00:00:00 2001 From: EMqmyd Date: Mon, 9 Nov 2020 13:34:16 +0800 Subject: [PATCH 09/15] bug:reset API Header basic auth #573 --- etc/sinks/rest.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/etc/sinks/rest.json b/etc/sinks/rest.json index b20a2c6fcd..688e64fb7a 100644 --- a/etc/sinks/rest.json +++ b/etc/sinks/rest.json @@ -95,10 +95,10 @@ }, { "name": "headers", - "default": "", + "default": [], "optional": true, - "control": "text", - "type": "string", + "control": "list", + "type": "list_object", "hint": { "en_US": "The additional headers to be set for the HTTP request.", "zh_CN": "要为 HTTP 请求设置的其他标头" From 990718ac03a687e8c98ef142c789ea06006e61d8 Mon Sep 17 00:00:00 2001 From: Rory Z Date: Thu, 12 Nov 2020 09:15:25 +0800 Subject: [PATCH 10/15] chore(CI): change jmeter download url in actions (#595) --- .github/workflows/run_fvt_tests.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/run_fvt_tests.yaml b/.github/workflows/run_fvt_tests.yaml index 0b718a3815..13b2a33079 100644 --- a/.github/workflows/run_fvt_tests.yaml +++ b/.github/workflows/run_fvt_tests.yaml @@ -23,9 +23,9 @@ jobs: - name: install jmeter timeout-minutes: 10 env: - JMETER_VERSION: 5.2.1 + JMETER_VERSION: 5.3 run: | - wget --no-check-certificate -O /tmp/apache-jmeter.tgz http://us.mirrors.quenda.co/apache//jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz --no-check-certificate + wget --no-check-certificate -O /tmp/apache-jmeter.tgz https://downloads.apache.org//jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz cd /tmp && tar -xvf apache-jmeter.tgz echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties echo "jmeter.save.saveservice.response_data.on_error=true" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties @@ -84,9 +84,9 @@ jobs: - name: install jmeter timeout-minutes: 10 env: - JMETER_VERSION: 5.2.1 + JMETER_VERSION: 5.3 run: | - wget --no-check-certificate -O /tmp/apache-jmeter.tgz http://us.mirrors.quenda.co/apache//jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz + wget --no-check-certificate -O /tmp/apache-jmeter.tgz https://downloads.apache.org//jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz cd /tmp && tar -xvf apache-jmeter.tgz echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties echo "jmeter.save.saveservice.response_data.on_error=true" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties From d2ad53a2326dd7e53bc3d23e1fee63a2d47f98ba Mon Sep 17 00:00:00 2001 From: EMqmyd Date: Tue, 10 Nov 2020 14:54:28 +0800 Subject: [PATCH 11/15] feat(stream):Stream update failure will be deleted --- xsql/processors/xsql_processor.go | 27 +++++++++++++++++++++++++++ xstream/server/server/rest.go | 3 +-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/xsql/processors/xsql_processor.go b/xsql/processors/xsql_processor.go index da0481231b..02caa195f0 100644 --- a/xsql/processors/xsql_processor.go +++ b/xsql/processors/xsql_processor.go @@ -77,6 +77,33 @@ func (p *StreamProcessor) execCreateStream(stmt *xsql.StreamStmt, statement stri } } +func (p *StreamProcessor) ExecReplaceStream(statement string) (string, error) { + parser := xsql.NewParser(strings.NewReader(statement)) + stmt, err := xsql.Language.Parse(parser) + if err != nil { + return "", err + } + + switch s := stmt.(type) { + case *xsql.StreamStmt: + if err = p.db.Open(); nil != err { + return "", fmt.Errorf("Replace stream fails, error when opening db: %v.", err) + } + defer p.db.Close() + + if err = p.db.Replace(string(s.Name), statement); nil != err { + return "", fmt.Errorf("Replace stream fails: %v.", err) + } else { + info := fmt.Sprintf("Stream %s is replaced.", s.Name) + log.Printf("%s", info) + return info, nil + } + default: + return "", fmt.Errorf("Invalid stream statement: %s", statement) + } + return "", nil +} + func (p *StreamProcessor) ExecStreamSql(statement string) (string, error) { r, err := p.ExecStmt(statement) if err != nil { diff --git a/xstream/server/server/rest.go b/xstream/server/server/rest.go index ddbf701ed3..beb106e11e 100644 --- a/xstream/server/server/rest.go +++ b/xstream/server/server/rest.go @@ -197,8 +197,7 @@ func streamHandler(w http.ResponseWriter, r *http.Request) { handleError(w, err, "Invalid body", logger) return } - streamProcessor.DropStream(name) - content, err := streamProcessor.ExecStreamSql(v.Sql) + content, err := streamProcessor.ExecReplaceStream(v.Sql) if err != nil { handleError(w, err, "Stream command error", logger) return From 7a50a101a5ead5551b7457060e1f79d3898aef1a Mon Sep 17 00:00:00 2001 From: RockyJin Date: Thu, 12 Nov 2020 21:17:36 +0800 Subject: [PATCH 12/15] docs: update edgex tutorial --- docs/en_US/edgex/edgex_rule_engine_tutorial.md | 4 ++++ docs/zh_CN/edgex/edgex_rule_engine_tutorial.md | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/docs/en_US/edgex/edgex_rule_engine_tutorial.md b/docs/en_US/edgex/edgex_rule_engine_tutorial.md index 331ab0187f..c73825b9ea 100644 --- a/docs/en_US/edgex/edgex_rule_engine_tutorial.md +++ b/docs/en_US/edgex/edgex_rule_engine_tutorial.md @@ -71,6 +71,10 @@ f69e9c4d6cc8 nexus3.edgexfoundry.org:10004/docker-core-data-go:master ed7ad5ae08b2 nexus3.edgexfoundry.org:10004/docker-edgex-volume:master "/bin/sh -c '/usr/bi…" 37 minutes ago Up 37 minutes edgex-files ``` +#### Run with native + +For performance reason, reader probably wants to run Kuiper with native approach. But you may find that [EdgeX cannot be used](https://github.com/emqx/kuiper/issues/596) with the downloaded Kuiper binary packages. It's because that EdgeX message bus relies on `zeromq` library. If `zeromq` library cannot be found in the library search path, it cannot be started. So it will have those Kuiper users who do not want to use EdgeX install the `zeromq` library as well. For this reason, the default downloaded Kuiper package **does NOT have embedded support** for `EdgeX`. If reader wants to support `EdgeX` in native packages, you can either make a native package by running command `make pkg_with_edgex`, or just copy the binary package from docker container. + ### Create a stream There are two approaches to manage stream, you can use your preferred approach. diff --git a/docs/zh_CN/edgex/edgex_rule_engine_tutorial.md b/docs/zh_CN/edgex/edgex_rule_engine_tutorial.md index ab7a32be69..8686e492e7 100644 --- a/docs/zh_CN/edgex/edgex_rule_engine_tutorial.md +++ b/docs/zh_CN/edgex/edgex_rule_engine_tutorial.md @@ -67,6 +67,10 @@ f69e9c4d6cc8 nexus3.edgexfoundry.org:10004/docker-core-data-go:master ed7ad5ae08b2 nexus3.edgexfoundry.org:10004/docker-edgex-volume:master "/bin/sh -c '/usr/bi…" 37 minutes ago Up 37 minutes edgex-files ``` +#### 原生 (native) 方式运行 + +出于运行效率考虑,读者可能需要直接以原生方式运行 Kuiper,但是可能会发现直接使用下载的 Kuiper 软件包启动后[无法直接使用 Edgex](https://github.com/emqx/kuiper/issues/596),这是因为 EdgeX 缺省消息总线依赖于 `zeromq` 库,如果 Kuiper 启动的时候在库文件寻找路径下无法找到 `zeromq` 库,它将无法启动。这导致对于不需要使用 EdgeX 的 Kuiper 用户也不得不去安装 `zeromq` 库 ,因此缺省提供的下载安装包中**内置不支持 Edgex** 。如果读者需要以原生方式运行 Kuiper 并且支持 `EdgeX`,可以通过命令 `make pkg_with_edgex` 自己来编译原生安装包,或者从容器中直接拷贝出安装包。 + ### 创建流 该步骤是创建一个可以从 EdgeX 消息总线进行数据消费的流。有两种方法来支持管理流,你可以选择喜欢的方式。 From 3b6c917df819d38353f3faab529c868a7f51141f Mon Sep 17 00:00:00 2001 From: Rory Z Date: Fri, 13 Nov 2020 13:31:14 +0800 Subject: [PATCH 13/15] build(CI): add tdengine plugin for arm64 on github actions (#588) --- .ci/Dockerfile-plugins | 13 ++++++++----- Makefile | 9 ++++++--- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/.ci/Dockerfile-plugins b/.ci/Dockerfile-plugins index f18c9cfe09..46f9e62bd1 100644 --- a/.ci/Dockerfile-plugins +++ b/.ci/Dockerfile-plugins @@ -19,11 +19,14 @@ RUN set -e -u -x \ ;; \ tdengine ) \ if [ "$(uname -m)" = "x86_64" ]; then \ - wget "https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-2.0.3.1-Linux-x64.tar.gz" -O /tmp/TDengine-client-2.0.3.1-Linux-x64.tar.gz \ - && tar -zxvf /tmp/TDengine-client-2.0.3.1-Linux-x64.tar.gz \ - && cd TDengine-client && ./install_client.sh && cd - \ - && go build --buildmode=plugin -tags plugins -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go; \ - fi \ + wget "https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-2.0.6.0-Linux-x64.tar.gz" -O /tmp/TDengine-client-2.0.6.0.tar.gz; \ + fi; \ + if [ "$(uname -m)" = "aarch64" ]; then \ + wget "https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-2.0.6.0-Linux-aarch64.tar.gz" -O /tmp/TDengine-client-2.0.6.0.tar.gz; \ + fi; \ + tar -zxvf /tmp/TDengine-client-2.0.6.0.tar.gz \ + && cd TDengine-client-2.0.6.0 && ./install_client.sh && cd - \ + && go build --buildmode=plugin -tags plugins -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go \ ;; \ * ) \ go build --buildmode=plugin -o plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so plugins/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME.go \ diff --git a/Makefile b/Makefile index 6dfa17938c..c9edbce88a 100644 --- a/Makefile +++ b/Makefile @@ -182,7 +182,7 @@ PLUGINS := sinks/file \ plugins: cross_prepare sinks/tdengine $(PLUGINS) sinks/tdengine: @docker buildx build --no-cache \ - --platform=linux/amd64 \ + --platform=linux/amd64,linux/arm64 \ -t cross_build \ --build-arg VERSION=$(VERSION) \ --build-arg PLUGIN_TYPE=sinks \ @@ -191,9 +191,12 @@ sinks/tdengine: -f .ci/Dockerfile-plugins . @mkdir -p _plugins/debian/sinks - @tar -xvf /tmp/cross_build_plugins_sinks_tdengine.tar --wildcards "go/kuiper/plugins/sinks/tdengine/tdengine_amd64.zip" \ - && mv go/kuiper/plugins/sinks/tdengine/tdengine_amd64.zip _plugins/debian/sinks + @for arch in amd64 arm64; do \ + tar -xvf /tmp/cross_build_plugins_sinks_tdengine.tar --wildcards "linux_$${arch}/go/kuiper/plugins/sinks/tdengine/tdengine_$$(echo $${arch%%_*}).zip" \ + && mv $$(ls linux_$${arch}/go/kuiper/plugins/sinks/tdengine/tdengine_$$(echo $${arch%%_*}).zip) _plugins/debian/sinks; \ + done @rm -f /tmp/cross_build_plugins_sinks_tdengine.tar + $(PLUGINS): PLUGIN_TYPE = $(word 1, $(subst /, , $@)) $(PLUGINS): PLUGIN_NAME = $(word 2, $(subst /, , $@)) $(PLUGINS): From 7c939140a79060790a9d08f1184b4266b022fb15 Mon Sep 17 00:00:00 2001 From: EMQmyd <66768232+EMQmyd@users.noreply.github.com> Date: Fri, 13 Nov 2020 17:47:09 +0800 Subject: [PATCH 14/15] bug(tdengine):modify install.sh of tdengine (#602) Co-authored-by: EMqmyd --- plugins/sinks/tdengine/install.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/plugins/sinks/tdengine/install.sh b/plugins/sinks/tdengine/install.sh index 68a5776939..2c0f52c7fd 100644 --- a/plugins/sinks/tdengine/install.sh +++ b/plugins/sinks/tdengine/install.sh @@ -8,6 +8,9 @@ then fi url="https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-$1-Linux-x64.tar.gz" +if [ "$(uname -m)" = "aarch64" ]; then \ + url="https://www.taosdata.com/download/download-all.php?pkgType=tdengine_linux&pkgName=TDengine-client-$1-Linux-aarch64.tar.gz" +fi zip="TDengine-client.tar.gz" wget -T 280 -O "$zip" "$url" From 678a3f1d42ec3c01b406dd23be07034f8d9b8a0a Mon Sep 17 00:00:00 2001 From: zhanghongtong Date: Tue, 10 Nov 2020 17:04:32 +0800 Subject: [PATCH 15/15] build(CI): update emqx.io release api in actions --- .github/workflows/build_packages.yaml | 19 ++++-- .../script/upload_github_release_asset.sh | 64 ------------------- 2 files changed, 12 insertions(+), 71 deletions(-) delete mode 100755 .github/workflows/script/upload_github_release_asset.sh diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml index fe40bd95ee..90c6d8fbe8 100644 --- a/.github/workflows/build_packages.yaml +++ b/.github/workflows/build_packages.yaml @@ -262,13 +262,12 @@ jobs: cd _packages && for var in $( ls |grep -v sha256); do echo "$(cat $var.sha256) $var" | sha256sum -c || exit 1 done - - name: update github release + - uses: zhanghongtong/upload-release-asset@v1 if: github.event_name == 'release' - run: | - version=$(echo ${{ github.ref }} | sed -r "s .*/.*/(.*) \1 g") - for var in $(ls _packages) ; do - .github/workflows/script/upload_github_release_asset.sh owner=emqx repo=kuiper tag=$version filename=_packages/$var github_api_token=$(echo ${{ secrets.AccessToken }}) - done + with: + repo: kuiper + path: "_packages/kuiper-*" + token: ${{ secrets.AccessToken }} - name: create invalidation for cloudfront if: github.event_name == 'release' run: | @@ -287,7 +286,13 @@ jobs: if: github.event_name == 'release' run: | version=$(echo ${{ github.ref }} | sed -r "s .*/.*/(.*) \1 g") - curl -w %{http_code} --insecure -H ${{ secrets.EmqxHeader }} https://admin.emqx.io/admin_api/v1/kuiper_github_release_callback?tag=$version + curl -w %{http_code} \ + --insecure \ + -H "Content-Type: application/json" \ + -H "token: ${{ secrets.EMQX_IO_TOKEN }}" \ + -X POST \ + -d "{\"repo\":\"emqx/kuiper\", \"tag\": \"${version}\" }" \ + ${{ secrets.EMQX_IO_RELEASE_API }} - name: update helm packages if: github.event_name == 'release' run: | diff --git a/.github/workflows/script/upload_github_release_asset.sh b/.github/workflows/script/upload_github_release_asset.sh deleted file mode 100755 index 4e92fdb583..0000000000 --- a/.github/workflows/script/upload_github_release_asset.sh +++ /dev/null @@ -1,64 +0,0 @@ -#!/usr/bin/env bash -# -# Author: Stefan Buck -# License: MIT -# https://gist.github.com/stefanbuck/ce788fee19ab6eb0b4447a85fc99f447 -# -# -# This script accepts the following parameters: -# -# * owner -# * repo -# * tag -# * filename -# * github_api_token -# -# Script to upload a release asset using the GitHub API v3. -# -# Example: -# -# upload_github_release_asset.sh github_api_token=TOKEN owner=stefanbuck repo=playground tag=v0.1.0 filename=./build.zip -# - -# Check dependencies. -set -e -xargs=$(which gxargs || which xargs) - -# Validate settings. -[ "$TRACE" ] && set -x - -CONFIG=$@ - -for line in $CONFIG; do - eval "$line" -done - -# Define variables. -GH_API="https://api.github.com" -GH_REPO="$GH_API/repos/$owner/$repo" -GH_TAGS="$GH_REPO/releases/tags/$tag" -AUTH="Authorization: token $github_api_token" -WGET_ARGS="--content-disposition --auth-no-challenge --no-cookie" -CURL_ARGS="-LJO#" - -if [[ "$tag" == 'LATEST' ]]; then - GH_TAGS="$GH_REPO/releases/latest" -fi - -# Validate token. -curl -o /dev/null -sH "$AUTH" $GH_REPO || { echo "Error: Invalid repo, token or network issue!"; exit 1; } - -# Read asset tags. -response=$(curl -sH "$AUTH" $GH_TAGS) - -# Get ID of the asset based on given filename. -eval $(echo "$response" | grep -m 1 "id.:" | grep -w id | tr : = | tr -cd '[[:alnum:]]=') -[ "$id" ] || { echo "Error: Failed to get release id for tag: $tag"; echo "$response" | awk 'length($0)<100' >&2; exit 1; } - -# Upload asset -echo "Uploading asset... " - -# Construct url -GH_ASSET="https://uploads.github.com/repos/$owner/$repo/releases/$id/assets?name=$(basename $filename)" - -curl "$GITHUB_OAUTH_BASIC" --data-binary @"$filename" -H "Authorization: token $github_api_token" -H "Content-Type: application/octet-stream" $GH_ASSET