From 2ab3c73be8c9a6e0d0183e6fbf678ab2c5d252de Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Mon, 27 Jun 2022 23:35:34 +0100 Subject: [PATCH 01/18] Influx v2 support --- cmd/tsbs_load_influx/creator.go | 39 ++++++++++++--- cmd/tsbs_load_influx/http_writer.go | 8 ++- cmd/tsbs_load_influx/http_writer_test.go | 10 ++-- cmd/tsbs_load_influx/main.go | 11 ++++ cmd/tsbs_run_queries_influx/http_client.go | 13 ++++- cmd/tsbs_run_queries_influx/main.go | 11 +++- pkg/targets/influx/implemented_target.go | 2 + scripts/load/load_influx.sh | 7 ++- scripts/run_queries/run_queries_influx.sh | 58 +++++++++++++++------- 9 files changed, 120 insertions(+), 39 deletions(-) diff --git a/cmd/tsbs_load_influx/creator.go b/cmd/tsbs_load_influx/creator.go index 28fc9a6bb..1f7f8d12c 100644 --- a/cmd/tsbs_load_influx/creator.go +++ b/cmd/tsbs_load_influx/creator.go @@ -33,8 +33,16 @@ func (d *dbCreator) DBExists(dbName string) bool { } func (d *dbCreator) listDatabases() ([]string, error) { + client := http.Client{} u := fmt.Sprintf("%s/query?q=show%%20databases", d.daemonURL) - resp, err := http.Get(u) + req, err := http.NewRequest("GET", u, nil) + if authToken != "" { + req.Header = http.Header{ + headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)}, + } + } + resp, err := client.Do(req) + if err != nil { return nil, fmt.Errorf("listDatabases error: %s", err.Error()) } @@ -61,20 +69,30 @@ func (d *dbCreator) listDatabases() ([]string, error) { } ret := []string{} - for _, nestedName := range listing.Results[0].Series[0].Values { - name := nestedName[0] - // the _internal database is skipped: - if name == "_internal" { - continue + if len(listing.Results) > 0 { + for _, nestedName := range listing.Results[0].Series[0].Values { + name := nestedName[0] + // the _internal database is skipped: + if name == "_internal" { + continue + } + ret = append(ret, name) } - ret = append(ret, name) } return ret, nil } func (d *dbCreator) RemoveOldDB(dbName string) error { u := fmt.Sprintf("%s/query?q=drop+database+%s", d.daemonURL, dbName) - resp, err := http.Post(u, "text/plain", nil) + client := http.Client{} + req, err := http.NewRequest("POST", u, nil) + if authToken != "" { + req.Header = http.Header{ + "Content-Type": []string{"text/plain"}, + headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)}, + } + } + resp, err := client.Do(req) if err != nil { return fmt.Errorf("drop db error: %s", err.Error()) } @@ -99,6 +117,11 @@ func (d *dbCreator) CreateDB(dbName string) error { u.RawQuery = v.Encode() req, err := http.NewRequest("GET", u.String(), nil) + if authToken != "" { + req.Header = http.Header{ + headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)}, + } + } if err != nil { return err } diff --git a/cmd/tsbs_load_influx/http_writer.go b/cmd/tsbs_load_influx/http_writer.go index b56ae2d8e..a53ce989f 100644 --- a/cmd/tsbs_load_influx/http_writer.go +++ b/cmd/tsbs_load_influx/http_writer.go @@ -14,6 +14,7 @@ import ( const ( httpClientName = "tsbs_load_influx" headerContentEncoding = "Content-Encoding" + headerAuthorization = "Authorization" headerGzip = "gzip" ) @@ -65,13 +66,16 @@ var ( textPlain = []byte("text/plain") ) -func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bool) { +func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bool, authToken string) { req.Header.SetContentTypeBytes(textPlain) req.Header.SetMethodBytes(methodPost) req.Header.SetRequestURIBytes(w.url) if isGzip { req.Header.Add(headerContentEncoding, 
headerGzip) } + if authToken != "" { + req.Header.Add(headerAuthorization, fmt.Sprintf("Token %s", authToken)) + } req.SetBody(body) } @@ -96,7 +100,7 @@ func (w *HTTPWriter) executeReq(req *fasthttp.Request, resp *fasthttp.Response) func (w *HTTPWriter) WriteLineProtocol(body []byte, isGzip bool) (int64, error) { req := fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) - w.initializeReq(req, body, isGzip) + w.initializeReq(req, body, isGzip, authToken) resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseResponse(resp) diff --git a/cmd/tsbs_load_influx/http_writer_test.go b/cmd/tsbs_load_influx/http_writer_test.go index 170ae4ea3..ba27656c2 100644 --- a/cmd/tsbs_load_influx/http_writer_test.go +++ b/cmd/tsbs_load_influx/http_writer_test.go @@ -114,7 +114,7 @@ func TestHTTPWriterInitializeReq(t *testing.T) { defer fasthttp.ReleaseRequest(req) w := NewHTTPWriter(testConf, testConsistency) body := "this is a test body" - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") if got := string(req.Body()); got != body { t.Errorf("non-gzip: body not correct: got '%s' want '%s'", got, body) @@ -129,7 +129,7 @@ func TestHTTPWriterInitializeReq(t *testing.T) { t.Errorf("non-gzip: Content-Encoding is not empty: got %s", got) } - w.initializeReq(req, []byte(body), true) + w.initializeReq(req, []byte(body), true, "") if got := string(req.Header.Peek(headerContentEncoding)); got != headerGzip { t.Errorf("gzip: Content-Encoding is not correct: got %s want %s", got, headerGzip) } @@ -144,7 +144,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) { w := NewHTTPWriter(testConf, testConsistency) body := "this is a test body" normalURL := w.url // save for later modification - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseResponse(resp) lat, err := w.executeReq(req, resp) @@ -161,7 +161,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) { w.url = []byte(fmt.Sprintf("%s&%s=true", string(normalURL), shouldBackoffParam)) req = fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") lat, err = w.executeReq(req, resp) if err != errBackoff { t.Errorf("unexpected error response received (not backoff error): %v", err) @@ -176,7 +176,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) { w.url = []byte(fmt.Sprintf("%s&%s=true", string(normalURL), shouldInvalidParam)) req = fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") lat, err = w.executeReq(req, resp) if err == nil { t.Errorf("unexpected non-error response received") diff --git a/cmd/tsbs_load_influx/main.go b/cmd/tsbs_load_influx/main.go index f6b85f1b5..268ffa95c 100644 --- a/cmd/tsbs_load_influx/main.go +++ b/cmd/tsbs_load_influx/main.go @@ -30,6 +30,9 @@ var ( useGzip bool doAbortOnExist bool consistency string + authToken string // InfluxDB v2 + bucketId string // InfluxDB v2 + orgId string // InfluxDB v2 ) // Global vars @@ -73,6 +76,8 @@ func init() { csvDaemonURLs = viper.GetString("urls") replicationFactor = viper.GetInt("replication-factor") consistency = viper.GetString("consistency") + authToken = viper.GetString("auth-token") + orgId = viper.GetString("org") backoff = viper.GetDuration("backoff") useGzip = viper.GetBool("gzip") @@ -80,6 +85,12 @@ func init() { log.Fatalf("invalid consistency settings") } + if 
authToken != "" { + log.Println("Using Authorization header in benchmark") + } else { + log.Println("Given no Authorization header was provided will not send it in benchmark") + } + daemonURLs = strings.Split(csvDaemonURLs, ",") if len(daemonURLs) == 0 { log.Fatal("missing 'urls' flag") diff --git a/cmd/tsbs_run_queries_influx/http_client.go b/cmd/tsbs_run_queries_influx/http_client.go index 24b7b4827..fbfd1b33b 100644 --- a/cmd/tsbs_run_queries_influx/http_client.go +++ b/cmd/tsbs_run_queries_influx/http_client.go @@ -14,6 +14,7 @@ import ( ) var bytesSlash = []byte("/") // heap optimization +var headerAuthorization = "Authorization" // HTTPClient is a reusable HTTP Client. type HTTPClient struct { @@ -22,6 +23,7 @@ type HTTPClient struct { Host []byte HostString string uri []byte + authToken string } // HTTPClientDoOptions wraps options uses when calling `Do`. @@ -46,12 +48,17 @@ func getHttpClient() *http.Client { } // NewHTTPClient creates a new HTTPClient. -func NewHTTPClient(host string) *HTTPClient { +func NewHTTPClient(host string, authToken string) *HTTPClient { + token := "" + if authToken != "" { + token = fmt.Sprintf("Token %s", authToken) + } return &HTTPClient{ client: getHttpClient(), Host: []byte(host), HostString: host, uri: []byte{}, // heap optimization + authToken: token, } } @@ -74,7 +81,9 @@ func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, if err != nil { panic(err) } - + if w.authToken != "" { + req.Header.Add(headerAuthorization, w.authToken) + } // Perform the request while tracking latency: start := time.Now() resp, err := w.client.Do(req) diff --git a/cmd/tsbs_run_queries_influx/main.go b/cmd/tsbs_run_queries_influx/main.go index 48a84d757..8e96cb83f 100644 --- a/cmd/tsbs_run_queries_influx/main.go +++ b/cmd/tsbs_run_queries_influx/main.go @@ -20,6 +20,7 @@ import ( var ( daemonUrls []string chunkSize uint64 + authToken string ) // Global vars: @@ -35,6 +36,7 @@ func init() { pflag.String("urls", "http://localhost:8086", "Daemon URLs, comma-separated. Will be used in a round-robin fashion.") pflag.Uint64("chunk-response-size", 0, "Number of series to chunk results into. 0 means no chunking.") + pflag.String("auth-token", "", "Use the Authorization header with the Token scheme to provide your token to InfluxDB. If empty will not send the Authorization header.") pflag.Parse() @@ -49,8 +51,13 @@ func init() { } csvDaemonUrls = viper.GetString("urls") + authToken = viper.GetString("auth-token") chunkSize = viper.GetUint64("chunk-response-size") - + if authToken != "" { + log.Println("Using Authorization header in benchmark") + } else { + log.Println("Given no Authorization header was provided will not send it in benchmark") + } daemonUrls = strings.Split(csvDaemonUrls, ",") if len(daemonUrls) == 0 { log.Fatal("missing 'urls' flag") @@ -78,7 +85,7 @@ func (p *processor) Init(workerNumber int) { database: runner.DatabaseName(), } url := daemonUrls[workerNumber%len(daemonUrls)] - p.w = NewHTTPClient(url) + p.w = NewHTTPClient(url, authToken) } func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) { diff --git a/pkg/targets/influx/implemented_target.go b/pkg/targets/influx/implemented_target.go index 736a37182..17cb88ba5 100644 --- a/pkg/targets/influx/implemented_target.go +++ b/pkg/targets/influx/implemented_target.go @@ -21,6 +21,8 @@ func (t *influxTarget) TargetSpecificFlags(flagPrefix string, flagSet *pflag.Fla flagSet.String(flagPrefix+"urls", "http://localhost:8086", "InfluxDB URLs, comma-separated. 
Will be used in a round-robin fashion.")
 	flagSet.Int(flagPrefix+"replication-factor", 1, "Cluster replication factor (only applies to clustered databases).")
 	flagSet.String(flagPrefix+"consistency", "all", "Write consistency. Must be one of: any, one, quorum, all.")
+	flagSet.String(flagPrefix+"auth-token", "", "Use the Authorization header with the Token scheme to provide your token to InfluxDB. If empty, the Authorization header is not sent.")
+	flagSet.String(flagPrefix+"org", "", "Organization name (InfluxDB v2).")
 	flagSet.Duration(flagPrefix+"backoff", time.Second, "Time to sleep between requests when server indicates backpressure is needed.")
 	flagSet.Bool(flagPrefix+"gzip", true, "Whether to gzip encode requests (default true).")
 }
diff --git a/scripts/load/load_influx.sh b/scripts/load/load_influx.sh
index 90e9e13c0..f8c6774d9 100755
--- a/scripts/load/load_influx.sh
+++ b/scripts/load/load_influx.sh
@@ -10,6 +10,7 @@ fi
 # Load parameters - common
 DATA_FILE_NAME=${DATA_FILE_NAME:-influx-data.gz}
 DATABASE_PORT=${DATABASE_PORT:-8086}
+INFLUX_AUTH_TOKEN=${INFLUX_AUTH_TOKEN:-""}
 
 EXE_DIR=${EXE_DIR:-$(dirname $0)}
 source ${EXE_DIR}/load_common.sh
@@ -20,7 +21,10 @@ until curl http://${DATABASE_HOST}:${DATABASE_PORT}/ping 2>/dev/null; do
 done
 
 # Remove previous database
-curl -X POST http://${DATABASE_HOST}:${DATABASE_PORT}/query?q=drop%20database%20${DATABASE_NAME}
+curl --header "Authorization: Token $INFLUX_AUTH_TOKEN" \
+  -X POST http://${DATABASE_HOST}:${DATABASE_PORT}/query?q=drop%20database%20${DATABASE_NAME}
+
+
 # Load new data
 cat ${DATA_FILE} | gunzip | $EXE_FILE_NAME \
 	--db-name=${DATABASE_NAME} \
@@ -28,4 +32,5 @@ cat ${DATA_FILE} | gunzip | $EXE_FILE_NAME \
 	--workers=${NUM_WORKERS} \
 	--batch-size=${BATCH_SIZE} \
 	--reporting-period=${REPORTING_PERIOD} \
+	--auth-token=${INFLUX_AUTH_TOKEN} \
 	--urls=http://${DATABASE_HOST}:${DATABASE_PORT}
diff --git a/scripts/run_queries/run_queries_influx.sh b/scripts/run_queries/run_queries_influx.sh
index 749902e63..23c96554e 100755
--- a/scripts/run_queries/run_queries_influx.sh
+++ b/scripts/run_queries/run_queries_influx.sh
@@ -7,17 +7,25 @@ if [[ -z "$EXE_FILE_NAME" ]]; then
 	exit 1
 fi
 
-# Default queries folder
-BULK_DATA_DIR=${BULK_DATA_DIR:-"/tmp/bulk_queries"}
-MAX_QUERIES=${MAX_QUERIES:-"0"}
-# How many concurrent worker would run queries - match num of cores, or default to 4
-NUM_WORKERS=${NUM_WORKERS:-$(grep -c ^processor /proc/cpuinfo 2> /dev/null || echo 4)}
+DATABASE_PORT=${DATABASE_PORT:-8086}
+INFLUX_AUTH_TOKEN=${INFLUX_AUTH_TOKEN:-""}
+
+EXE_DIR=${EXE_DIR:-$(dirname $0)}
+source ${EXE_DIR}/run_common.sh
+
+
+until curl http://${DATABASE_HOST}:${DATABASE_PORT}/ping 2>/dev/null; do
+  echo "Waiting for InfluxDB"
+  sleep 1
+done
+
+# Ensure RESULTS DIR available
+mkdir -p ${RESULTS_DIR}
 
 #
 # Run test for one file
 #
-function run_file()
-{
+function run_file() {
 	# $FULL_DATA_FILE_NAME: /full/path/to/file_with.ext
 	# $DATA_FILE_NAME:      file_with.ext
 	# $DIR:                 /full/path/to
@@ -29,24 +37,36 @@ function run_file()
 	EXTENSION="${DATA_FILE_NAME##*.}"
 	NO_EXT_DATA_FILE_NAME="${DATA_FILE_NAME%.*}"
 
-	# Several options on how to name results file
-	#OUT_FULL_FILE_NAME="${DIR}/result_${DATA_FILE_NAME}"
-	OUT_FULL_FILE_NAME="${DIR}/result_${NO_EXT_DATA_FILE_NAME}.out"
-	#OUT_FULL_FILE_NAME="${DIR}/${NO_EXT_DATA_FILE_NAME}.out"
-
 	if [ "${EXTENSION}" == "gz" ]; then
 		GUNZIP="gunzip"
 	else
 		GUNZIP="cat"
 	fi
 
-	echo "Running ${DATA_FILE_NAME}"
-	cat $FULL_DATA_FILE_NAME \
-		| $GUNZIP \
-		| $EXE_FILE_NAME \
-			--max-queries $MAX_QUERIES \
-			--workers 
$NUM_WORKERS \ - | tee $OUT_FULL_FILE_NAME + for run in $(seq ${REPETITIONS}); do + # Several options on how to name results file + #OUT_FULL_FILE_NAME="${DIR}/result_${DATA_FILE_NAME}" + OUT_FULL_FILE_NAME="${RESULTS_DIR}/result_${NO_EXT_DATA_FILE_NAME}_${run}.out" + #OUT_FULL_FILE_NAME="${DIR}/${NO_EXT_DATA_FILE_NAME}.out" + HDR_FULL_FILE_NAME="${RESULTS_DIR}/HDR_TXT_result_${NO_EXT_DATA_FILE_NAME}_${run}.out" + + echo "Running ${DATA_FILE_NAME}" + echo " Saving results to ${OUT_FULL_FILE_NAME}" + echo " Saving HDR results to ${HDR_FULL_FILE_NAME}" + + cat $FULL_DATA_FILE_NAME | + $GUNZIP | + $EXE_FILE_NAME \ + --max-queries=${MAX_QUERIES} \ + --db-name=${DATABASE_NAME} \ + --workers=${NUM_WORKERS} \ + --print-interval=${QUERIES_PRINT_INTERVAL} \ + --hdr-latencies=${HDR_FULL_FILE_NAME} \ + --auth-token $INFLUX_AUTH_TOKEN \ + --debug=${DEBUG} \ + --urls=http://${DATABASE_HOST}:${DATABASE_PORT} | + tee $OUT_FULL_FILE_NAME + done } if [ "$#" -gt 0 ]; then From f49fc9f871c4719443eaa641fda859f5e67f3c58 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Mon, 27 Jun 2022 23:59:04 +0100 Subject: [PATCH 02/18] Included full_cycle_minitest_influx.sh --- .../full_cycle_minitest_influx.sh | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100755 scripts/full_cycle_minitest/full_cycle_minitest_influx.sh diff --git a/scripts/full_cycle_minitest/full_cycle_minitest_influx.sh b/scripts/full_cycle_minitest/full_cycle_minitest_influx.sh new file mode 100755 index 000000000..6c1cd8701 --- /dev/null +++ b/scripts/full_cycle_minitest/full_cycle_minitest_influx.sh @@ -0,0 +1,77 @@ +#!/bin/bash +# showcases the ftsb 3 phases for influxdb +# - 1) data and query generation +# - 2) data loading/insertion +# - 3) query execution + +SCALE=${SCALE:-"10"} +SEED=${SEED:-"123"} +FORMAT="influx" + +mkdir -p /tmp/bulk_data +rm /tmp/bulk_data/${FORMAT}_* + +# exit immediately on error +set -e + +# Load parameters - common +DATABASE_PORT=${DATABASE_PORT:-8086} +DATABASE_HOST=${DATABASE_HOST:-localhost} + +# All available query types (sorted alphabetically) +QUERY_TYPES_ALL="\ + cpu-max-all-1 \ + cpu-max-all-8 \ + double-groupby-1 \ + double-groupby-5 \ + double-groupby-all \ + groupby-orderby-limit \ + high-cpu-1 \ + high-cpu-all \ + lastpoint \ + single-groupby-1-1-1 \ + single-groupby-1-1-12 \ + single-groupby-1-8-1 \ + single-groupby-5-1-1 \ + single-groupby-5-1-12 \ + single-groupby-5-8-1" + +# What query types to generate +QUERY_TYPES=${QUERY_TYPES:-$QUERY_TYPES_ALL} + +# generate data +$GOPATH/bin/tsbs_generate_data --format ${FORMAT} --use-case cpu-only --scale=${SCALE} --seed=${SEED} --file /tmp/bulk_data/${FORMAT}_data + +for queryName in $QUERY_TYPES; do + echo "generating query: $queryName" + $GOPATH/bin/tsbs_generate_queries --format ${FORMAT} --use-case cpu-only --scale=${SCALE} --seed=${SEED} \ + --queries=10 \ + --query-type $queryName \ + --file /tmp/bulk_data/${FORMAT}_query_$queryName +done + +until curl http://${DATABASE_HOST}:${DATABASE_PORT}/ping 2>/dev/null; do + echo "Waiting for InfluxDB" + sleep 1 +done + +# Remove previous database +curl -X POST http://${DATABASE_HOST}:${DATABASE_PORT}/query?q=drop%20database%20benchmark + +# insert benchmark +$GOPATH/bin/tsbs_load_${FORMAT} \ + --db-name=benchmark \ + --backoff=1s \ + --workers=1 \ + --urls=http://${DATABASE_HOST}:${DATABASE_PORT} \ + --auth-token ${INFLUX_AUTH_TOKEN} \ + --file=/tmp/bulk_data/${FORMAT}_data + +# queries benchmark +for queryName in $QUERY_TYPES; do + echo "running query: $queryName" + 
$GOPATH/bin/tsbs_run_queries_${FORMAT} --print-responses \
+        --workers=1 \
+        --auth-token=${INFLUX_AUTH_TOKEN} \
+        --file /tmp/bulk_data/${FORMAT}_query_$queryName
+done
From 82bb42f791d2ec346e8c9416f35b12a3fc90e136 Mon Sep 17 00:00:00 2001
From: filipecosta90 
Date: Tue, 28 Jun 2022 00:08:25 +0100
Subject: [PATCH 03/18] Included docs reference to auth-token. Included
 required setup steps for InfluxDB v2

---
 docs/influx.md | 39 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 39 insertions(+)

diff --git a/docs/influx.md b/docs/influx.md
index e7f27edba..fefe67e00 100644
--- a/docs/influx.md
+++ b/docs/influx.md
@@ -7,6 +7,33 @@ using the data importer (`tsbs_load_influx`), and additional flags available
 for the query runner (`tsbs_run_queries_influx`). **This should be read
 *after* the main README.**
 
+## InfluxDB v2 setup steps
+
+On a fresh InfluxDB v2 installation, first run:
+
+```bash
+influx setup
+```
+
+If you need to create a new bucket, adjust the bucket name (`-n`) and the org name (`-o`) accordingly:
+
+```bash
+influx bucket create -n bucket-perf -o org -r 0
+```
+
+Create a DBRP mapping with the InfluxDB 1.x compatibility API ([official docs](https://docs.influxdata.com/influxdb/cloud/reference/cli/influx/v1/dbrp/create/)).
+
+Adjust the bucket name and database name accordingly:
+
+```bash
+influx v1 dbrp create --db benchmark --rp 0 --bucket-id `influx bucket ls --name bucket-perf | awk -v i=2 -v j=1 'FNR == i {print $j}'` --default
+```
+
+Retrieve the auth token as follows:
+```bash
+influx auth list
+```
+
 ## Data format
 
 Data generated by `tsbs_generate_data` for InfluxDB is serialized in a
@@ -58,6 +85,11 @@ Whether to encode writes to the server with gzip. For best performance,
 encoding with gzip is the best choice, but if the server does not support
 or has gzip disabled, this flag should be set to false.
 
+#### `-auth-token` (type: `string`, default: `""`)
+
+Authentication token, passed to InfluxDB in the Authorization header using the Token scheme.
+If empty, the Authorization header is not sent.
+
 ---
 
 ## `tsbs_run_queries_influx` Additional Flags
@@ -76,3 +108,10 @@ everything in a single response.
 
 Comma-separated list of URLs to connect to for querying. Workers will be
 distributed in a round robin fashion across the URLs.
+
+### Miscellaneous
+
+#### `-auth-token` (type: `string`, default: `""`)
+
+Authentication token, passed to InfluxDB in the Authorization header using the Token scheme.
+If empty, the Authorization header is not sent.
From a6c60715378f212a975ad5b64c1594b5163f9581 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=B6rg=20Domaschka?=
Date: Mon, 29 Jan 2024 22:46:53 +0000
Subject: [PATCH 04/18] Support the IoT workload's numeric tags in the MongoDB
 binding

The IoT workload generator uses numeric tags, which is not the case for the
DevOps workload. The MongoDB binding uses flatbuffers to serialize generated
data to disk. We change the flatbuffers specification to include float tags,
and update the code that serializes and deserializes these structures.
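
The decode path for the new union-typed tags can be sketched as follows. The generated accessors (MongoTag.ValueType, MongoTag.Value, MongoStringTag, MongoFloat32Tag) are the ones added by this patch; the helper name decodeTag is illustrative only, and the sketch mirrors the parseTag helper this patch adds to cmd/tsbs_load_mongo/common_loader.go:

```go
package tagsketch

import (
	flatbuffers "github.com/google/flatbuffers/go"

	"github.com/timescale/tsbs/pkg/targets/mongo"
)

// decodeTag resolves a MongoTag whose value is the new MongoTagValue union,
// returning the payload as either a string or a float32.
func decodeTag(t *mongo.MongoTag) (interface{}, bool) {
	table := new(flatbuffers.Table)
	if !t.Value(table) {
		return nil, false // no union payload stored
	}
	switch t.ValueType() {
	case mongo.MongoTagValueMongoStringTag:
		s := new(mongo.MongoStringTag)
		s.Init(table.Bytes, table.Pos)
		return string(s.Value()), true
	case mongo.MongoTagValueMongoFloat32Tag:
		f := new(mongo.MongoFloat32Tag)
		f.Init(table.Bytes, table.Pos)
		return f.Value(), true
	default:
		return nil, false // MongoTagValueNONE or an unknown arm
	}
}
```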
--- cmd/tsbs_load_mongo/aggregate_loader.go | 14 ++++-- cmd/tsbs_load_mongo/common_loader.go | 33 +++++++++++++ cmd/tsbs_load_mongo/document_per_loader.go | 7 +-- pkg/targets/mongo/MongoFloat32Tag.go | 56 ++++++++++++++++++++++ pkg/targets/mongo/MongoPoint.go | 9 +++- pkg/targets/mongo/MongoReading.go | 9 +++- pkg/targets/mongo/MongoStringTag.go | 52 ++++++++++++++++++++ pkg/targets/mongo/MongoTag.go | 35 +++++++++++--- pkg/targets/mongo/MongoTagValue.go | 32 +++++++++++++ pkg/targets/mongo/mongo.fbs | 18 ++++++- pkg/targets/mongo/serializer.go | 31 ++++++++---- 11 files changed, 271 insertions(+), 25 deletions(-) create mode 100644 pkg/targets/mongo/MongoFloat32Tag.go create mode 100644 pkg/targets/mongo/MongoStringTag.go create mode 100644 pkg/targets/mongo/MongoTagValue.go diff --git a/cmd/tsbs_load_mongo/aggregate_loader.go b/cmd/tsbs_load_mongo/aggregate_loader.go index ca1b1682f..fe6e607a0 100644 --- a/cmd/tsbs_load_mongo/aggregate_loader.go +++ b/cmd/tsbs_load_mongo/aggregate_loader.go @@ -29,10 +29,13 @@ func (i *hostnameIndexer) GetIndex(item data.LoadedPoint) uint { p.Tags(t, j) key := string(t.Key()) if key == "hostname" || key == "name" { + myMap := map[string]interface{}{} + success := parseTag(myMap, t) + if( !success ) { continue } // the hostame is the defacto index for devops tags // the truck name is the defacto index for iot tags h := fnv.New32a() - h.Write([]byte(string(t.Value()))) + h.Write([]byte(myMap[key].(string))) return uint(h.Sum32()) % i.partitions } } @@ -128,12 +131,15 @@ func (p *aggProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint6 eventCnt := uint64(0) for _, event := range batch.arr { tagsSlice := bson.D{} - tagsMap := map[string]string{} + tagsMap := map[string]interface{}{} t := &tsbsMongo.MongoTag{} for j := 0; j < event.TagsLength(); j++ { event.Tags(t, j) - tagsMap[string(t.Key())] = string(t.Value()) - tagsSlice = append(tagsSlice, bson.E{string(t.Key()), string(t.Value())}) + parseTag(tagsMap, t) + bbson, success := parseTagAsBson(t) + if(success == true) { + tagsSlice = append(tagsSlice, bbson) + } } // Determine which document this event belongs too diff --git a/cmd/tsbs_load_mongo/common_loader.go b/cmd/tsbs_load_mongo/common_loader.go index 6fc21019c..3e22b5a53 100644 --- a/cmd/tsbs_load_mongo/common_loader.go +++ b/cmd/tsbs_load_mongo/common_loader.go @@ -13,12 +13,45 @@ import ( "github.com/timescale/tsbs/pkg/data/usecases/common" "github.com/timescale/tsbs/pkg/targets" "github.com/timescale/tsbs/pkg/targets/mongo" + "go.mongodb.org/mongo-driver/bson" ) type fileDataSource struct { lenBuf []byte r *bufio.Reader } +func parseTag(x map[string]interface{}, t *mongo.MongoTag) (bool) { + unionTable := new(flatbuffers.Table) + if(!t.Value(unionTable)) { return false } + if(t.ValueType() == mongo.MongoTagValueMongoStringTag) { + stringTag := new(mongo.MongoStringTag) + stringTag.Init(unionTable.Bytes, unionTable.Pos) + x[string(t.Key())] = string(stringTag.Value()) + } else if(t.ValueType() == mongo.MongoTagValueMongoFloat32Tag) { + floatTag := new(mongo.MongoFloat32Tag) + floatTag.Init(unionTable.Bytes, unionTable.Pos) + x[string(t.Key())] = float32(floatTag.Value()) + } else { + return false + } + return true +} + +func parseTagAsBson(t *mongo.MongoTag) (bson.E, bool) { + unionTable := new(flatbuffers.Table) + if(!t.Value(unionTable)) { return bson.E{}, false } + if(t.ValueType() == mongo.MongoTagValueMongoStringTag) { + stringTag := new(mongo.MongoStringTag) + stringTag.Init(unionTable.Bytes, unionTable.Pos) + return 
bson.E{string(t.Key()), string(stringTag.Value())}, true + } else if(t.ValueType() == mongo.MongoTagValueMongoFloat32Tag) { + floatTag := new(mongo.MongoFloat32Tag) + floatTag.Init(unionTable.Bytes, unionTable.Pos) + return bson.E{string(t.Key()), float32(floatTag.Value())}, true + } else { + return bson.E{}, false + } +} func (d *fileDataSource) NextItem() data.LoadedPoint { item := &mongo.MongoPoint{} diff --git a/cmd/tsbs_load_mongo/document_per_loader.go b/cmd/tsbs_load_mongo/document_per_loader.go index ab6fc99b2..cb0eae645 100644 --- a/cmd/tsbs_load_mongo/document_per_loader.go +++ b/cmd/tsbs_load_mongo/document_per_loader.go @@ -67,7 +67,7 @@ func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uin x := spPool.Get().(*singlePoint) (*x)["measurement"] = string(event.MeasurementName()) (*x)[timestampField] = time.Unix(0, event.Timestamp()) - (*x)["tags"] = map[string]string{} + (*x)["tags"] = map[string]interface{}{} f := &tsbsMongo.MongoReading{} for j := 0; j < event.FieldsLength(); j++ { event.Fields(f, j) @@ -76,7 +76,7 @@ func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uin t := &tsbsMongo.MongoTag{} for j := 0; j < event.TagsLength(); j++ { event.Tags(t, j) - (*x)["tags"].(map[string]string)[string(t.Key())] = string(t.Value()) + parseTag((*x)["tags"].(map[string]interface{}), t) } p.pvs[i] = x metricCnt += uint64(event.FieldsLength()) @@ -95,7 +95,8 @@ func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uin tags := bson.D{} for j := 0; j < event.TagsLength(); j++ { event.Tags(t, j) - tags = append(tags, bson.E{string(t.Key()), string(t.Value())}) + bbson, success := parseTagAsBson(t) + if(success == true) { tags = append(tags, bbson) } } x = append(x, bson.E{"tags", tags}) p.pvs[i] = x diff --git a/pkg/targets/mongo/MongoFloat32Tag.go b/pkg/targets/mongo/MongoFloat32Tag.go new file mode 100644 index 000000000..1eb0e9091 --- /dev/null +++ b/pkg/targets/mongo/MongoFloat32Tag.go @@ -0,0 +1,56 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. 
+ +package mongo + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type MongoFloat32Tag struct { + _tab flatbuffers.Table +} + +func GetRootAsMongoFloat32Tag(buf []byte, offset flatbuffers.UOffsetT) *MongoFloat32Tag { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &MongoFloat32Tag{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsMongoFloat32Tag(buf []byte, offset flatbuffers.UOffsetT) *MongoFloat32Tag { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &MongoFloat32Tag{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *MongoFloat32Tag) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *MongoFloat32Tag) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *MongoFloat32Tag) Value() float32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.GetFloat32(o + rcv._tab.Pos) + } + return 0.0 +} + +func (rcv *MongoFloat32Tag) MutateValue(n float32) bool { + return rcv._tab.MutateFloat32Slot(4, n) +} + +func MongoFloat32TagStart(builder *flatbuffers.Builder) { + builder.StartObject(1) +} +func MongoFloat32TagAddValue(builder *flatbuffers.Builder, value float32) { + builder.PrependFloat32Slot(0, value, 0.0) +} +func MongoFloat32TagEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/pkg/targets/mongo/MongoPoint.go b/pkg/targets/mongo/MongoPoint.go index c71c4caa8..dd67019eb 100644 --- a/pkg/targets/mongo/MongoPoint.go +++ b/pkg/targets/mongo/MongoPoint.go @@ -1,4 +1,4 @@ -// automatically generated by the FlatBuffers compiler, do not modify +// Code generated by the FlatBuffers compiler. DO NOT EDIT. package mongo @@ -17,6 +17,13 @@ func GetRootAsMongoPoint(buf []byte, offset flatbuffers.UOffsetT) *MongoPoint { return x } +func GetSizePrefixedRootAsMongoPoint(buf []byte, offset flatbuffers.UOffsetT) *MongoPoint { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &MongoPoint{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + func (rcv *MongoPoint) Init(buf []byte, i flatbuffers.UOffsetT) { rcv._tab.Bytes = buf rcv._tab.Pos = i diff --git a/pkg/targets/mongo/MongoReading.go b/pkg/targets/mongo/MongoReading.go index 0f8bcff85..403ef2b93 100644 --- a/pkg/targets/mongo/MongoReading.go +++ b/pkg/targets/mongo/MongoReading.go @@ -1,4 +1,4 @@ -// automatically generated by the FlatBuffers compiler, do not modify +// Code generated by the FlatBuffers compiler. DO NOT EDIT. package mongo @@ -17,6 +17,13 @@ func GetRootAsMongoReading(buf []byte, offset flatbuffers.UOffsetT) *MongoReadin return x } +func GetSizePrefixedRootAsMongoReading(buf []byte, offset flatbuffers.UOffsetT) *MongoReading { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &MongoReading{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + func (rcv *MongoReading) Init(buf []byte, i flatbuffers.UOffsetT) { rcv._tab.Bytes = buf rcv._tab.Pos = i diff --git a/pkg/targets/mongo/MongoStringTag.go b/pkg/targets/mongo/MongoStringTag.go new file mode 100644 index 000000000..d086ad714 --- /dev/null +++ b/pkg/targets/mongo/MongoStringTag.go @@ -0,0 +1,52 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. 
+ +package mongo + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type MongoStringTag struct { + _tab flatbuffers.Table +} + +func GetRootAsMongoStringTag(buf []byte, offset flatbuffers.UOffsetT) *MongoStringTag { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &MongoStringTag{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsMongoStringTag(buf []byte, offset flatbuffers.UOffsetT) *MongoStringTag { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &MongoStringTag{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *MongoStringTag) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *MongoStringTag) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *MongoStringTag) Value() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func MongoStringTagStart(builder *flatbuffers.Builder) { + builder.StartObject(1) +} +func MongoStringTagAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(value), 0) +} +func MongoStringTagEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/pkg/targets/mongo/MongoTag.go b/pkg/targets/mongo/MongoTag.go index bdf37ef0a..7817f11f2 100644 --- a/pkg/targets/mongo/MongoTag.go +++ b/pkg/targets/mongo/MongoTag.go @@ -1,4 +1,4 @@ -// automatically generated by the FlatBuffers compiler, do not modify +// Code generated by the FlatBuffers compiler. DO NOT EDIT. package mongo @@ -17,6 +17,13 @@ func GetRootAsMongoTag(buf []byte, offset flatbuffers.UOffsetT) *MongoTag { return x } +func GetSizePrefixedRootAsMongoTag(buf []byte, offset flatbuffers.UOffsetT) *MongoTag { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &MongoTag{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + func (rcv *MongoTag) Init(buf []byte, i flatbuffers.UOffsetT) { rcv._tab.Bytes = buf rcv._tab.Pos = i @@ -34,22 +41,38 @@ func (rcv *MongoTag) Key() []byte { return nil } -func (rcv *MongoTag) Value() []byte { +func (rcv *MongoTag) ValueType() MongoTagValue { o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) + return MongoTagValue(rcv._tab.GetByte(o + rcv._tab.Pos)) } - return nil + return 0 +} + +func (rcv *MongoTag) MutateValueType(n MongoTagValue) bool { + return rcv._tab.MutateByteSlot(6, byte(n)) +} + +func (rcv *MongoTag) Value(obj *flatbuffers.Table) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + rcv._tab.Union(obj, o) + return true + } + return false } func MongoTagStart(builder *flatbuffers.Builder) { - builder.StartObject(2) + builder.StartObject(3) } func MongoTagAddKey(builder *flatbuffers.Builder, key flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(key), 0) } +func MongoTagAddValueType(builder *flatbuffers.Builder, valueType MongoTagValue) { + builder.PrependByteSlot(1, byte(valueType), 0) +} func MongoTagAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(value), 0) + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(value), 0) } func MongoTagEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() diff --git a/pkg/targets/mongo/MongoTagValue.go b/pkg/targets/mongo/MongoTagValue.go new file mode 
100644 index 000000000..8be0f81d9 --- /dev/null +++ b/pkg/targets/mongo/MongoTagValue.go @@ -0,0 +1,32 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package mongo + +import "strconv" + +type MongoTagValue byte + +const ( + MongoTagValueNONE MongoTagValue = 0 + MongoTagValueMongoStringTag MongoTagValue = 1 + MongoTagValueMongoFloat32Tag MongoTagValue = 2 +) + +var EnumNamesMongoTagValue = map[MongoTagValue]string{ + MongoTagValueNONE: "NONE", + MongoTagValueMongoStringTag: "MongoStringTag", + MongoTagValueMongoFloat32Tag: "MongoFloat32Tag", +} + +var EnumValuesMongoTagValue = map[string]MongoTagValue{ + "NONE": MongoTagValueNONE, + "MongoStringTag": MongoTagValueMongoStringTag, + "MongoFloat32Tag": MongoTagValueMongoFloat32Tag, +} + +func (v MongoTagValue) String() string { + if s, ok := EnumNamesMongoTagValue[v]; ok { + return s + } + return "MongoTagValue(" + strconv.FormatInt(int64(v), 10) + ")" +} diff --git a/pkg/targets/mongo/mongo.fbs b/pkg/targets/mongo/mongo.fbs index ca806091e..64dcb75ae 100644 --- a/pkg/targets/mongo/mongo.fbs +++ b/pkg/targets/mongo/mongo.fbs @@ -1,8 +1,22 @@ // mongo.fbs -namespace serialize; +// namespace serialize; +namespace mongo; + +table MongoStringTag { + value: string; +} + +table MongoFloat32Tag { + value: float; +} + +union MongoTagValue { + MongoStringTag, MongoFloat32Tag +} + table MongoTag { key:string; - value:string; + value: MongoTagValue; } table MongoReading { diff --git a/pkg/targets/mongo/serializer.go b/pkg/targets/mongo/serializer.go index b1153d7db..7307e4552 100644 --- a/pkg/targets/mongo/serializer.go +++ b/pkg/targets/mongo/serializer.go @@ -6,8 +6,8 @@ import ( "github.com/timescale/tsbs/pkg/data" "io" "sync" - flatbuffers "github.com/google/flatbuffers/go" + "os" ) var fbBuilderPool = &sync.Pool{ @@ -30,20 +30,35 @@ func (s *Serializer) Serialize(p *data.Point, w io.Writer) (err error) { tagKeys := p.TagKeys() tagValues := p.TagValues() for i := len(tagKeys); i > 0; i-- { + var tagType MongoTagValue + var tagElement flatbuffers.UOffsetT switch v := tagValues[i-1].(type) { case string: - k := string(tagKeys[i-1]) - key := b.CreateString(k) val := b.CreateString(v) - MongoTagStart(b) - MongoTagAddKey(b, key) - MongoTagAddValue(b, val) - tags = append(tags, MongoTagEnd(b)) + MongoStringTagStart(b) + MongoStringTagAddValue(b, val) + tagType = MongoTagValueMongoStringTag + tagElement = MongoStringTagEnd(b) case nil: continue + case float32: + vv := tagValues[i-1] + MongoFloat32TagStart(b) + MongoFloat32TagAddValue(b, vv.(float32)) + tagType = MongoTagValueMongoFloat32Tag + tagElement = MongoFloat32TagEnd(b) default: - panic("non-string tags not implemented for mongo db") + fmt.Fprintf(os.Stderr, "non-string tags not implemented for mongo db: %s\n", v) + // continue + panic("non-string, non-float tags not implemented for mongo db") } + k := string(tagKeys[i-1]) + key := b.CreateString(k) + MongoTagStart(b) + MongoTagAddKey(b, key) + MongoTagAddValueType(b, tagType) + MongoTagAddValue(b, tagElement) + tags = append(tags, MongoTagEnd(b)) } MongoPointStartTagsVector(b, len(tags)) for _, t := range tags { From 913aacb87604e7b7ef369ff4dc7904db0e1f3650 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Domaschka?= Date: Tue, 13 Feb 2024 10:35:55 +0000 Subject: [PATCH 05/18] baseline support iot workload for mongodb binding --- .../databases/mongo/common.go | 15 +++++++ .../databases/mongo/iot.go | 41 +++++++++++++++++++ 2 files changed, 56 insertions(+) create mode 100644 cmd/tsbs_generate_queries/databases/mongo/iot.go 
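
A minimal sketch of how the new generator is obtained; the surrounding runner wiring is assumed from the existing tsbs_generate_queries code, and the start/end/scale values are illustrative. Only NewIoT below comes from this patch:

```go
package main

import (
	"log"
	"time"

	mongoGen "github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/mongo"
)

func main() {
	// One simulated day of IoT data at scale 10 (10 trucks).
	start := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
	end := start.Add(24 * time.Hour)

	g := &mongoGen.BaseGenerator{}
	qg, err := g.NewIoT(start, end, 10)
	if err != nil {
		log.Fatal(err)
	}

	// qg satisfies utils.QueryGenerator; the runner dispatches on the
	// requested query type to the methods added in the following commits.
	_ = qg
}
```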
diff --git a/cmd/tsbs_generate_queries/databases/mongo/common.go b/cmd/tsbs_generate_queries/databases/mongo/common.go
index 26a2c48b8..93a1d7a01 100644
--- a/cmd/tsbs_generate_queries/databases/mongo/common.go
+++ b/cmd/tsbs_generate_queries/databases/mongo/common.go
@@ -4,6 +4,7 @@ import (
 	"time"
 
 	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
+	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/iot"
 	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/finance"
 	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils"
 	"github.com/timescale/tsbs/pkg/query"
 )
@@ -54,3 +55,17 @@ func (g *BaseGenerator) NewFinance(start, end time.Time, scale int) (utils.Query
 		Core: core,
 	}, nil
 }
+
+// NewIoT creates a new iot use case query generator.
+func (g *BaseGenerator) NewIoT(start, end time.Time, scale int) (utils.QueryGenerator, error) {
+	core, err := iot.NewCore(start, end, scale)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return &IoT{
+		BaseGenerator: g,
+		Core:          core,
+	}, nil
+}
diff --git a/cmd/tsbs_generate_queries/databases/mongo/iot.go b/cmd/tsbs_generate_queries/databases/mongo/iot.go
new file mode 100644
index 000000000..d37cd9f56
--- /dev/null
+++ b/cmd/tsbs_generate_queries/databases/mongo/iot.go
@@ -0,0 +1,41 @@
+package mongo
+
+import (
+//	"fmt"
+//	"strings"
+	"time"
+	"encoding/gob"
+
+	"go.mongodb.org/mongo-driver/bson"
+//	"go.mongodb.org/mongo-driver/mongo"
+
+	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases"
+	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/iot"
+//	"github.com/timescale/tsbs/pkg/query"
+)
+
+func init() {
+	// needed for serializing the mongo query to gob
+	gob.Register([]interface{}{})
+	gob.Register(map[string]interface{}{})
+	gob.Register([]map[string]interface{}{})
+	gob.Register(bson.M{})
+	gob.Register(bson.D{})
+	gob.Register([]bson.M{})
+}
+
+// IoT produces Mongo-specific queries for all the iot query types.
+type IoT struct {
+	*iot.Core
+	*BaseGenerator
+}
+
+// NewIoT makes an IoT object ready to generate Queries.
+func NewIoT(start, end time.Time, scale int, g *BaseGenerator) *IoT {
+	c, err := iot.NewCore(start, end, scale)
+	databases.PanicIfErr(err)
+	return &IoT{
+		Core:          c,
+		BaseGenerator: g,
+	}
+}
From 1ed3f9c9b7ac6f184695cfedbf007a1930ff5515 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=B6rg=20Domaschka?=
Date: Tue, 7 May 2024 08:24:01 +0000
Subject: [PATCH 06/18] add function for MongoDB: LastLocByTruck

---
 .../databases/mongo/iot.go                    | 50 +++++++++++++++++--
 1 file changed, 47 insertions(+), 3 deletions(-)

diff --git a/cmd/tsbs_generate_queries/databases/mongo/iot.go b/cmd/tsbs_generate_queries/databases/mongo/iot.go
index d37cd9f56..d4d9dab60 100644
--- a/cmd/tsbs_generate_queries/databases/mongo/iot.go
+++ b/cmd/tsbs_generate_queries/databases/mongo/iot.go
@@ -1,17 +1,17 @@
 package mongo
 
 import (
-//	"fmt"
+	"fmt"
 //	"strings"
 	"time"
 	"encoding/gob"
 
 	"go.mongodb.org/mongo-driver/bson"
-//	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo"
 
 	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases"
 	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/iot"
-//	"github.com/timescale/tsbs/pkg/query"
+	"github.com/timescale/tsbs/pkg/query"
 )
 
 func init() {
@@ -24,6 +24,12 @@ func init() {
 	gob.Register([]bson.M{})
 }
 
+func (i *IoT) getTrucksFilterArray(nTrucks int) []string {
+	names, err := i.GetRandomTrucks(nTrucks)
+	databases.PanicIfErr(err)
+	return names
+}
+
 // IoT produces Mongo-specific queries for all the iot query types.
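+// Each IoT query is emitted as an aggregation pipeline (mongo.Pipeline) on a
+// query.Mongo, executed against the point_data collection.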
type IoT struct { *iot.Core @@ -39,3 +45,41 @@ func NewIoT(start, end time.Time, scale int, g *BaseGenerator) *IoT { BaseGenerator: g, } } + +func (i *IoT) LastLocByTruck(qi query.Query, nTrucks int) { + trucks := i.getTrucksFilterArray(nTrucks) + pipelineQuery := mongo.Pipeline{ + {{ + "$match", bson.M{ + "tags.name": bson.M{ + "$in": trucks, + }, + }, + + }}, + {{ + "$group", bson.M{ + "_id": "$tags.name", + "output": bson.M{ + "$top": bson.M{ + "sortBy": bson.M{ "time" : -1}, + "output": bson.M{ + "longitude": "$longitude", + "latitude": "$latitude", + "time": "$time", + }, + }, + }, + }, + }}, + } + + humanLabel := "MongoDB last location by specific truck(s)" + humanDesc := fmt.Sprintf("%s: random %4d trucks (%v)", humanLabel, nTrucks, trucks) + + q := qi.(*query.Mongo) + q.HumanLabel = []byte(humanLabel) + q.Pipeline = pipelineQuery + q.CollectionName = []byte("point_data") + q.HumanDescription = []byte(humanDesc) +} From 510485831da7e631cb241e65503a75041652b4e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Domaschka?= Date: Tue, 7 May 2024 11:03:56 +0200 Subject: [PATCH 07/18] add function for MonogoDB: LastLocPerTruck --- .../databases/mongo/iot.go | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/cmd/tsbs_generate_queries/databases/mongo/iot.go b/cmd/tsbs_generate_queries/databases/mongo/iot.go index d4d9dab60..670bfa799 100644 --- a/cmd/tsbs_generate_queries/databases/mongo/iot.go +++ b/cmd/tsbs_generate_queries/databases/mongo/iot.go @@ -83,3 +83,71 @@ func (i *IoT) LastLocByTruck(qi query.Query, nTrucks int) { q.CollectionName = []byte("point_data") q.HumanDescription = []byte(humanDesc) } + +// LastLocPerTruck finds all the truck locations along with truck and driver names. +func (i *IoT) LastLocPerTruck(qi query.Query) { + fleet := i.GetRandomFleet() + pipelineQuery := mongo.Pipeline{ + {{ + "$match", bson.M{ + "$and": []bson.M{ + bson.M{ "tags.fleet": fleet }, + bson.M{ "tags.name": bson.M{ "$ne": nil } }, + bson.M{ "measurement": "readings" }, + }, + }, + }}, + {{ + "$group", bson.M{ + "_id": "$tags.name", + "output": bson.M{ + "$top": bson.M{ + "sortBy": bson.M{ "time" : -1}, + "output": bson.M{ + "longitude": "$longitude", + "latitude": "$latitude", + "time": "$time", + "driver": "$tags.driver", + }, + }, + }, + }, + }}, + } + /* +db.point_data.aggregate( + [ + { $match: { + $and: [ + { "tags.fleet": "South" }, + { "tags.name": { $ne: null } }, + { "measurement": "readings" } + ] + } }, + { $group: { + _id: "$tags.name", + output: { + $top: { + sortBy: {time: -1}, + output: { + longitude: "$longitude", + latitude: "$latitude", + time: "$time", + driver: "$tags.driver" + } + } + }} + ]) + + + */ + + humanLabel := "MongoDB last location for each truck" + humanDesc := fmt.Sprintf("%s: fleet: (%s)", humanLabel, fleet) + + q := qi.(*query.Mongo) + q.HumanLabel = []byte(humanLabel) + q.Pipeline = pipelineQuery + q.CollectionName = []byte("point_data") + q.HumanDescription = []byte(humanDesc) +} From ebec19c39da19f73fab936bf5d7fa12bad327b8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Domaschka?= Date: Tue, 7 May 2024 11:29:48 +0200 Subject: [PATCH 08/18] add function for MonogoDB: TrucksWithLowFuel --- .../databases/mongo/iot.go | 74 ++++++++++++------- 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/cmd/tsbs_generate_queries/databases/mongo/iot.go b/cmd/tsbs_generate_queries/databases/mongo/iot.go index 670bfa799..492f09dbe 100644 --- a/cmd/tsbs_generate_queries/databases/mongo/iot.go +++ 
b/cmd/tsbs_generate_queries/databases/mongo/iot.go @@ -114,33 +114,6 @@ func (i *IoT) LastLocPerTruck(qi query.Query) { }, }}, } - /* -db.point_data.aggregate( - [ - { $match: { - $and: [ - { "tags.fleet": "South" }, - { "tags.name": { $ne: null } }, - { "measurement": "readings" } - ] - } }, - { $group: { - _id: "$tags.name", - output: { - $top: { - sortBy: {time: -1}, - output: { - longitude: "$longitude", - latitude: "$latitude", - time: "$time", - driver: "$tags.driver" - } - } - }} - ]) - - - */ humanLabel := "MongoDB last location for each truck" humanDesc := fmt.Sprintf("%s: fleet: (%s)", humanLabel, fleet) @@ -151,3 +124,50 @@ db.point_data.aggregate( q.CollectionName = []byte("point_data") q.HumanDescription = []byte(humanDesc) } + +// TrucksWithLowFuel finds all trucks with low fuel (less than 10%). +func (i *IoT) TrucksWithLowFuel(qi query.Query) { + fleet := i.GetRandomFleet() + pipelineQuery := mongo.Pipeline{ + {{ + "$match", bson.M{ + "$and": []bson.M{ + bson.M{ "tags.fleet": fleet }, + bson.M{ "tags.name": bson.M{ "$ne": nil } }, + bson.M{ "measurement": "diagnostics" }, + }, + }, + }}, + {{ + "$group", bson.M{ + "_id": "$tags.name", + "output": bson.M{ + "$top": bson.M{ + "sortBy": bson.M{ "time" : -1}, + "output": bson.M{ + "driver": "$tags.driver", + "time": "$time", + "fleet": "$tags.fleet", + "fuel": "$fuel_state", + }, + }, + }, + }, + }}, + {{ + "$match", bson.M{ + "output.fuel": bson.M{ + "$lte": 0.1, + }, + }, + }}, + } + humanLabel := "MongoDB trucks with low fuel in a fleet" + humanDesc := fmt.Sprintf("%s: fleet: (%s)", humanLabel, fleet) + + q := qi.(*query.Mongo) + q.HumanLabel = []byte(humanLabel) + q.Pipeline = pipelineQuery + q.CollectionName = []byte("point_data") + q.HumanDescription = []byte(humanDesc) +} From 00ac74e793eba0a0a5089cea24b95c714b4704ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Domaschka?= Date: Tue, 7 May 2024 12:39:58 +0200 Subject: [PATCH 09/18] add function for MongoDB: TrucksWithHighLoad --- .../databases/mongo/iot.go | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/cmd/tsbs_generate_queries/databases/mongo/iot.go b/cmd/tsbs_generate_queries/databases/mongo/iot.go index 492f09dbe..1d50cf879 100644 --- a/cmd/tsbs_generate_queries/databases/mongo/iot.go +++ b/cmd/tsbs_generate_queries/databases/mongo/iot.go @@ -171,3 +171,63 @@ func (i *IoT) TrucksWithLowFuel(qi query.Query) { q.CollectionName = []byte("point_data") q.HumanDescription = []byte(humanDesc) } + +func (i *IoT) TrucksWithHighLoad(qi query.Query) { + fleet := i.GetRandomFleet() + pipelineQuery := mongo.Pipeline{ + {{ + "$match", bson.M{ + "$and": []bson.M{ + bson.M{ "tags.fleet": fleet }, + bson.M{ "tags.name": bson.M{ "$ne": nil } }, + bson.M{ "measurement": "diagnostics" }, + }, + }, + }}, + /* + output: { + + + } + } + } + } }, +] ) + */ + {{ + "$group", bson.M{ + "_id": "$tags.name", + "output": bson.M{ + "$top": bson.M{ + "sortBy": bson.M{ "time" : -1}, + "output": bson.M{ + "driver": "$tags.driver", + "time": "$time", + "fleet": "$tags.fleet", + "current_load": "$current_load", + "load_capacity": "$tags.load_capacity", + "current_load_ratio": bson.M{ + "$divide": bson.A{"$current_load", "$tags.load_capacity"}, + }, + }, + }, + }, + }, + }}, + {{ + "$match", bson.M{ + "output.current_load_ratio": bson.M{ + "$gte": 0.9, + }, + }, + }}, + } + humanLabel := "MongoDB trucks with high load in a fleet" + humanDesc := fmt.Sprintf("%s: fleet: (%s)", humanLabel, fleet) + + q := qi.(*query.Mongo) + q.HumanLabel = []byte(humanLabel) + q.Pipeline = 
pipelineQuery + q.CollectionName = []byte("point_data") + q.HumanDescription = []byte(humanDesc) +} \ No newline at end of file From 90aad1aba48c90f428f8dff13cac4e09428e5328 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Domaschka?= Date: Tue, 7 May 2024 12:54:36 +0200 Subject: [PATCH 10/18] add function for MongoDB: StationaryTrucks --- .../databases/mongo/iot.go | 64 +++++++++++++++---- 1 file changed, 53 insertions(+), 11 deletions(-) diff --git a/cmd/tsbs_generate_queries/databases/mongo/iot.go b/cmd/tsbs_generate_queries/databases/mongo/iot.go index 1d50cf879..dcb4f7ad8 100644 --- a/cmd/tsbs_generate_queries/databases/mongo/iot.go +++ b/cmd/tsbs_generate_queries/databases/mongo/iot.go @@ -184,16 +184,6 @@ func (i *IoT) TrucksWithHighLoad(qi query.Query) { }, }, }}, - /* - output: { - - - } - } - } - } }, -] ) - */ {{ "$group", bson.M{ "_id": "$tags.name", @@ -230,4 +220,56 @@ func (i *IoT) TrucksWithHighLoad(qi query.Query) { q.Pipeline = pipelineQuery q.CollectionName = []byte("point_data") q.HumanDescription = []byte(humanDesc) -} \ No newline at end of file +} + +func (i *IoT) StationaryTrucks(qi query.Query) { + interval := i.Interval.MustRandWindow(iot.StationaryDuration) + start := interval.Start() + // start := interval.Start().Format(goTimeFmt) + // end := interval.End().Format(goTimeFmt) + end := interval.End() + fleet := i.GetRandomFleet() + + pipelineQuery := mongo.Pipeline{ + {{ + "$match", bson.M{ + "measurement": "readings", + "tags.fleet": fleet, + "time": bson.M{"$gte": start, "$lt": end }, + }, + }}, + {{ + "$group", bson.M{ + "_id": bson.M{ + "name": "$tags.name", + "driver": "$tags.driver", + "fleet": "$tags.fleet", + "bucket": bson.M{ + "$dateTrunc": bson.M{ + "date": "$time", + "unit": "minute", + "binSize": 10, + }, + }, + }, + "avg_velocity": bson.M{ + "$avg": "$velocity", + }, + }, + }}, + {{ + "$match", bson.M{ + "avg_velocity": bson.M{"$lte": 1.0}, + }, + }}, + } + + humanLabel := "MongoDB stationary trucks (trucks with low velocity)" + humanDesc := fmt.Sprintf("%s: (%s) in [%v, %v]", humanLabel, fleet, start, end) + + q := qi.(*query.Mongo) + q.HumanLabel = []byte(humanLabel) + q.Pipeline = pipelineQuery + q.CollectionName = []byte("point_data") + q.HumanDescription = []byte(humanDesc) +} From 54446eac01719ad8b294c5077beb8407d7b329b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Domaschka?= Date: Tue, 7 May 2024 13:22:19 +0200 Subject: [PATCH 11/18] add function for MongoDB: TrucksWithLongDrivingSessions --- .../databases/mongo/common.go | 2 + .../databases/mongo/iot.go | 82 ++++++++++++++++++- 2 files changed, 82 insertions(+), 2 deletions(-) diff --git a/cmd/tsbs_generate_queries/databases/mongo/common.go b/cmd/tsbs_generate_queries/databases/mongo/common.go index 93a1d7a01..705f06486 100644 --- a/cmd/tsbs_generate_queries/databases/mongo/common.go +++ b/cmd/tsbs_generate_queries/databases/mongo/common.go @@ -10,6 +10,8 @@ import ( "github.com/timescale/tsbs/pkg/query" ) +const goTimeFmt = "2006-01-02 15:04:05.999999 -0700" + // BaseGenerator contains settings specific for Mongo database. 
type BaseGenerator struct { UseNaive bool diff --git a/cmd/tsbs_generate_queries/databases/mongo/iot.go b/cmd/tsbs_generate_queries/databases/mongo/iot.go index dcb4f7ad8..ff475d1c9 100644 --- a/cmd/tsbs_generate_queries/databases/mongo/iot.go +++ b/cmd/tsbs_generate_queries/databases/mongo/iot.go @@ -222,6 +222,7 @@ func (i *IoT) TrucksWithHighLoad(qi query.Query) { q.HumanDescription = []byte(humanDesc) } +// StationaryTrucks finds all trucks that have low average velocity in a time window. func (i *IoT) StationaryTrucks(qi query.Query) { interval := i.Interval.MustRandWindow(iot.StationaryDuration) start := interval.Start() @@ -229,7 +230,7 @@ func (i *IoT) StationaryTrucks(qi query.Query) { // end := interval.End().Format(goTimeFmt) end := interval.End() fleet := i.GetRandomFleet() - + pipelineQuery := mongo.Pipeline{ {{ "$match", bson.M{ @@ -259,7 +260,7 @@ func (i *IoT) StationaryTrucks(qi query.Query) { }}, {{ "$match", bson.M{ - "avg_velocity": bson.M{"$lte": 1.0}, + "avg_velocity": bson.M{"$lt": 1.0}, }, }}, } @@ -273,3 +274,80 @@ func (i *IoT) StationaryTrucks(qi query.Query) { q.CollectionName = []byte("point_data") q.HumanDescription = []byte(humanDesc) } + +// TrucksWithLongDrivingSessions finds all trucks that have not stopped at least 20 mins in the last 4 hours. +func (i *IoT) TrucksWithLongDrivingSessions(qi query.Query) { + fleet := i.GetRandomFleet() + interval := i.Interval.MustRandWindow(iot.LongDrivingSessionDuration) + start := interval.Start() + end := interval.End() + // Calculate number of 10 min intervals that is the max driving duration for the session if we rest 5 mins per hour. + numIntervals := tenMinutePeriods(5, iot.LongDrivingSessionDuration) + + pipelineQuery := mongo.Pipeline{ + {{ + "$match", bson.M{ + "measurement": "readings", + "tags.fleet": fleet, + "tags.name": bson.M{ "$ne": nil }, + "time": bson.M{"$gte": start, "$lt": end }, + }, + }}, + {{ + "$group", bson.M{ + "_id": bson.M{ + "name": "$tags.name", + "driver": "$tags.driver", + "fleet": "$tags.fleet", + "bucket": bson.M{ + "$dateTrunc": bson.M{ + "date": "$time", + "unit": "minute", + "binSize": 10, + }, + }, + }, + "avg_velocity": bson.M{ + "$avg": "$velocity", + }, + }, + }}, + {{ + "$match", bson.M{ + "avg_velocity": bson.M{"$gte": 1.0}, + }, + }}, + {{ + "$group", bson.M{ + "_id": bson.M{ + "name": "$_id.name", + "driver": "$_id.driver", + }, + "active_10_min_sessions": bson.M{"$count": bson.M{}}, + }, + }}, + {{ + "$match", bson.M{ + "active_10_min_sessions": bson.M{"$gt": numIntervals}, + }, + }}, + } + + humanLabel := "MongoDB trucks with longer driving sessions" + humanDesc := fmt.Sprintf("%s: (%s) stopped less than 20 mins in 4 hour period [%v, %v]", humanLabel, fleet, start, end) + + q := qi.(*query.Mongo) + q.HumanLabel = []byte(humanLabel) + q.Pipeline = pipelineQuery + q.CollectionName = []byte("point_data") + q.HumanDescription = []byte(humanDesc) +} + +// tenMinutePeriods calculates the number of 10 minute periods that can fit in +// the time duration if we subtract the minutes specified by minutesPerHour value. 
+// E.g.: 4 hours - 5 minutes per hour = 3 hours and 40 minutes = 22 ten minute periods
+func tenMinutePeriods(minutesPerHour float64, duration time.Duration) int {
+	durationMinutes := duration.Minutes()
+	leftover := minutesPerHour * duration.Hours()
+	return int((durationMinutes - leftover) / 10)
+}
From e5126b5abd519883ba808e508a0ca156fb507620 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=B6rg=20Domaschka?=
Date: Tue, 7 May 2024 13:43:30 +0200
Subject: [PATCH 12/18] add function for MongoDB: TrucksWithLongDailySessions

---
 .../databases/mongo/iot.go                    | 67 +++++++++++++++++++
 1 file changed, 67 insertions(+)

diff --git a/cmd/tsbs_generate_queries/databases/mongo/iot.go b/cmd/tsbs_generate_queries/databases/mongo/iot.go
index ff475d1c9..9041696f8 100644
--- a/cmd/tsbs_generate_queries/databases/mongo/iot.go
+++ b/cmd/tsbs_generate_queries/databases/mongo/iot.go
@@ -343,6 +343,73 @@ func (i *IoT) TrucksWithLongDrivingSessions(qi query.Query) {
 	q.HumanDescription = []byte(humanDesc)
 }
 
+func (i *IoT) TrucksWithLongDailySessions(qi query.Query) {
+	fleet := i.GetRandomFleet()
+	interval := i.Interval.MustRandWindow(iot.DailyDrivingDuration)
+	start := interval.Start()
+	end := interval.End()
+	// Calculate number of 10 min intervals that is the max driving duration for the session if we rest 35 mins per hour.
+	numIntervals := tenMinutePeriods(35, iot.DailyDrivingDuration)
+
+	pipelineQuery := mongo.Pipeline{
+		{{
+			"$match", bson.M{
+				"measurement": "readings",
+				"tags.fleet": fleet,
+				"tags.name": bson.M{ "$ne": nil },
+				"time": bson.M{"$gte": start, "$lt": end },
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"name": "$tags.name",
+					"driver": "$tags.driver",
+					"fleet": "$tags.fleet",
+					"bucket": bson.M{
+						"$dateTrunc": bson.M{
+							"date": "$time",
+							"unit": "minute",
+							"binSize": 10,
+						},
+					},
+				},
+				"avg_velocity": bson.M{
+					"$avg": "$velocity",
+				},
+			},
+		}},
+		{{
+			"$match", bson.M{
+				"avg_velocity": bson.M{"$gte": 1.0},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"name": "$_id.name",
+					"driver": "$_id.driver",
+				},
+				"active_10_min_sessions": bson.M{"$count": bson.M{}},
+			},
+		}},
+		{{
+			"$match", bson.M{
+				"active_10_min_sessions": bson.M{"$gt": numIntervals},
+			},
+		}},
+	}
+
+	humanLabel := "MongoDB trucks with longer daily sessions"
+	humanDesc := fmt.Sprintf("%s: in fleet (%s) drove more than 10 hours in the last 24 hours [%v, %v]", humanLabel, fleet, start, end)
+
+	q := qi.(*query.Mongo)
+	q.HumanLabel = []byte(humanLabel)
+	q.Pipeline = pipelineQuery
+	q.CollectionName = []byte("point_data")
+	q.HumanDescription = []byte(humanDesc)
+}
+
 // tenMinutePeriods calculates the number of 10 minute periods that can fit in
 // the time duration if we subtract the minutes specified by minutesPerHour value.
From 198e2ece42d7bbbdd7402095a8c438f0cba86b52 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=B6rg=20Domaschka?=
Date: Tue, 7 May 2024 14:24:30 +0200
Subject: [PATCH 13/18] add function for MongoDB: AvgVsProjectedFuelConsumption

---
 .../databases/mongo/iot.go                    | 51 ++++++++++++++++---
 1 file changed, 44 insertions(+), 7 deletions(-)

diff --git a/cmd/tsbs_generate_queries/databases/mongo/iot.go b/cmd/tsbs_generate_queries/databases/mongo/iot.go
index 9041696f8..3ebcb153b 100644
--- a/cmd/tsbs_generate_queries/databases/mongo/iot.go
+++ b/cmd/tsbs_generate_queries/databases/mongo/iot.go
@@ -24,6 +24,15 @@ func init() {
 	gob.Register([]bson.M{})
 }
 
+// tenMinutePeriods calculates the number of 10 minute periods that can fit in
+// the time duration if we subtract the minutes specified by minutesPerHour value.
+// E.g.: 4 hours - 5 minutes per hour = 3 hours and 40 minutes = 22 ten minute periods
+func tenMinutePeriods(minutesPerHour float64, duration time.Duration) int {
+	durationMinutes := duration.Minutes()
+	leftover := minutesPerHour * duration.Hours()
+	return int((durationMinutes - leftover) / 10)
+}
+
 func (i *IoT) getTrucksFilterArray(nTrucks int) []string {
 	names, err := i.GetRandomTrucks(nTrucks)
 	databases.PanicIfErr(err)
 	return names
@@ -410,11 +419,39 @@ func (i *IoT) TrucksWithLongDailySessions(qi query.Query) {
 	q.HumanDescription = []byte(humanDesc)
 }
 
-// tenMinutePeriods calculates the number of 10 minute periods that can fit in
-// the time duration if we subtract the minutes specified by minutesPerHour value.
-// E.g.: 4 hours - 5 minutes per hour = 3 hours and 40 minutes = 22 ten minute periods
-func tenMinutePeriods(minutesPerHour float64, duration time.Duration) int {
-	durationMinutes := duration.Minutes()
-	leftover := minutesPerHour * duration.Hours()
-	return int((durationMinutes - leftover) / 10)
+}
+
+
+// AvgVsProjectedFuelConsumption calculates average and projected fuel consumption per fleet.
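+// Note: the $match below does not filter on the selected fleet; readings are
+// grouped by $tags.fleet across all fleets, and the randomly chosen fleet only
+// appears in the human-readable description.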
+func (i *IoT) AvgVsProjectedFuelConsumption(qi query.Query) {
+	pipelineQuery := mongo.Pipeline{
+		{{
+			"$match", bson.M{
+				"measurement":                   "readings",
+				"tags.name":                     bson.M{"$ne": nil},
+				"tags.nominal_fuel_consumption": bson.M{"$ne": nil},
+				"velocity":                      bson.M{"$gt": 1.0},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": "$tags.fleet",
+				"mean_fuel_consumption": bson.M{
+					"$avg": "$fuel_consumption",
+				},
+				"nominal_fuel_consumption": bson.M{
+					"$avg": "$tags.nominal_fuel_consumption",
+				},
+			},
+		}},
+	}
+
+	humanLabel := "MongoDB average vs projected fuel consumption per fleet"
+	humanDesc := humanLabel
+
+	q := qi.(*query.Mongo)
+	q.HumanLabel = []byte(humanLabel)
+	q.Pipeline = pipelineQuery
+	q.CollectionName = []byte("point_data")
+	q.HumanDescription = []byte(humanDesc)
 }

From 3b1b99f47803c07f05970ddbc77deec02a396880 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=B6rg=20Domaschka?=
Date: Tue, 7 May 2024 15:17:09 +0200
Subject: [PATCH 14/18] add function for MongoDB: AvgDailyDrivingDuration

---
 .../databases/mongo/iot.go | 69 +++++++++++++++++++
 1 file changed, 69 insertions(+)

diff --git a/cmd/tsbs_generate_queries/databases/mongo/iot.go b/cmd/tsbs_generate_queries/databases/mongo/iot.go
index 3ebcb153b..d6e4f955f 100644
--- a/cmd/tsbs_generate_queries/databases/mongo/iot.go
+++ b/cmd/tsbs_generate_queries/databases/mongo/iot.go
@@ -455,3 +455,72 @@ func (i *IoT) AvgVsProjectedFuelConsumption(qi query.Query) {
 	q.CollectionName = []byte("point_data")
 	q.HumanDescription = []byte(humanDesc)
 }
+
+// AvgDailyDrivingDuration finds the average driving duration per driver.
+func (i *IoT) AvgDailyDrivingDuration(qi query.Query) {
+	pipelineQuery := mongo.Pipeline{
+		{{
+			"$match", bson.M{
+				"measurement": "readings",
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"fleet":  "$tags.fleet",
+					"name":   "$tags.name",
+					"driver": "$tags.driver",
+					"bucket": bson.M{
+						"$dateTrunc": bson.M{
+							"date":    "$time",
+							"unit":    "minute",
+							"binSize": 10,
+						},
+					},
+				},
+				"mv": bson.M{"$avg": "$velocity"},
+			},
+		}},
+		{{
+			"$match", bson.M{
+				"mv": bson.M{"$gt": 1},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"fleet":  "$_id.fleet",
+					"name":   "$_id.name",
+					"driver": "$_id.driver",
+					"day": bson.M{
+						"$dateTrunc": bson.M{
+							"date":    "$_id.bucket",
+							"unit":    "day",
+							"binSize": 1,
+						},
+					},
+				},
+				"ten_min_per_day": bson.M{
+					"$count": bson.M{},
+				},
+			},
+		}},
+		{{
+			"$addFields", bson.M{
+				"hours_per_day": bson.M{
+					// each hour contains six 10 minute buckets
+					"$divide": bson.A{"$ten_min_per_day", 6},
+				},
+			},
+		}},
+	}
+
+	humanLabel := "MongoDB average driver driving duration per day"
+	humanDesc := humanLabel
+
+	q := qi.(*query.Mongo)
+	q.HumanLabel = []byte(humanLabel)
+	q.Pipeline = pipelineQuery
+	q.CollectionName = []byte("point_data")
+	q.HumanDescription = []byte(humanDesc)
+}

From 9c3c9c68ad688ee54c3d8848296cbc8467431167 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=B6rg=20Domaschka?=
Date: Tue, 7 May 2024 15:50:19 +0200
Subject: [PATCH 15/18] add function for MongoDB: AvgLoad

---
 .../databases/mongo/iot.go | 44 ++++++++++++++++++++
 1 file changed, 44 insertions(+)

diff --git a/cmd/tsbs_generate_queries/databases/mongo/iot.go b/cmd/tsbs_generate_queries/databases/mongo/iot.go
index d6e4f955f..bdc9846aa 100644
--- a/cmd/tsbs_generate_queries/databases/mongo/iot.go
+++ b/cmd/tsbs_generate_queries/databases/mongo/iot.go
@@ -523,3 +523,47 @@ func (i *IoT) AvgDailyDrivingDuration(qi query.Query) {
 	q.CollectionName = []byte("point_data")
 	q.HumanDescription = []byte(humanDesc)
 }
+
+// AvgDailyDrivingSession finds the average driving session without stopping per driver per day.
+/* FIXME
+func (i *IoT) AvgDailyDrivingSession(qi query.Query) {
+}
+*/
+
+// AvgLoad finds the average load per truck model per fleet.
+func (i *IoT) AvgLoad(qi query.Query) {
+	pipelineQuery := mongo.Pipeline{
+		{{
+			"$match", bson.M{
+				"measurement": "diagnostics",
+				"tags.model":  bson.M{"$ne": nil},
+				"tags.fleet":  bson.M{"$ne": nil},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"fleet": "$tags.fleet",
+					"model": "$tags.model",
+				},
+				"avg_load": bson.M{"$avg": "$current_load"},
+				"capacity": bson.M{"$first": "$tags.load_capacity"},
+			},
+		}},
+		{{
+			"$addFields", bson.M{
+				"avg_load_ratio": bson.M{
+					"$divide": bson.A{"$avg_load", "$capacity"},
+				},
+			},
+		}},
+	}
+	humanLabel := "MongoDB average load per truck model per fleet"
+	humanDesc := humanLabel
+
+	q := qi.(*query.Mongo)
+	q.HumanLabel = []byte(humanLabel)
+	q.Pipeline = pipelineQuery
+	q.CollectionName = []byte("point_data")
+	q.HumanDescription = []byte(humanDesc)
+}

From d93157153a24a967ba532a1932ce5eef334fc0c0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=B6rg=20Domaschka?=
Date: Tue, 7 May 2024 16:05:16 +0200
Subject: [PATCH 16/18] add function for MongoDB: DailyTruckActivity

---
 .../databases/mongo/iot.go | 69 +++++++++++++++++++
 1 file changed, 69 insertions(+)

diff --git a/cmd/tsbs_generate_queries/databases/mongo/iot.go b/cmd/tsbs_generate_queries/databases/mongo/iot.go
index bdc9846aa..3fc098f20 100644
--- a/cmd/tsbs_generate_queries/databases/mongo/iot.go
+++ b/cmd/tsbs_generate_queries/databases/mongo/iot.go
@@ -566,4 +566,73 @@ func (i *IoT) AvgLoad(qi query.Query) {
 	q.Pipeline = pipelineQuery
 	q.CollectionName = []byte("point_data")
 	q.HumanDescription = []byte(humanDesc)
 }
+
+// DailyTruckActivity returns the daily activity per fleet per model.
+func (i *IoT) DailyTruckActivity(qi query.Query) {
+	pipelineQuery := mongo.Pipeline{
+		{{
+			"$match", bson.M{
+				"measurement": "diagnostics",
+				"tags.model":  bson.M{"$ne": nil},
+				"tags.fleet":  bson.M{"$ne": nil},
+				"tags.name":   bson.M{"$ne": nil},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"name":  "$tags.name",
+					"fleet": "$tags.fleet",
+					"model": "$tags.model",
+					"ten_min_bucket": bson.M{
+						"$dateTrunc": bson.M{
+							"date":    "$time",
+							"unit":    "minute",
+							"binSize": 10,
+						},
+					},
+				},
+				"mean_status": bson.M{"$avg": "$status"},
+			},
+		}},
+		{{
+			"$match", bson.M{
+				"mean_status": bson.M{"$lt": 1.0},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"fleet": "$_id.fleet",
+					"model": "$_id.model",
+					"day": bson.M{
+						"$dateTrunc": bson.M{
+							"date":    "$_id.ten_min_bucket",
+							"unit":    "day",
+							"binSize": 1,
+						},
+					},
+				},
+				"active_slots_per_day": bson.M{"$count": bson.M{}},
+			},
+		}},
+		{{
+			"$addFields", bson.M{
+				"daily_activity": bson.M{
+					// in total, there are 144 10 minute slots per day
+					"$divide": bson.A{"$active_slots_per_day", 144},
+				},
+			},
+		}},
+	}
+
+	humanLabel := "MongoDB daily truck activity per fleet per model"
+	humanDesc := humanLabel
+
+	q := qi.(*query.Mongo)
+	q.HumanLabel = []byte(humanLabel)
+	q.Pipeline = pipelineQuery
+	q.CollectionName = []byte("point_data")
+	q.HumanDescription = []byte(humanDesc)
+}

From fcc1ddb5bae22436ba607d67a4eea01d6d65e5ec Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=B6rg=20Domaschka?=
Date: Wed, 8 May 2024 15:15:39 +0200
Subject: [PATCH 17/18] add function for MongoDB: TruckBreakdownFrequency

---
 .../databases/mongo/iot.go | 50 ++++++++++++++++++++
 1 file changed, 50 insertions(+)

diff --git a/cmd/tsbs_generate_queries/databases/mongo/iot.go b/cmd/tsbs_generate_queries/databases/mongo/iot.go
index 3fc098f20..ceaff840b 100644
--- a/cmd/tsbs_generate_queries/databases/mongo/iot.go
+++ b/cmd/tsbs_generate_queries/databases/mongo/iot.go
@@ -635,3 +635,53 @@ func (i *IoT) DailyTruckActivity(qi query.Query) {
 	q.CollectionName = []byte("point_data")
 	q.HumanDescription = []byte(humanDesc)
 }
+
+// TruckBreakdownFrequency calculates the number of times a truck model broke down in the last period.
+func (i *IoT) TruckBreakdownFrequency(qi query.Query) {
+	pipelineQuery := mongo.Pipeline{
+		{{
+			"$match", bson.M{
+				"measurement": "diagnostics",
+				"tags.name":   bson.M{"$ne": nil},
+				"tags.model":  bson.M{"$ne": nil},
+			},
+		}},
+		{{
+			"$setWindowFields", bson.M{
+				"partitionBy": "$tags.name",
+				"sortBy":      bson.M{"time": 1},
+				"output": bson.M{
+					"summed": bson.M{
+						"$sum": "$status",
+						"window": bson.M{
+							"documents": bson.A{-1, "current"},
+						},
+					},
+				},
+			},
+		}},
+		{{
+			"$match", bson.M{
+				"status": bson.M{"$ne": 0},
+				"$expr": bson.M{
+					"$eq": bson.A{"$status", "$summed"},
+				},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id":        "$tags.model",
+				"breakdowns": bson.M{"$count": bson.M{}},
+			},
+		}},
+	}
+
+	humanLabel := "MongoDB truck breakdown frequency per model"
+	humanDesc := humanLabel
+
+	q := qi.(*query.Mongo)
+	q.HumanLabel = []byte(humanLabel)
+	q.Pipeline = pipelineQuery
+	q.CollectionName = []byte("point_data")
+	q.HumanDescription = []byte(humanDesc)
+}

From 37ea28daf420816bf46c857d6dd7b8b47c5c095e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=B6rg=20Domaschka?=
Date: Wed, 8 May 2024 22:35:48 +0000
Subject: [PATCH 18/18] add function for MongoDB: AvgDailyDrivingSession

---
 .../databases/mongo/iot.go | 134 ++++++++++++++++++++++++++++++++++--
 1 file changed, 132 insertions(+), 2 deletions(-)

diff --git a/cmd/tsbs_generate_queries/databases/mongo/iot.go b/cmd/tsbs_generate_queries/databases/mongo/iot.go
index ceaff840b..0a1476c43 100644
--- a/cmd/tsbs_generate_queries/databases/mongo/iot.go
+++ b/cmd/tsbs_generate_queries/databases/mongo/iot.go
@@ -525,10 +525,140 @@ func (i *IoT) AvgDailyDrivingDuration(qi query.Query) {
 }
 
 // AvgDailyDrivingSession finds the average driving session without stopping per driver per day.
-/* FIXME
 func (i *IoT) AvgDailyDrivingSession(qi query.Query) {
+	pipelineQuery := mongo.Pipeline{
+		{{
+			"$match", bson.M{
+				"measurement": "readings",
+				"tags.name":   bson.M{"$ne": nil},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"name":  "$tags.name",
+					"fleet": "$tags.fleet",
+					"bucket": bson.M{
+						"$dateTrunc": bson.M{
+							"date":    "$time",
+							"unit":    "minute",
+							"binSize": 10,
+						},
+					},
+				},
+				"avg_velocity": bson.M{"$avg": "$velocity"},
+			},
+		}},
+		{{
+			"$addFields", bson.M{
+				"isDriving": bson.M{
+					"$cond": bson.A{
+						bson.M{"$gte": bson.A{"$avg_velocity", 1.0}},
+						1.0,
+						0.0,
+					},
+				},
+			},
+		}},
+		{{
+			"$setWindowFields", bson.M{
+				"partitionBy": "$_id.name",
+				"sortBy":      bson.M{"_id.bucket": 1},
+				"output": bson.M{
+					"summedBack": bson.M{
+						"$sum": "$isDriving",
+						"window": bson.M{
+							"documents": bson.A{-1, "current"},
+						},
+					},
+					"summedFront": bson.M{
+						"$sum": "$isDriving",
+						"window": bson.M{
+							"documents": bson.A{"current", 1},
+						},
+					},
+				},
+			},
+		}},
+		{{
+			"$match", bson.M{
+				"isDriving": 1,
+				"$or": []bson.M{
+					bson.M{"summedBack": bson.M{"$eq": 1}},
+					bson.M{"summedFront": bson.M{"$eq": 1}},
+				},
+			},
+		}},
+		{{
+			"$setWindowFields", bson.M{
+				"partitionBy": "$_id.name",
+				"sortBy":      bson.M{"_id.bucket": 1},
+				"output": bson.M{
+					"times": bson.M{
+						"$push": "$_id.bucket",
+						"window": bson.M{
+							"documents": bson.A{-1, "current"},
+						},
+					},
+				},
+			},
+		}},
+		{{
+			"$match", bson.M{
+				"summedFront": 1,
+			},
+		}},
+		{{
+			"$addFields", bson.M{
+				"interval": bson.M{
+					"$cond": bson.A{
+						bson.M{"$eq": bson.A{"$summedBack", 1}},
+						10,
+						bson.M{"$add": bson.A{
+							bson.M{
+								"$dateDiff": bson.M{
+									"startDate": bson.M{
+										"$arrayElemAt": bson.A{"$times", 0},
+									},
+									"endDate": bson.M{
+										"$arrayElemAt": bson.A{"$times", 1},
+									},
+									"unit": "minute",
+								},
+							},
+							10,
+						}},
+					},
+				},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"name": "$_id.name",
+					"day": bson.M{
+						"$dateTrunc": bson.M{
+							"date":    "$_id.bucket",
+							"unit":    "day",
+							"binSize": 1,
+						},
+					},
+				},
+				"avgSession": bson.M{
+					"$avg": "$interval",
+				},
+			},
+		}},
+	}
+	humanLabel := "MongoDB average driver driving session without stopping per day"
+	humanDesc := humanLabel
+
+	q := qi.(*query.Mongo)
+	q.HumanLabel = []byte(humanLabel)
+	q.Pipeline = pipelineQuery
+	q.CollectionName = []byte("point_data")
+	q.HumanDescription = []byte(humanDesc)
 }
-*/
 
 // AvgLoad finds the average load per truck model per fleet.
 func (i *IoT) AvgLoad(qi query.Query) {
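A note on the 10-minute-bucket arithmetic that drives TrucksWithLongDailySessions, AvgDailyDrivingDuration, and DailyTruckActivity: it is easy to check in isolation. The sketch below is not part of the patches; it assumes iot.DrivingSessionDuration is 4 hours and iot.DailyDrivingDuration is 24 hours, which is what the generator constants are expected to be.

package main

import (
	"fmt"
	"time"
)

// tenMinutePeriods mirrors the helper introduced in the patches above: the
// number of whole 10 minute periods left in duration after resting
// minutesPerHour per hour.
func tenMinutePeriods(minutesPerHour float64, duration time.Duration) int {
	durationMinutes := duration.Minutes()
	leftover := minutesPerHour * duration.Hours()
	return int((durationMinutes - leftover) / 10)
}

func main() {
	// Assumed values of iot.DrivingSessionDuration and iot.DailyDrivingDuration.
	session := 4 * time.Hour
	daily := 24 * time.Hour

	// 4h at 5 min rest per hour: 240 - 20 = 220 minutes -> 22 periods.
	fmt.Println(tenMinutePeriods(5, session)) // 22
	// 24h at 35 min rest per hour: 1440 - 840 = 600 minutes -> 60 periods,
	// i.e. the "more than 10 hours" threshold in TrucksWithLongDailySessions.
	fmt.Println(tenMinutePeriods(35, daily)) // 60
	// DailyTruckActivity divides by 144 because a day holds 24 * 6 = 144 slots.
	fmt.Println(24 * 6) // 144
}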
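The bson.D stage literals used throughout these pipelines pair each stage name with its document positionally ({{"$stage", bson.M{...}}}), which is easy to mistype as a map-style colon. A quick, standalone way to eyeball that a stage renders as intended is to marshal it to extended JSON with the mongo-go-driver. A minimal sketch, using a stage lifted from TruckBreakdownFrequency:

package main

import (
	"fmt"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

func main() {
	// One stage of the TruckBreakdownFrequency pipeline.
	pipeline := mongo.Pipeline{
		{{"$group", bson.M{
			"_id":        "$tags.model",
			"breakdowns": bson.M{"$count": bson.M{}},
		}}},
	}
	for _, stage := range pipeline {
		// Relaxed extended JSON, close to what mongosh accepts. Note that
		// bson.M is a map, so the order of its fields in the output may vary.
		out, err := bson.MarshalExtJSON(stage, false, false)
		if err != nil {
			panic(err)
		}
		// e.g. {"$group":{"_id":"$tags.model","breakdowns":{"$count":{}}}}
		fmt.Println(string(out))
	}
}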
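TruckBreakdownFrequency leans on a $setWindowFields trick: summed is status summed over the window of the previous and current document, so a document with status != 0 and status == summed must sit immediately after a healthy reading, marking the onset of a breakdown rather than its continuation. A toy replay in plain Go, assuming the encoding the pipeline's $match implies, where a non-zero status means a broken-down reading:

package main

import "fmt"

func main() {
	status := []int{0, 0, 1, 1, 0, 1} // assumed: 1 = broken down, 0 = healthy

	breakdowns := 0
	for i, s := range status {
		// summed replays the [-1, "current"] document window.
		summed := s
		if i > 0 {
			summed += status[i-1]
		}
		// status != 0 and status == summed: the previous reading was healthy.
		if s != 0 && s == summed {
			breakdowns++
		}
	}
	fmt.Println(breakdowns) // 2: onsets at index 2 and index 5
}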