diff --git a/cmd/tsbs_generate_queries/databases/mongo/common.go b/cmd/tsbs_generate_queries/databases/mongo/common.go index 26a2c48b8..705f06486 100644 --- a/cmd/tsbs_generate_queries/databases/mongo/common.go +++ b/cmd/tsbs_generate_queries/databases/mongo/common.go @@ -4,11 +4,14 @@ import ( "time" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/iot" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/finance" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils" "github.com/timescale/tsbs/pkg/query" ) +const goTimeFmt = "2006-01-02 15:04:05.999999 -0700" + // BaseGenerator contains settings specific for Mongo database. type BaseGenerator struct { UseNaive bool @@ -54,3 +57,17 @@ func (g *BaseGenerator) NewFinance(start, end time.Time, scale int) (utils.Query Core: core, }, nil } + +// NewIoT creates a new iot use case query generator. +func (g *BaseGenerator) NewIoT(start, end time.Time, scale int) (utils.QueryGenerator, error) { + core, err := iot.NewCore(start, end, scale) + + if err != nil { + return nil, err + } + + return &IoT{ + BaseGenerator: g, + Core: core, + }, nil +} diff --git a/cmd/tsbs_generate_queries/databases/mongo/iot.go b/cmd/tsbs_generate_queries/databases/mongo/iot.go new file mode 100644 index 000000000..0a1476c43 --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/mongo/iot.go @@ -0,0 +1,817 @@ +package mongo + +import ( + "fmt" +// "strings" + "time" + "encoding/gob" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases" + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/iot" + "github.com/timescale/tsbs/pkg/query" +) + +func init() { + // needed for serializing the mongo query to gob + gob.Register([]interface{}{}) + gob.Register(map[string]interface{}{}) + gob.Register([]map[string]interface{}{}) + gob.Register(bson.M{}) + gob.Register(bson.D{}) + gob.Register([]bson.M{}) +} + +// tenMinutePeriods calculates the number of 10 minute periods that can fit in +// the time duration if we subtract the minutes specified by minutesPerHour value. +// E.g.: 4 hours - 5 minutes per hour = 3 hours and 40 minutes = 22 ten minute periods +func tenMinutePeriods(minutesPerHour float64, duration time.Duration) int { + durationMinutes := duration.Minutes() + leftover := minutesPerHour * duration.Hours() + return int((durationMinutes - leftover) / 10) +} + +func (i *IoT) getTrucksFilterArray(nTrucks int) []string { + names, err := i.GetRandomTrucks(nTrucks) + panicIfErr(err) + return names +} + +// IoT produces Mongo-specific queries for all the iot query types. +type IoT struct { + *iot.Core + *BaseGenerator +} + +// NewIoT makes an IoT object ready to generate Queries. 
+func NewIoT(start, end time.Time, scale int, g *BaseGenerator) *IoT { + c, err := iot.NewCore(start, end, scale) + databases.PanicIfErr(err) + return &IoT{ + Core: c, + BaseGenerator: g, + } +} + +func (i *IoT) LastLocByTruck(qi query.Query, nTrucks int) { + trucks := i.getTrucksFilterArray(nTrucks) + pipelineQuery := mongo.Pipeline{ + {{ + "$match", bson.M{ + "tags.name": bson.M{ + "$in": trucks, + }, + }, + + }}, + {{ + "$group", bson.M{ + "_id": "$tags.name", + "output": bson.M{ + "$top": bson.M{ + "sortBy": bson.M{ "time" : -1}, + "output": bson.M{ + "longitude": "$longitude", + "latitude": "$latitude", + "time": "$time", + }, + }, + }, + }, + }}, + } + + humanLabel := "MongoDB last location by specific truck(s)" + humanDesc := fmt.Sprintf("%s: random %4d trucks (%v)", humanLabel, nTrucks, trucks) + + q := qi.(*query.Mongo) + q.HumanLabel = []byte(humanLabel) + q.Pipeline = pipelineQuery + q.CollectionName = []byte("point_data") + q.HumanDescription = []byte(humanDesc) +} + +// LastLocPerTruck finds all the truck locations along with truck and driver names. +func (i *IoT) LastLocPerTruck(qi query.Query) { + fleet := i.GetRandomFleet() + pipelineQuery := mongo.Pipeline{ + {{ + "$match", bson.M{ + "$and": []bson.M{ + bson.M{ "tags.fleet": fleet }, + bson.M{ "tags.name": bson.M{ "$ne": nil } }, + bson.M{ "measurement": "readings" }, + }, + }, + }}, + {{ + "$group", bson.M{ + "_id": "$tags.name", + "output": bson.M{ + "$top": bson.M{ + "sortBy": bson.M{ "time" : -1}, + "output": bson.M{ + "longitude": "$longitude", + "latitude": "$latitude", + "time": "$time", + "driver": "$tags.driver", + }, + }, + }, + }, + }}, + } + + humanLabel := "MongoDB last location for each truck" + humanDesc := fmt.Sprintf("%s: fleet: (%s)", humanLabel, fleet) + + q := qi.(*query.Mongo) + q.HumanLabel = []byte(humanLabel) + q.Pipeline = pipelineQuery + q.CollectionName = []byte("point_data") + q.HumanDescription = []byte(humanDesc) +} + +// TrucksWithLowFuel finds all trucks with low fuel (less than 10%). 
+func (i *IoT) TrucksWithLowFuel(qi query.Query) { + fleet := i.GetRandomFleet() + pipelineQuery := mongo.Pipeline{ + {{ + "$match", bson.M{ + "$and": []bson.M{ + bson.M{ "tags.fleet": fleet }, + bson.M{ "tags.name": bson.M{ "$ne": nil } }, + bson.M{ "measurement": "diagnostics" }, + }, + }, + }}, + {{ + "$group", bson.M{ + "_id": "$tags.name", + "output": bson.M{ + "$top": bson.M{ + "sortBy": bson.M{ "time" : -1}, + "output": bson.M{ + "driver": "$tags.driver", + "time": "$time", + "fleet": "$tags.fleet", + "fuel": "$fuel_state", + }, + }, + }, + }, + }}, + {{ + "$match", bson.M{ + "output.fuel": bson.M{ + "$lte": 0.1, + }, + }, + }}, + } + humanLabel := "MongoDB trucks with low fuel in a fleet" + humanDesc := fmt.Sprintf("%s: fleet: (%s)", humanLabel, fleet) + + q := qi.(*query.Mongo) + q.HumanLabel = []byte(humanLabel) + q.Pipeline = pipelineQuery + q.CollectionName = []byte("point_data") + q.HumanDescription = []byte(humanDesc) +} + +func (i *IoT) TrucksWithHighLoad(qi query.Query) { + fleet := i.GetRandomFleet() + pipelineQuery := mongo.Pipeline{ + {{ + "$match", bson.M{ + "$and": []bson.M{ + bson.M{ "tags.fleet": fleet }, + bson.M{ "tags.name": bson.M{ "$ne": nil } }, + bson.M{ "measurement": "diagnostics" }, + }, + }, + }}, + {{ + "$group", bson.M{ + "_id": "$tags.name", + "output": bson.M{ + "$top": bson.M{ + "sortBy": bson.M{ "time" : -1}, + "output": bson.M{ + "driver": "$tags.driver", + "time": "$time", + "fleet": "$tags.fleet", + "current_load": "$current_load", + "load_capacity": "$tags.load_capacity", + "current_load_ratio": bson.M{ + "$divide": bson.A{"$current_load", "$tags.load_capacity"}, + }, + }, + }, + }, + }, + }}, + {{ + "$match", bson.M{ + "output.current_load_ratio": bson.M{ + "$gte": 0.9, + }, + }, + }}, + } + humanLabel := "MongoDB trucks with high load in a fleet" + humanDesc := fmt.Sprintf("%s: fleet: (%s)", humanLabel, fleet) + + q := qi.(*query.Mongo) + q.HumanLabel = []byte(humanLabel) + q.Pipeline = pipelineQuery + q.CollectionName = []byte("point_data") + q.HumanDescription = []byte(humanDesc) +} + +// StationaryTrucks finds all trucks that have low average velocity in a time window. +func (i *IoT) StationaryTrucks(qi query.Query) { + interval := i.Interval.MustRandWindow(iot.StationaryDuration) + start := interval.Start() + // start := interval.Start().Format(goTimeFmt) + // end := interval.End().Format(goTimeFmt) + end := interval.End() + fleet := i.GetRandomFleet() + + pipelineQuery := mongo.Pipeline{ + {{ + "$match", bson.M{ + "measurement": "readings", + "tags.fleet": fleet, + "time": bson.M{"$gte": start, "$lt": end }, + }, + }}, + {{ + "$group", bson.M{ + "_id": bson.M{ + "name": "$tags.name", + "driver": "$tags.driver", + "fleet": "$tags.fleet", + "bucket": bson.M{ + "$dateTrunc": bson.M{ + "date": "$time", + "unit": "minute", + "binSize": 10, + }, + }, + }, + "avg_velocity": bson.M{ + "$avg": "$velocity", + }, + }, + }}, + {{ + "$match", bson.M{ + "avg_velocity": bson.M{"$lt": 1.0}, + }, + }}, + } + + humanLabel := "MongoDB stationary trucks (trucks with low velocity)" + humanDesc := fmt.Sprintf("%s: (%s) in [%v, %v]", humanLabel, fleet, start, end) + + q := qi.(*query.Mongo) + q.HumanLabel = []byte(humanLabel) + q.Pipeline = pipelineQuery + q.CollectionName = []byte("point_data") + q.HumanDescription = []byte(humanDesc) +} + +// TrucksWithLongDrivingSessions finds all trucks that have not stopped at least 20 mins in the last 4 hours. 
+func (i *IoT) TrucksWithLongDrivingSessions(qi query.Query) {
+	fleet := i.GetRandomFleet()
+	interval := i.Interval.MustRandWindow(iot.LongDrivingSessionDuration)
+	start := interval.Start()
+	end := interval.End()
+	// Calculate the number of 10 min intervals that is the max driving duration for the session if we rest 5 mins per hour.
+	numIntervals := tenMinutePeriods(5, iot.LongDrivingSessionDuration)
+
+	pipelineQuery := mongo.Pipeline{
+		{{
+			"$match", bson.M{
+				"measurement": "readings",
+				"tags.fleet":  fleet,
+				"tags.name":   bson.M{"$ne": nil},
+				"time":        bson.M{"$gte": start, "$lt": end},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"name":   "$tags.name",
+					"driver": "$tags.driver",
+					"fleet":  "$tags.fleet",
+					"bucket": bson.M{
+						"$dateTrunc": bson.M{
+							"date":    "$time",
+							"unit":    "minute",
+							"binSize": 10,
+						},
+					},
+				},
+				"avg_velocity": bson.M{
+					"$avg": "$velocity",
+				},
+			},
+		}},
+		{{
+			"$match", bson.M{
+				"avg_velocity": bson.M{"$gte": 1.0},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"name":   "$_id.name",
+					"driver": "$_id.driver",
+				},
+				"active_10_min_sessions": bson.M{"$count": bson.M{}},
+			},
+		}},
+		{{
+			"$match", bson.M{
+				"active_10_min_sessions": bson.M{"$gt": numIntervals},
+			},
+		}},
+	}
+
+	humanLabel := "MongoDB trucks with longer driving sessions"
+	humanDesc := fmt.Sprintf("%s: (%s) stopped less than 20 mins in 4 hour period [%v, %v]", humanLabel, fleet, start, end)
+
+	q := qi.(*query.Mongo)
+	q.HumanLabel = []byte(humanLabel)
+	q.Pipeline = pipelineQuery
+	q.CollectionName = []byte("point_data")
+	q.HumanDescription = []byte(humanDesc)
+}
+
+// TrucksWithLongDailySessions finds all trucks that have driven more than 10 hours in the last 24 hours.
+func (i *IoT) TrucksWithLongDailySessions(qi query.Query) {
+	fleet := i.GetRandomFleet()
+	interval := i.Interval.MustRandWindow(iot.DailyDrivingDuration)
+	start := interval.Start()
+	end := interval.End()
+	// Calculate the number of 10 min intervals that is the max driving duration for the session if we rest 35 mins per hour.
+	numIntervals := tenMinutePeriods(35, iot.DailyDrivingDuration)
+
+	pipelineQuery := mongo.Pipeline{
+		{{
+			"$match", bson.M{
+				"measurement": "readings",
+				"tags.fleet":  fleet,
+				"tags.name":   bson.M{"$ne": nil},
+				"time":        bson.M{"$gte": start, "$lt": end},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"name":   "$tags.name",
+					"driver": "$tags.driver",
+					"fleet":  "$tags.fleet",
+					"bucket": bson.M{
+						"$dateTrunc": bson.M{
+							"date":    "$time",
+							"unit":    "minute",
+							"binSize": 10,
+						},
+					},
+				},
+				"avg_velocity": bson.M{
+					"$avg": "$velocity",
+				},
+			},
+		}},
+		{{
+			"$match", bson.M{
+				"avg_velocity": bson.M{"$gte": 1.0},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"name":   "$_id.name",
+					"driver": "$_id.driver",
+				},
+				"active_10_min_sessions": bson.M{"$count": bson.M{}},
+			},
+		}},
+		{{
+			"$match", bson.M{
+				"active_10_min_sessions": bson.M{"$gt": numIntervals},
+			},
+		}},
+	}
+
+	humanLabel := "MongoDB trucks with longer daily sessions"
+	humanDesc := fmt.Sprintf("%s: in fleet (%s) drove more than 10 hours in the last 24 hours [%v, %v]", humanLabel, fleet, start, end)
+
+	q := qi.(*query.Mongo)
+	q.HumanLabel = []byte(humanLabel)
+	q.Pipeline = pipelineQuery
+	q.CollectionName = []byte("point_data")
+	q.HumanDescription = []byte(humanDesc)
+}
+
+// AvgVsProjectedFuelConsumption calculates average and projected fuel consumption per fleet. 
+func (i *IoT) AvgVsProjectedFuelConsumption(qi query.Query) { + fleet := i.GetRandomFleet() + + pipelineQuery := mongo.Pipeline{ + {{ + "$match", bson.M{ + "measurement": "readings", + "tags.name": bson.M{ "$ne": nil }, + "tags.nominal_fuel_consumption": bson.M{ "$ne": nil }, + "velocity": bson.M{"$gt": 1.0}, + }, + }}, + {{ + "$group", bson.M{ + "_id": "$tags.fleet", + "mean_fuel_consumption": bson.M{ + "$avg": "$fuel_consumption", + }, + "nominal_fuel_consumption": bson.M{ + "$avg": "$tags.nominal_fuel_consumption", + }, + }, + }}, + } + + humanLabel := "MongoDB average vs projected fuel consumption per fleet" + humanDesc := fmt.Sprintf("%s: in fleet (%s)", humanLabel, fleet) + + q := qi.(*query.Mongo) + q.HumanLabel = []byte(humanLabel) + q.Pipeline = pipelineQuery + q.CollectionName = []byte("point_data") + q.HumanDescription = []byte(humanDesc) +} + +// AvgDailyDrivingDuration finds the average driving duration per driver. +func (i *IoT) AvgDailyDrivingDuration(qi query.Query) { + pipelineQuery := mongo.Pipeline{ + {{ + "$match", bson.M{ + "measurement": "readings", + }, + }}, + {{ + "$group", bson.M{ + "_id": bson.M{ + "fleet": "$tags.fleet", + "name": "$tags.name", + "driver": "$tags.driver", + "bucket": bson.M{ + "$dateTrunc": bson.M{ + "date": "$time", + "unit": "minute", + "binSize": 10, + }, + }, + }, + "mv": bson.M{ "$avg": "$velocity" }, + }, + }}, + {{ + "$match", bson.M{ + "mv": bson.M{ "$gt": 1 }, + }, + }}, + {{ + "$group", bson.M{ + "_id": bson.M{ + "fleet": "$_id.fleet", + "name": "$_id.name", + "driver": "$_id.driver", + "day": bson.M{ + "$dateTrunc": bson.M{ + "date": "$_id.bucket", + "unit": "day", + "binSize": 1, + }, + }, + }, + "ten_min_per_day": bson.M{ + "$count": bson.M{}, + }, + }, + }}, + {{ + "$addFields", bson.M{ + "hours_per_day": bson.M{ + "$divide": bson.A{ "$ten_min_per_day", 6 }, + }, + }, + }}, + } + + humanLabel := "MongoDB average driver driving duration per day" + humanDesc := humanLabel + + q := qi.(*query.Mongo) + q.HumanLabel = []byte(humanLabel) + q.Pipeline = pipelineQuery + q.CollectionName = []byte("point_data") + q.HumanDescription = []byte(humanDesc) +} + +// AvgDailyDrivingSession finds the average driving session without stopping per driver per day. 
+func (i *IoT) AvgDailyDrivingSession(qi query.Query) { + pipelineQuery := mongo.Pipeline{ + {{ + "$match", bson.M{ + "measurement": "readings", + "tags.name": bson.M{"$ne": nil }, + }, + }}, + {{ + "$group", bson.M{ + "_id": bson.M{ + "name": "$tags.name", + "fleet": "$tags.fleet", + "bucket": bson.M{ + "$dateTrunc": bson.M{ + "date": "$time", + "unit": "minute", + "binSize": 10, + }, + }, + }, + "avg_velocity": bson.M{ "$avg" : "$velocity" }, + }, + }}, + {{ + "$addFields", bson.M{ + "isDriving": bson.M{ + "$cond": bson.A{ + bson.M{"$gte": bson.A{"$avg_velocity", 1.0}}, + 1.0, + 0.0, + }, + }, + }, + }}, + {{ + "$setWindowFields", bson.M{ + "partitionBy": "$_id.name", + "sortBy": bson.M{"_id.bucket": 1}, + "output": bson.M{ + "summedBack": bson.M{ + "$sum": "$isDriving", + "window": bson.M{ + "documents": bson.A{-1, "current"}, + }, + }, + "summedFront": bson.M{ + "$sum": "$isDriving", + "window": bson.M{ + "documents": bson.A{"current", 1}, + }, + }, + }, + }, + }}, + {{ + "$match", bson.M{ + "isDriving" : 1, + "$or": []bson.M{ + bson.M{"summedBack": bson.M{"$eq": 1}}, + bson.M{"summedFront" : bson.M{"$eq": 1}}, + }, + }, + }}, + {{ + "$setWindowFields", bson.M{ + "partitionBy": "$_id.name", + "sortBy": bson.M{"_id.bucket": 1}, + "output": bson.M{ + "times": bson.M{ + "$push": "$_id.bucket", + "window": bson.M{ + "documents": bson.A{-1, "current"}, + }, + }, + }, + }, + }}, + {{ + "$match", bson.M{ + "summedFront": 1, + }, + }}, + {{ + "$addFields", bson.M{ + "interval": bson.M{ + "$cond": bson.A{ + bson.M{"$eq": bson.A{"$summedBack", 1} }, + 10, + bson.M{"$add": bson.A{ + bson.M{ + "$dateDiff": bson.M{ + "startDate": bson.M{ + "$arrayElemAt": bson.A{ "$times", 0 }, + }, + "endDate": bson.M{ + "$arrayElemAt": bson.A{ "$times", 1 }, + }, + "unit": "minute", + }, + }, + 10, + }}, + }, + }, + }, + }}, + {{ + "$group", bson.M{ + "_id": bson.M{ + "name": "$_id.name", + "day": bson.M{ + "$dateTrunc": bson.M{ + "date": "$_id.bucket", + "unit": "hour", + "binSize": 1, + }, + }, + }, + "avgSession": bson.M{ + "$avg": "$interval", + }, + }, + }}, + } + humanLabel := "MongoDB average driver driving session without stopping per day" + humanDesc := humanLabel + + q := qi.(*query.Mongo) + q.HumanLabel = []byte(humanLabel) + q.Pipeline = pipelineQuery + q.CollectionName = []byte("point_data") + q.HumanDescription = []byte(humanDesc) +} + +// AvgLoad finds the average load per truck model per fleet. 
+func (i *IoT) AvgLoad(qi query.Query) {
+	pipelineQuery := mongo.Pipeline{
+		{{
+			"$match", bson.M{
+				"measurement": "diagnostics",
+				"tags.model":  bson.M{"$ne": nil},
+				"tags.fleet":  bson.M{"$ne": nil},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"fleet": "$tags.fleet",
+					"model": "$tags.model",
+				},
+				"avg_load": bson.M{"$avg": "$current_load"},
+				"capacity": bson.M{"$first": "$tags.load_capacity"},
+			},
+		}},
+		{{
+			"$addFields", bson.M{
+				"avg_load_ratio": bson.M{
+					"$divide": bson.A{"$avg_load", "$capacity"},
+				},
+			},
+		}},
+	}
+	humanLabel := "MongoDB average load per truck model per fleet"
+	humanDesc := humanLabel
+
+	q := qi.(*query.Mongo)
+	q.HumanLabel = []byte(humanLabel)
+	q.Pipeline = pipelineQuery
+	q.CollectionName = []byte("point_data")
+	q.HumanDescription = []byte(humanDesc)
+}
+
+// DailyTruckActivity returns the daily activity ratio per fleet and model, based on
+// ten-minute periods whose average status is below 1 (i.e. not out of commission).
+func (i *IoT) DailyTruckActivity(qi query.Query) {
+	pipelineQuery := mongo.Pipeline{
+		{{
+			"$match", bson.M{
+				"measurement": "diagnostics",
+				"tags.model":  bson.M{"$ne": nil},
+				"tags.fleet":  bson.M{"$ne": nil},
+				"tags.name":   bson.M{"$ne": nil},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"name":  "$tags.name",
+					"fleet": "$tags.fleet",
+					"model": "$tags.model",
+					"ten_min_bucket": bson.M{
+						"$dateTrunc": bson.M{
+							"date":    "$time",
+							"unit":    "minute",
+							"binSize": 10,
+						},
+					},
+				},
+				"mean_status": bson.M{"$avg": "$status"},
+			},
+		}},
+		{{
+			"$match", bson.M{
+				"mean_status": bson.M{"$lt": 1.0},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id": bson.M{
+					"fleet": "$_id.fleet",
+					"model": "$_id.model",
+					"day": bson.M{
+						"$dateTrunc": bson.M{
+							"date":    "$_id.ten_min_bucket",
+							"unit":    "day",
+							"binSize": 1,
+						},
+					},
+				},
+				"active_slots_per_day": bson.M{"$count": bson.M{}},
+			},
+		}},
+		{{
+			"$addFields", bson.M{
+				"daily_activity": bson.M{
+					// in total, there are 144 ten-minute slots per day
+					"$divide": bson.A{"$active_slots_per_day", 144},
+				},
+			},
+		}},
+	}
+
+	humanLabel := "MongoDB daily truck activity per fleet per model"
+	humanDesc := humanLabel
+
+	q := qi.(*query.Mongo)
+	q.HumanLabel = []byte(humanLabel)
+	q.Pipeline = pipelineQuery
+	q.CollectionName = []byte("point_data")
+	q.HumanDescription = []byte(humanDesc)
+}
+
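+// Note on execution: the pipelines built in this file are plain aggregation
+// documents; a query runner is expected to hand them to the official Go
+// driver roughly as in the sketch below (illustrative only; client setup,
+// context handling, and the database name belong to the runner, not to this
+// generator):
+//
+//	cursor, err := client.Database(dbName).Collection("point_data").
+//		Aggregate(context.Background(), pipelineQuery)
+//	if err != nil {
+//		return err
+//	}
+//	defer cursor.Close(context.Background())
+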
+// TruckBreakdownFrequency calculates the number of times a truck model broke down in the last period.
+func (i *IoT) TruckBreakdownFrequency(qi query.Query) {
+	pipelineQuery := mongo.Pipeline{
+		{{
+			"$match", bson.M{
+				"measurement": "diagnostics",
+				"tags.name":   bson.M{"$ne": nil},
+				"tags.model":  bson.M{"$ne": nil},
+			},
+		}},
+		{{
+			"$setWindowFields", bson.M{
+				"partitionBy": "$tags.name",
+				"sortBy":      bson.M{"time": 1},
+				"output": bson.M{
+					"summed": bson.M{
+						"$sum": "$status",
+						"window": bson.M{
+							"documents": bson.A{-1, "current"},
+						},
+					},
+				},
+			},
+		}},
+		{{
+			"$match", bson.M{
+				"status": bson.M{"$ne": 0},
+				"$expr": bson.M{
+					"$eq": bson.A{"$status", "$summed"},
+				},
+			},
+		}},
+		{{
+			"$group", bson.M{
+				"_id":        "$tags.model",
+				"breakdowns": bson.M{"$count": bson.M{}},
+			},
+		}},
+	}
+
+	humanLabel := "MongoDB truck breakdown frequency per model"
+	humanDesc := humanLabel
+
+	q := qi.(*query.Mongo)
+	q.HumanLabel = []byte(humanLabel)
+	q.Pipeline = pipelineQuery
+	q.CollectionName = []byte("point_data")
+	q.HumanDescription = []byte(humanDesc)
+}
diff --git a/cmd/tsbs_load_influx/creator.go b/cmd/tsbs_load_influx/creator.go
index 28fc9a6bb..1f7f8d12c 100644
--- a/cmd/tsbs_load_influx/creator.go
+++ b/cmd/tsbs_load_influx/creator.go
@@ -33,8 +33,16 @@ func (d *dbCreator) DBExists(dbName string) bool {
 }
 
 func (d *dbCreator) listDatabases() ([]string, error) {
+	client := http.Client{}
 	u := fmt.Sprintf("%s/query?q=show%%20databases", d.daemonURL)
-	resp, err := http.Get(u)
+	req, err := http.NewRequest("GET", u, nil)
+	if err != nil {
+		return nil, err
+	}
+	if authToken != "" {
+		req.Header = http.Header{
+			headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)},
+		}
+	}
+	resp, err := client.Do(req)
 	if err != nil {
 		return nil, fmt.Errorf("listDatabases error: %s", err.Error())
 	}
@@ -61,20 +69,30 @@ func (d *dbCreator) listDatabases() ([]string, error) {
 	}
 
 	ret := []string{}
-	for _, nestedName := range listing.Results[0].Series[0].Values {
-		name := nestedName[0]
-		// the _internal database is skipped:
-		if name == "_internal" {
-			continue
+	if len(listing.Results) > 0 {
+		for _, nestedName := range listing.Results[0].Series[0].Values {
+			name := nestedName[0]
+			// the _internal database is skipped:
+			if name == "_internal" {
+				continue
+			}
+			ret = append(ret, name)
 		}
-		ret = append(ret, name)
 	}
 	return ret, nil
 }
 
 func (d *dbCreator) RemoveOldDB(dbName string) error {
 	u := fmt.Sprintf("%s/query?q=drop+database+%s", d.daemonURL, dbName)
-	resp, err := http.Post(u, "text/plain", nil)
+	client := http.Client{}
+	req, err := http.NewRequest("POST", u, nil)
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Content-Type", "text/plain")
+	if authToken != "" {
+		req.Header.Set(headerAuthorization, fmt.Sprintf("Token %s", authToken))
+	}
+	resp, err := client.Do(req)
 	if err != nil {
 		return fmt.Errorf("drop db error: %s", err.Error())
 	}
@@ -99,6 +117,11 @@ func (d *dbCreator) CreateDB(dbName string) error {
 	u.RawQuery = v.Encode()
 
 	req, err := http.NewRequest("GET", u.String(), nil)
 	if err != nil {
 		return err
 	}
+	if authToken != "" {
+		req.Header = http.Header{
+			headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)},
+		}
+	}
diff --git a/cmd/tsbs_load_influx/http_writer.go b/cmd/tsbs_load_influx/http_writer.go
index b56ae2d8e..a53ce989f 100644
--- a/cmd/tsbs_load_influx/http_writer.go
+++ b/cmd/tsbs_load_influx/http_writer.go
@@ -14,6 +14,7 @@ import (
 const (
 	httpClientName        = "tsbs_load_influx"
 	headerContentEncoding = "Content-Encoding"
+	headerAuthorization   = "Authorization"
 	headerGzip            = "gzip"
 )
 
@@ -65,13 +66,16 @@ var (
 	textPlain = []byte("text/plain")
 )
 
-func (w *HTTPWriter) initializeReq(req 
*fasthttp.Request, body []byte, isGzip bool) { +func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bool, authToken string) { req.Header.SetContentTypeBytes(textPlain) req.Header.SetMethodBytes(methodPost) req.Header.SetRequestURIBytes(w.url) if isGzip { req.Header.Add(headerContentEncoding, headerGzip) } + if authToken != "" { + req.Header.Add(headerAuthorization, fmt.Sprintf("Token %s", authToken)) + } req.SetBody(body) } @@ -96,7 +100,7 @@ func (w *HTTPWriter) executeReq(req *fasthttp.Request, resp *fasthttp.Response) func (w *HTTPWriter) WriteLineProtocol(body []byte, isGzip bool) (int64, error) { req := fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) - w.initializeReq(req, body, isGzip) + w.initializeReq(req, body, isGzip, authToken) resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseResponse(resp) diff --git a/cmd/tsbs_load_influx/http_writer_test.go b/cmd/tsbs_load_influx/http_writer_test.go index 170ae4ea3..ba27656c2 100644 --- a/cmd/tsbs_load_influx/http_writer_test.go +++ b/cmd/tsbs_load_influx/http_writer_test.go @@ -114,7 +114,7 @@ func TestHTTPWriterInitializeReq(t *testing.T) { defer fasthttp.ReleaseRequest(req) w := NewHTTPWriter(testConf, testConsistency) body := "this is a test body" - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") if got := string(req.Body()); got != body { t.Errorf("non-gzip: body not correct: got '%s' want '%s'", got, body) @@ -129,7 +129,7 @@ func TestHTTPWriterInitializeReq(t *testing.T) { t.Errorf("non-gzip: Content-Encoding is not empty: got %s", got) } - w.initializeReq(req, []byte(body), true) + w.initializeReq(req, []byte(body), true, "") if got := string(req.Header.Peek(headerContentEncoding)); got != headerGzip { t.Errorf("gzip: Content-Encoding is not correct: got %s want %s", got, headerGzip) } @@ -144,7 +144,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) { w := NewHTTPWriter(testConf, testConsistency) body := "this is a test body" normalURL := w.url // save for later modification - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseResponse(resp) lat, err := w.executeReq(req, resp) @@ -161,7 +161,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) { w.url = []byte(fmt.Sprintf("%s&%s=true", string(normalURL), shouldBackoffParam)) req = fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") lat, err = w.executeReq(req, resp) if err != errBackoff { t.Errorf("unexpected error response received (not backoff error): %v", err) @@ -176,7 +176,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) { w.url = []byte(fmt.Sprintf("%s&%s=true", string(normalURL), shouldInvalidParam)) req = fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") lat, err = w.executeReq(req, resp) if err == nil { t.Errorf("unexpected non-error response received") diff --git a/cmd/tsbs_load_influx/main.go b/cmd/tsbs_load_influx/main.go index f6b85f1b5..268ffa95c 100644 --- a/cmd/tsbs_load_influx/main.go +++ b/cmd/tsbs_load_influx/main.go @@ -30,6 +30,9 @@ var ( useGzip bool doAbortOnExist bool consistency string + authToken string // InfluxDB v2 + bucketId string // InfluxDB v2 + orgId string // InfluxDB v2 ) // Global vars @@ -73,6 +76,8 @@ func init() { csvDaemonURLs = viper.GetString("urls") 
 	replicationFactor = viper.GetInt("replication-factor")
 	consistency = viper.GetString("consistency")
+	authToken = viper.GetString("auth-token")
+	orgId = viper.GetString("organization")
 
 	backoff = viper.GetDuration("backoff")
 	useGzip = viper.GetBool("gzip")
@@ -80,6 +85,12 @@
 		log.Fatalf("invalid consistency settings")
 	}
 
+	if authToken != "" {
+		log.Println("Using Authorization header in benchmark")
+	} else {
+		log.Println("No auth-token provided; the Authorization header will not be sent")
+	}
+
 	daemonURLs = strings.Split(csvDaemonURLs, ",")
 	if len(daemonURLs) == 0 {
 		log.Fatal("missing 'urls' flag")
diff --git a/cmd/tsbs_load_mongo/aggregate_loader.go b/cmd/tsbs_load_mongo/aggregate_loader.go
index ca1b1682f..fe6e607a0 100644
--- a/cmd/tsbs_load_mongo/aggregate_loader.go
+++ b/cmd/tsbs_load_mongo/aggregate_loader.go
@@ -29,10 +29,13 @@ func (i *hostnameIndexer) GetIndex(item data.LoadedPoint) uint {
 		p.Tags(t, j)
 		key := string(t.Key())
 		if key == "hostname" || key == "name" {
+			myMap := map[string]interface{}{}
+			if !parseTag(myMap, t) {
+				continue
+			}
 			// the hostame is the defacto index for devops tags
 			// the truck name is the defacto index for iot tags
 			h := fnv.New32a()
-			h.Write([]byte(string(t.Value())))
+			h.Write([]byte(myMap[key].(string)))
 			return uint(h.Sum32()) % i.partitions
 		}
 	}
@@ -128,12 +131,15 @@ func (p *aggProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint6
 	eventCnt := uint64(0)
 	for _, event := range batch.arr {
 		tagsSlice := bson.D{}
-		tagsMap := map[string]string{}
+		tagsMap := map[string]interface{}{}
 		t := &tsbsMongo.MongoTag{}
 		for j := 0; j < event.TagsLength(); j++ {
 			event.Tags(t, j)
-			tagsMap[string(t.Key())] = string(t.Value())
-			tagsSlice = append(tagsSlice, bson.E{string(t.Key()), string(t.Value())})
+			parseTag(tagsMap, t)
+			if bbson, ok := parseTagAsBson(t); ok {
+				tagsSlice = append(tagsSlice, bbson)
+			}
 		}
 
 		// Determine which document this event belongs too
diff --git a/cmd/tsbs_load_mongo/common_loader.go b/cmd/tsbs_load_mongo/common_loader.go
index 6fc21019c..3e22b5a53 100644
--- a/cmd/tsbs_load_mongo/common_loader.go
+++ b/cmd/tsbs_load_mongo/common_loader.go
@@ -13,12 +13,45 @@ import (
 	"github.com/timescale/tsbs/pkg/data/usecases/common"
 	"github.com/timescale/tsbs/pkg/targets"
 	"github.com/timescale/tsbs/pkg/targets/mongo"
+	"go.mongodb.org/mongo-driver/bson"
 )
 
 type fileDataSource struct {
 	lenBuf []byte
 	r      *bufio.Reader
 }
+
+// parseTag decodes the union-typed tag value and stores it in x under the
+// tag's key. It reports whether the tag held a supported value type.
+func parseTag(x map[string]interface{}, t *mongo.MongoTag) bool {
+	unionTable := new(flatbuffers.Table)
+	if !t.Value(unionTable) {
+		return false
+	}
+	if t.ValueType() == mongo.MongoTagValueMongoStringTag {
+		stringTag := new(mongo.MongoStringTag)
+		stringTag.Init(unionTable.Bytes, unionTable.Pos)
+		x[string(t.Key())] = string(stringTag.Value())
+	} else if t.ValueType() == mongo.MongoTagValueMongoFloat32Tag {
+		floatTag := new(mongo.MongoFloat32Tag)
+		floatTag.Init(unionTable.Bytes, unionTable.Pos)
+		x[string(t.Key())] = floatTag.Value()
+	} else {
+		return false
+	}
+	return true
+}
+
+// parseTagAsBson decodes the union-typed tag value into a bson.E element.
+func parseTagAsBson(t *mongo.MongoTag) (bson.E, bool) {
+	unionTable := new(flatbuffers.Table)
+	if !t.Value(unionTable) {
+		return bson.E{}, false
+	}
+	if t.ValueType() == mongo.MongoTagValueMongoStringTag {
+		stringTag := new(mongo.MongoStringTag)
+		stringTag.Init(unionTable.Bytes, unionTable.Pos)
+		return bson.E{string(t.Key()), string(stringTag.Value())}, true
+	} else if t.ValueType() == mongo.MongoTagValueMongoFloat32Tag {
+		floatTag := new(mongo.MongoFloat32Tag)
+		
floatTag.Init(unionTable.Bytes, unionTable.Pos) + return bson.E{string(t.Key()), float32(floatTag.Value())}, true + } else { + return bson.E{}, false + } +} func (d *fileDataSource) NextItem() data.LoadedPoint { item := &mongo.MongoPoint{} diff --git a/cmd/tsbs_load_mongo/document_per_loader.go b/cmd/tsbs_load_mongo/document_per_loader.go index ab6fc99b2..cb0eae645 100644 --- a/cmd/tsbs_load_mongo/document_per_loader.go +++ b/cmd/tsbs_load_mongo/document_per_loader.go @@ -67,7 +67,7 @@ func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uin x := spPool.Get().(*singlePoint) (*x)["measurement"] = string(event.MeasurementName()) (*x)[timestampField] = time.Unix(0, event.Timestamp()) - (*x)["tags"] = map[string]string{} + (*x)["tags"] = map[string]interface{}{} f := &tsbsMongo.MongoReading{} for j := 0; j < event.FieldsLength(); j++ { event.Fields(f, j) @@ -76,7 +76,7 @@ func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uin t := &tsbsMongo.MongoTag{} for j := 0; j < event.TagsLength(); j++ { event.Tags(t, j) - (*x)["tags"].(map[string]string)[string(t.Key())] = string(t.Value()) + parseTag((*x)["tags"].(map[string]interface{}), t) } p.pvs[i] = x metricCnt += uint64(event.FieldsLength()) @@ -95,7 +95,8 @@ func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uin tags := bson.D{} for j := 0; j < event.TagsLength(); j++ { event.Tags(t, j) - tags = append(tags, bson.E{string(t.Key()), string(t.Value())}) + bbson, success := parseTagAsBson(t) + if(success == true) { tags = append(tags, bbson) } } x = append(x, bson.E{"tags", tags}) p.pvs[i] = x diff --git a/cmd/tsbs_run_queries_influx/http_client.go b/cmd/tsbs_run_queries_influx/http_client.go index 24b7b4827..fbfd1b33b 100644 --- a/cmd/tsbs_run_queries_influx/http_client.go +++ b/cmd/tsbs_run_queries_influx/http_client.go @@ -14,6 +14,7 @@ import ( ) var bytesSlash = []byte("/") // heap optimization +var headerAuthorization = "Authorization" // HTTPClient is a reusable HTTP Client. type HTTPClient struct { @@ -22,6 +23,7 @@ type HTTPClient struct { Host []byte HostString string uri []byte + authToken string } // HTTPClientDoOptions wraps options uses when calling `Do`. @@ -46,12 +48,17 @@ func getHttpClient() *http.Client { } // NewHTTPClient creates a new HTTPClient. -func NewHTTPClient(host string) *HTTPClient { +func NewHTTPClient(host string, authToken string) *HTTPClient { + token := "" + if authToken != "" { + token = fmt.Sprintf("Token %s", authToken) + } return &HTTPClient{ client: getHttpClient(), Host: []byte(host), HostString: host, uri: []byte{}, // heap optimization + authToken: token, } } @@ -74,7 +81,9 @@ func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, if err != nil { panic(err) } - + if w.authToken != "" { + req.Header.Add(headerAuthorization, w.authToken) + } // Perform the request while tracking latency: start := time.Now() resp, err := w.client.Do(req) diff --git a/cmd/tsbs_run_queries_influx/main.go b/cmd/tsbs_run_queries_influx/main.go index 48a84d757..8e96cb83f 100644 --- a/cmd/tsbs_run_queries_influx/main.go +++ b/cmd/tsbs_run_queries_influx/main.go @@ -20,6 +20,7 @@ import ( var ( daemonUrls []string chunkSize uint64 + authToken string ) // Global vars: @@ -35,6 +36,7 @@ func init() { pflag.String("urls", "http://localhost:8086", "Daemon URLs, comma-separated. Will be used in a round-robin fashion.") pflag.Uint64("chunk-response-size", 0, "Number of series to chunk results into. 
0 means no chunking.") + pflag.String("auth-token", "", "Use the Authorization header with the Token scheme to provide your token to InfluxDB. If empty will not send the Authorization header.") pflag.Parse() @@ -49,8 +51,13 @@ func init() { } csvDaemonUrls = viper.GetString("urls") + authToken = viper.GetString("auth-token") chunkSize = viper.GetUint64("chunk-response-size") - + if authToken != "" { + log.Println("Using Authorization header in benchmark") + } else { + log.Println("Given no Authorization header was provided will not send it in benchmark") + } daemonUrls = strings.Split(csvDaemonUrls, ",") if len(daemonUrls) == 0 { log.Fatal("missing 'urls' flag") @@ -78,7 +85,7 @@ func (p *processor) Init(workerNumber int) { database: runner.DatabaseName(), } url := daemonUrls[workerNumber%len(daemonUrls)] - p.w = NewHTTPClient(url) + p.w = NewHTTPClient(url, authToken) } func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) { diff --git a/docs/influx.md b/docs/influx.md index e7f27edba..fefe67e00 100644 --- a/docs/influx.md +++ b/docs/influx.md @@ -7,6 +7,33 @@ using the data importer (`tsbs_load_influx`), and additional flags available for the query runner (`tsbs_run_queries_influx`). **This should be read *after* the main README.** +## Setup steps InfluxDB v2 + +If on a new setup run the following command: + +```bash +influx setup +``` + +If you need to create a new bucket adjust the bucket name (`-n`) and the org name (`-o`) accordingly: + +```bash +influx bucket create -n bucket-perf -o org -r 0 +``` + +Create a DBRP mapping with the InfluxDB 1.x compatibility API ([official docs](https://docs.influxdata.com/influxdb/cloud/reference/cli/influx/v1/dbrp/create/)). + +Adjust bucket name and db accordingly: + +```bash +influx v1 dbrp create --db benchmark --rp 0 --bucket-id `influx bucket ls --name bucket-perf | awk -v i=2 -v j=1 'FNR == i {print $j}'` --default +``` + +Retrieve the auth token as follows: +```bash +influx auth list +``` + ## Data format Data generated by `tsbs_generate_data` for InfluxDB is serialized in a @@ -58,6 +85,11 @@ Whether to encode writes to the server with gzip. For best performance, encoding with gzip is the best choice, but if the server does not support or has gzip disabled, this flag should be set to false. +#### `-auth-token` (type: `string`, default: `""`) + +Use the Authorization header with the Token scheme to provide your token to InfluxDB. +If empty will not send the Authorization header. + --- ## `tsbs_run_queries_influx` Additional Flags @@ -76,3 +108,10 @@ everything in a single response. Comma-separated list of URLs to connect to for querying. Workers will be distributed in a round robin fashion across the URLs. + +### Miscellaneous + +#### `-auth-token` (type: `string`, default: `""`) + +Use the Authorization header with the Token scheme to provide your token to InfluxDB. +If empty will not send the Authorization header. diff --git a/pkg/targets/influx/implemented_target.go b/pkg/targets/influx/implemented_target.go index 736a37182..17cb88ba5 100644 --- a/pkg/targets/influx/implemented_target.go +++ b/pkg/targets/influx/implemented_target.go @@ -21,6 +21,8 @@ func (t *influxTarget) TargetSpecificFlags(flagPrefix string, flagSet *pflag.Fla flagSet.String(flagPrefix+"urls", "http://localhost:8086", "InfluxDB URLs, comma-separated. 
Will be used in a round-robin fashion.") flagSet.Int(flagPrefix+"replication-factor", 1, "Cluster replication factor (only applies to clustered databases).") flagSet.String(flagPrefix+"consistency", "all", "Write consistency. Must be one of: any, one, quorum, all.") + flagSet.String(flagPrefix+"auth-token", "", "Use the Authorization header with the Token scheme to provide your token to InfluxDB. If empty will not send the Authorization header.") + flagSet.String(flagPrefix+"organization", "", "Organization name (InfluxDB v2).") flagSet.Duration(flagPrefix+"backoff", time.Second, "Time to sleep between requests when server indicates backpressure is needed.") flagSet.Bool(flagPrefix+"gzip", true, "Whether to gzip encode requests (default true).") } diff --git a/pkg/targets/mongo/MongoFloat32Tag.go b/pkg/targets/mongo/MongoFloat32Tag.go new file mode 100644 index 000000000..1eb0e9091 --- /dev/null +++ b/pkg/targets/mongo/MongoFloat32Tag.go @@ -0,0 +1,56 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package mongo + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type MongoFloat32Tag struct { + _tab flatbuffers.Table +} + +func GetRootAsMongoFloat32Tag(buf []byte, offset flatbuffers.UOffsetT) *MongoFloat32Tag { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &MongoFloat32Tag{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsMongoFloat32Tag(buf []byte, offset flatbuffers.UOffsetT) *MongoFloat32Tag { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &MongoFloat32Tag{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *MongoFloat32Tag) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *MongoFloat32Tag) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *MongoFloat32Tag) Value() float32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.GetFloat32(o + rcv._tab.Pos) + } + return 0.0 +} + +func (rcv *MongoFloat32Tag) MutateValue(n float32) bool { + return rcv._tab.MutateFloat32Slot(4, n) +} + +func MongoFloat32TagStart(builder *flatbuffers.Builder) { + builder.StartObject(1) +} +func MongoFloat32TagAddValue(builder *flatbuffers.Builder, value float32) { + builder.PrependFloat32Slot(0, value, 0.0) +} +func MongoFloat32TagEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/pkg/targets/mongo/MongoPoint.go b/pkg/targets/mongo/MongoPoint.go index c71c4caa8..dd67019eb 100644 --- a/pkg/targets/mongo/MongoPoint.go +++ b/pkg/targets/mongo/MongoPoint.go @@ -1,4 +1,4 @@ -// automatically generated by the FlatBuffers compiler, do not modify +// Code generated by the FlatBuffers compiler. DO NOT EDIT. 
package mongo @@ -17,6 +17,13 @@ func GetRootAsMongoPoint(buf []byte, offset flatbuffers.UOffsetT) *MongoPoint { return x } +func GetSizePrefixedRootAsMongoPoint(buf []byte, offset flatbuffers.UOffsetT) *MongoPoint { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &MongoPoint{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + func (rcv *MongoPoint) Init(buf []byte, i flatbuffers.UOffsetT) { rcv._tab.Bytes = buf rcv._tab.Pos = i diff --git a/pkg/targets/mongo/MongoReading.go b/pkg/targets/mongo/MongoReading.go index 0f8bcff85..403ef2b93 100644 --- a/pkg/targets/mongo/MongoReading.go +++ b/pkg/targets/mongo/MongoReading.go @@ -1,4 +1,4 @@ -// automatically generated by the FlatBuffers compiler, do not modify +// Code generated by the FlatBuffers compiler. DO NOT EDIT. package mongo @@ -17,6 +17,13 @@ func GetRootAsMongoReading(buf []byte, offset flatbuffers.UOffsetT) *MongoReadin return x } +func GetSizePrefixedRootAsMongoReading(buf []byte, offset flatbuffers.UOffsetT) *MongoReading { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &MongoReading{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + func (rcv *MongoReading) Init(buf []byte, i flatbuffers.UOffsetT) { rcv._tab.Bytes = buf rcv._tab.Pos = i diff --git a/pkg/targets/mongo/MongoStringTag.go b/pkg/targets/mongo/MongoStringTag.go new file mode 100644 index 000000000..d086ad714 --- /dev/null +++ b/pkg/targets/mongo/MongoStringTag.go @@ -0,0 +1,52 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package mongo + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type MongoStringTag struct { + _tab flatbuffers.Table +} + +func GetRootAsMongoStringTag(buf []byte, offset flatbuffers.UOffsetT) *MongoStringTag { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &MongoStringTag{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsMongoStringTag(buf []byte, offset flatbuffers.UOffsetT) *MongoStringTag { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &MongoStringTag{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *MongoStringTag) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *MongoStringTag) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *MongoStringTag) Value() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func MongoStringTagStart(builder *flatbuffers.Builder) { + builder.StartObject(1) +} +func MongoStringTagAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(value), 0) +} +func MongoStringTagEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/pkg/targets/mongo/MongoTag.go b/pkg/targets/mongo/MongoTag.go index bdf37ef0a..7817f11f2 100644 --- a/pkg/targets/mongo/MongoTag.go +++ b/pkg/targets/mongo/MongoTag.go @@ -1,4 +1,4 @@ -// automatically generated by the FlatBuffers compiler, do not modify +// Code generated by the FlatBuffers compiler. DO NOT EDIT. 
package mongo @@ -17,6 +17,13 @@ func GetRootAsMongoTag(buf []byte, offset flatbuffers.UOffsetT) *MongoTag { return x } +func GetSizePrefixedRootAsMongoTag(buf []byte, offset flatbuffers.UOffsetT) *MongoTag { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &MongoTag{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + func (rcv *MongoTag) Init(buf []byte, i flatbuffers.UOffsetT) { rcv._tab.Bytes = buf rcv._tab.Pos = i @@ -34,22 +41,38 @@ func (rcv *MongoTag) Key() []byte { return nil } -func (rcv *MongoTag) Value() []byte { +func (rcv *MongoTag) ValueType() MongoTagValue { o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) + return MongoTagValue(rcv._tab.GetByte(o + rcv._tab.Pos)) } - return nil + return 0 +} + +func (rcv *MongoTag) MutateValueType(n MongoTagValue) bool { + return rcv._tab.MutateByteSlot(6, byte(n)) +} + +func (rcv *MongoTag) Value(obj *flatbuffers.Table) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + rcv._tab.Union(obj, o) + return true + } + return false } func MongoTagStart(builder *flatbuffers.Builder) { - builder.StartObject(2) + builder.StartObject(3) } func MongoTagAddKey(builder *flatbuffers.Builder, key flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(key), 0) } +func MongoTagAddValueType(builder *flatbuffers.Builder, valueType MongoTagValue) { + builder.PrependByteSlot(1, byte(valueType), 0) +} func MongoTagAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(value), 0) + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(value), 0) } func MongoTagEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() diff --git a/pkg/targets/mongo/MongoTagValue.go b/pkg/targets/mongo/MongoTagValue.go new file mode 100644 index 000000000..8be0f81d9 --- /dev/null +++ b/pkg/targets/mongo/MongoTagValue.go @@ -0,0 +1,32 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. 
+
+package mongo
+
+import "strconv"
+
+type MongoTagValue byte
+
+const (
+	MongoTagValueNONE            MongoTagValue = 0
+	MongoTagValueMongoStringTag  MongoTagValue = 1
+	MongoTagValueMongoFloat32Tag MongoTagValue = 2
+)
+
+var EnumNamesMongoTagValue = map[MongoTagValue]string{
+	MongoTagValueNONE:            "NONE",
+	MongoTagValueMongoStringTag:  "MongoStringTag",
+	MongoTagValueMongoFloat32Tag: "MongoFloat32Tag",
+}
+
+var EnumValuesMongoTagValue = map[string]MongoTagValue{
+	"NONE":            MongoTagValueNONE,
+	"MongoStringTag":  MongoTagValueMongoStringTag,
+	"MongoFloat32Tag": MongoTagValueMongoFloat32Tag,
+}
+
+func (v MongoTagValue) String() string {
+	if s, ok := EnumNamesMongoTagValue[v]; ok {
+		return s
+	}
+	return "MongoTagValue(" + strconv.FormatInt(int64(v), 10) + ")"
+}
diff --git a/pkg/targets/mongo/mongo.fbs b/pkg/targets/mongo/mongo.fbs
index ca806091e..64dcb75ae 100644
--- a/pkg/targets/mongo/mongo.fbs
+++ b/pkg/targets/mongo/mongo.fbs
@@ -1,8 +1,22 @@
 // mongo.fbs
 
-namespace serialize;
+namespace mongo;
+
+table MongoStringTag {
+  value: string;
+}
+
+table MongoFloat32Tag {
+  value: float;
+}
+
+union MongoTagValue {
+  MongoStringTag, MongoFloat32Tag
+}
+
 table MongoTag {
 	key:string;
-	value:string;
+	value: MongoTagValue;
 }
 
 table MongoReading {
diff --git a/pkg/targets/mongo/serializer.go b/pkg/targets/mongo/serializer.go
index b1153d7db..7307e4552 100644
--- a/pkg/targets/mongo/serializer.go
+++ b/pkg/targets/mongo/serializer.go
@@ -6,8 +6,8 @@ import (
 	"github.com/timescale/tsbs/pkg/data"
 	"io"
 	"sync"
-
 	flatbuffers "github.com/google/flatbuffers/go"
+	"os"
 )
 
 var fbBuilderPool = &sync.Pool{
@@ -30,20 +30,35 @@ func (s *Serializer) Serialize(p *data.Point, w io.Writer) (err error) {
 	tagKeys := p.TagKeys()
 	tagValues := p.TagValues()
 	for i := len(tagKeys); i > 0; i-- {
+		var tagType MongoTagValue
+		var tagElement flatbuffers.UOffsetT
 		switch v := tagValues[i-1].(type) {
 		case string:
-			k := string(tagKeys[i-1])
-			key := b.CreateString(k)
 			val := b.CreateString(v)
-			MongoTagStart(b)
-			MongoTagAddKey(b, key)
-			MongoTagAddValue(b, val)
-			tags = append(tags, MongoTagEnd(b))
+			MongoStringTagStart(b)
+			MongoStringTagAddValue(b, val)
+			tagType = MongoTagValueMongoStringTag
+			tagElement = MongoStringTagEnd(b)
 		case nil:
 			continue
+		case float32:
+			MongoFloat32TagStart(b)
+			MongoFloat32TagAddValue(b, v)
+			tagType = MongoTagValueMongoFloat32Tag
+			tagElement = MongoFloat32TagEnd(b)
 		default:
-			panic("non-string tags not implemented for mongo db")
+			fmt.Fprintf(os.Stderr, "unsupported tag value type for mongo serializer: %v\n", v)
+			panic("non-string, non-float tags not implemented for mongo db")
 		}
+		k := string(tagKeys[i-1])
+		key := b.CreateString(k)
+		MongoTagStart(b)
+		MongoTagAddKey(b, key)
+		MongoTagAddValueType(b, tagType)
+		MongoTagAddValue(b, tagElement)
+		tags = append(tags, MongoTagEnd(b))
 	}
 	MongoPointStartTagsVector(b, len(tags))
 	for _, t := range tags {
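The `MongoTagValue` union above replaces the old string-only tag value, so writers now record the member type alongside the member table, and readers must inspect `ValueType()` before initializing the concrete table. A minimal round-trip sketch using only the generated API from this patch (the key name and value are arbitrary examples):

```go
package main

import (
	"fmt"

	flatbuffers "github.com/google/flatbuffers/go"
	"github.com/timescale/tsbs/pkg/targets/mongo"
)

func main() {
	b := flatbuffers.NewBuilder(0)

	// Union member tables must be built before the table that references them.
	mongo.MongoFloat32TagStart(b)
	mongo.MongoFloat32TagAddValue(b, 1500)
	val := mongo.MongoFloat32TagEnd(b)

	key := b.CreateString("load_capacity")
	mongo.MongoTagStart(b)
	mongo.MongoTagAddKey(b, key)
	mongo.MongoTagAddValueType(b, mongo.MongoTagValueMongoFloat32Tag)
	mongo.MongoTagAddValue(b, val)
	b.Finish(mongo.MongoTagEnd(b))

	// Reading back: check the discriminator, then initialize the member table.
	tag := mongo.GetRootAsMongoTag(b.FinishedBytes(), 0)
	if tag.ValueType() == mongo.MongoTagValueMongoFloat32Tag {
		tbl := new(flatbuffers.Table)
		if tag.Value(tbl) {
			f := new(mongo.MongoFloat32Tag)
			f.Init(tbl.Bytes, tbl.Pos)
			fmt.Println(string(tag.Key()), f.Value()) // load_capacity 1500
		}
	}
}
```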
diff --git a/scripts/full_cycle_minitest/full_cycle_minitest_influx.sh b/scripts/full_cycle_minitest/full_cycle_minitest_influx.sh
new file mode 100755
index 000000000..6c1cd8701
--- /dev/null
+++ b/scripts/full_cycle_minitest/full_cycle_minitest_influx.sh
@@ -0,0 +1,77 @@
+#!/bin/bash
+# showcases the ftsb 3 phases for influxdb
+# - 1) data and query generation
+# - 2) data loading/insertion
+# - 3) query execution
+
+SCALE=${SCALE:-"10"}
+SEED=${SEED:-"123"}
+FORMAT="influx"
+
+mkdir -p /tmp/bulk_data
+rm -f /tmp/bulk_data/${FORMAT}_*
+
+# exit immediately on error
+set -e
+
+# Load parameters - common
+DATABASE_PORT=${DATABASE_PORT:-8086}
+DATABASE_HOST=${DATABASE_HOST:-localhost}
+INFLUX_AUTH_TOKEN=${INFLUX_AUTH_TOKEN:-""}
+
+# All available query types (sorted alphabetically)
+QUERY_TYPES_ALL="\
+    cpu-max-all-1 \
+    cpu-max-all-8 \
+    double-groupby-1 \
+    double-groupby-5 \
+    double-groupby-all \
+    groupby-orderby-limit \
+    high-cpu-1 \
+    high-cpu-all \
+    lastpoint \
+    single-groupby-1-1-1 \
+    single-groupby-1-1-12 \
+    single-groupby-1-8-1 \
+    single-groupby-5-1-1 \
+    single-groupby-5-1-12 \
+    single-groupby-5-8-1"
+
+# What query types to generate
+QUERY_TYPES=${QUERY_TYPES:-$QUERY_TYPES_ALL}
+
+# generate data
+$GOPATH/bin/tsbs_generate_data --format ${FORMAT} --use-case cpu-only --scale=${SCALE} --seed=${SEED} --file /tmp/bulk_data/${FORMAT}_data
+
+for queryName in $QUERY_TYPES; do
+    echo "generating query: $queryName"
+    $GOPATH/bin/tsbs_generate_queries --format ${FORMAT} --use-case cpu-only --scale=${SCALE} --seed=${SEED} \
+        --queries=10 \
+        --query-type $queryName \
+        --file /tmp/bulk_data/${FORMAT}_query_$queryName
+done
+
+until curl http://${DATABASE_HOST}:${DATABASE_PORT}/ping 2>/dev/null; do
+    echo "Waiting for InfluxDB"
+    sleep 1
+done
+
+# Remove previous database
+curl --header "Authorization: Token ${INFLUX_AUTH_TOKEN}" \
+    -X POST http://${DATABASE_HOST}:${DATABASE_PORT}/query?q=drop%20database%20benchmark
+
+# insert benchmark
+$GOPATH/bin/tsbs_load_${FORMAT} \
+    --db-name=benchmark \
+    --backoff=1s \
+    --workers=1 \
+    --urls=http://${DATABASE_HOST}:${DATABASE_PORT} \
+    --auth-token "${INFLUX_AUTH_TOKEN}" \
+    --file=/tmp/bulk_data/${FORMAT}_data
+
+# queries benchmark
+for queryName in $QUERY_TYPES; do
+    echo "running query: $queryName"
+    $GOPATH/bin/tsbs_run_queries_${FORMAT} --print-responses \
+        --workers=1 \
+        --auth-token "${INFLUX_AUTH_TOKEN}" \
+        --file /tmp/bulk_data/${FORMAT}_query_$queryName
+done
diff --git a/scripts/load/load_influx.sh b/scripts/load/load_influx.sh
index 90e9e13c0..f8c6774d9 100755
--- a/scripts/load/load_influx.sh
+++ b/scripts/load/load_influx.sh
@@ -10,6 +10,7 @@ fi
 # Load parameters - common
 DATA_FILE_NAME=${DATA_FILE_NAME:-influx-data.gz}
 DATABASE_PORT=${DATABASE_PORT:-8086}
+INFLUX_AUTH_TOKEN=${INFLUX_AUTH_TOKEN:-""}
 
 EXE_DIR=${EXE_DIR:-$(dirname $0)}
 source ${EXE_DIR}/load_common.sh
@@ -20,7 +21,10 @@ until curl http://${DATABASE_HOST}:${DATABASE_PORT}/ping 2>/dev/null; do
 done
 
 # Remove previous database
-curl -X POST http://${DATABASE_HOST}:${DATABASE_PORT}/query?q=drop%20database%20${DATABASE_NAME}
+curl --header "Authorization: Token $INFLUX_AUTH_TOKEN" \
+    -X POST http://${DATABASE_HOST}:${DATABASE_PORT}/query?q=drop%20database%20${DATABASE_NAME}
+
 # Load new data
 cat ${DATA_FILE} | gunzip | $EXE_FILE_NAME \
     --db-name=${DATABASE_NAME} \
@@ -28,4 +32,5 @@ cat ${DATA_FILE} | gunzip | $EXE_FILE_NAME \
     --workers=${NUM_WORKERS} \
     --batch-size=${BATCH_SIZE} \
     --reporting-period=${REPORTING_PERIOD} \
+    --auth-token "$INFLUX_AUTH_TOKEN" \
     --urls=http://${DATABASE_HOST}:${DATABASE_PORT}
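All of the Go-side changes in this patch follow one pattern: when a token is configured, requests carry `Authorization: Token <token>`; when it is empty, the header is omitted entirely so InfluxDB v1 setups keep working. A condensed sketch of that pattern (the helper name here is hypothetical, not part of the patch):

```go
package main

import (
	"fmt"
	"net/http"
)

// newInfluxRequest mirrors the loader and runner behavior: the Authorization
// header is only attached when an auth token is configured.
func newInfluxRequest(method, url, authToken string) (*http.Request, error) {
	req, err := http.NewRequest(method, url, nil)
	if err != nil {
		return nil, err
	}
	if authToken != "" {
		req.Header.Set("Authorization", fmt.Sprintf("Token %s", authToken))
	}
	return req, nil
}
```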
diff --git a/scripts/run_queries/run_queries_influx.sh b/scripts/run_queries/run_queries_influx.sh
index 749902e63..23c96554e 100755
--- a/scripts/run_queries/run_queries_influx.sh
+++ b/scripts/run_queries/run_queries_influx.sh
@@ -7,17 +7,25 @@ if [[ -z "$EXE_FILE_NAME" ]]; then
 	exit 1
 fi
 
-# Default queries folder
-BULK_DATA_DIR=${BULK_DATA_DIR:-"/tmp/bulk_queries"}
-MAX_QUERIES=${MAX_QUERIES:-"0"}
-# How many concurrent worker would run queries - match num of cores, or default to 4
-NUM_WORKERS=${NUM_WORKERS:-$(grep -c ^processor /proc/cpuinfo 2> /dev/null || echo 4)}
+DATABASE_PORT=${DATABASE_PORT:-8086}
+INFLUX_AUTH_TOKEN=${INFLUX_AUTH_TOKEN:-""}
+
+EXE_DIR=${EXE_DIR:-$(dirname $0)}
+source ${EXE_DIR}/run_common.sh
+
+until curl http://${DATABASE_HOST}:${DATABASE_PORT}/ping 2>/dev/null; do
+	echo "Waiting for InfluxDB"
+	sleep 1
+done
+
+# Ensure RESULTS DIR available
+mkdir -p ${RESULTS_DIR}
 
 #
 # Run test for one file
 #
-function run_file()
-{
+function run_file() {
 	# $FULL_DATA_FILE_NAME: /full/path/to/file_with.ext
 	# $DATA_FILE_NAME:      file_with.ext
 	# $DIR:                 /full/path/to
@@ -29,24 +37,36 @@ function run_file()
 	EXTENSION="${DATA_FILE_NAME##*.}"
 	NO_EXT_DATA_FILE_NAME="${DATA_FILE_NAME%.*}"
 
-	# Several options on how to name results file
-	#OUT_FULL_FILE_NAME="${DIR}/result_${DATA_FILE_NAME}"
-	OUT_FULL_FILE_NAME="${DIR}/result_${NO_EXT_DATA_FILE_NAME}.out"
-	#OUT_FULL_FILE_NAME="${DIR}/${NO_EXT_DATA_FILE_NAME}.out"
-
 	if [ "${EXTENSION}" == "gz" ]; then
 		GUNZIP="gunzip"
 	else
 		GUNZIP="cat"
 	fi
 
-	echo "Running ${DATA_FILE_NAME}"
-	cat $FULL_DATA_FILE_NAME \
-		| $GUNZIP \
-		| $EXE_FILE_NAME \
-			--max-queries $MAX_QUERIES \
-			--workers $NUM_WORKERS \
-		| tee $OUT_FULL_FILE_NAME
+	for run in $(seq ${REPETITIONS}); do
+		# Several options on how to name results file
+		#OUT_FULL_FILE_NAME="${DIR}/result_${DATA_FILE_NAME}"
+		OUT_FULL_FILE_NAME="${RESULTS_DIR}/result_${NO_EXT_DATA_FILE_NAME}_${run}.out"
+		#OUT_FULL_FILE_NAME="${DIR}/${NO_EXT_DATA_FILE_NAME}.out"
+		HDR_FULL_FILE_NAME="${RESULTS_DIR}/HDR_TXT_result_${NO_EXT_DATA_FILE_NAME}_${run}.out"
+
+		echo "Running ${DATA_FILE_NAME}"
+		echo "  Saving results to ${OUT_FULL_FILE_NAME}"
+		echo "  Saving HDR results to ${HDR_FULL_FILE_NAME}"
+
+		cat $FULL_DATA_FILE_NAME |
+			$GUNZIP |
+			$EXE_FILE_NAME \
+				--max-queries=${MAX_QUERIES} \
+				--db-name=${DATABASE_NAME} \
+				--workers=${NUM_WORKERS} \
+				--print-interval=${QUERIES_PRINT_INTERVAL} \
+				--hdr-latencies=${HDR_FULL_FILE_NAME} \
+				--auth-token "$INFLUX_AUTH_TOKEN" \
+				--debug=${DEBUG} \
+				--urls=http://${DATABASE_HOST}:${DATABASE_PORT} |
+			tee $OUT_FULL_FILE_NAME
+	done
 }
 
 if [ "$#" -gt 0 ]; then