From 456e4be20413cbded3ba72de2394a2613711bff9 Mon Sep 17 00:00:00 2001 From: Marek Rusinowski Date: Sun, 16 Jul 2023 15:12:43 +0200 Subject: [PATCH] Implement Cloud PubSub Sink This sink allows publishing directly to Cloud Pub/Sub topic and not via the Cloud IoT Core MQTT bridge. --- .gitignore | 1 + README.md | 3 +- cmd/gbcsdpd/README.md | 4 +- go.mod | 5 +- go.sum | 6 ++ infra/main.tf | 25 +++++++- pkg/config/BUILD.bazel | 3 + pkg/config/config.go | 75 ++++++++++++++++++++++-- pkg/config/config_format.go | 29 ++++++++-- pkg/config/config_test.go | 27 ++++++++- pkg/config/testdata/test1/config.toml | 8 +++ pkg/config/testdata/test1/creds.json | 9 +++ pkg/sinks/BUILD.bazel | 3 + pkg/sinks/cloud_pubsub_sink.go | 82 +++++++++++++++++++++++++++ pkg/sinks/sinks.go | 2 + repositories.bzl | 24 +++++++- 16 files changed, 290 insertions(+), 16 deletions(-) create mode 100644 pkg/config/testdata/test1/creds.json create mode 100644 pkg/sinks/cloud_pubsub_sink.go diff --git a/.gitignore b/.gitignore index 4b0fb09..bad6c2f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /*.pem /bazel-* /config.toml +/*creds.json diff --git a/README.md b/README.md index 16a51a7..4058506 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,8 @@ gbcsdpd is a simple Go Bluetooth Climate Sensor Data Publisher Daemon: it listens for BLE advertisements containing measurements from Bluetooth sensors, -parses them, and publishes via [MQTT](https://mqtt.org/) protocol. +parses them, and publishes via [MQTT](https://mqtt.org/) protocol or directly +to Cloud Pub/Sub. Currently, it supports only [RuuviTag](https://ruuvi.com/ruuvitag/) sensors but it should be easy to add support for more. diff --git a/cmd/gbcsdpd/README.md b/cmd/gbcsdpd/README.md index 0ad2529..e8ce05a 100644 --- a/cmd/gbcsdpd/README.md +++ b/cmd/gbcsdpd/README.md @@ -54,13 +54,15 @@ and you can start `gbcsdpd` and load it: `./gbcsdpd -config config.toml`. The only top-level setting is the Bluetooth adapter name and the rest of the configuration consists of a list of sinks to push publications to. There can be multiple sinks of the same and different types in the same configuration. There -are currently 3 types of sinks implemented: +are currently 4 types of sinks implemented: - Stdout: useful for debugging, prints measurements on stdout. - MQTT: generic MQTT target allowing to specify username, password, topic, format, etc. - GCP: MQTT sink which implements custom authorization scheme required by Cloud IoT. Internally it's a simple wrapper over generic MQTT implementation. +- Cloud Pub/Sub: sink directly pushing to Google Cloud Pub/Sub topic skipping + the Cloud IoT bridge. Data to MQTT servers is published as [gbcsdpd.api.v1.MeasurementsPublication](../../api/climate.proto) Protobuf diff --git a/go.mod b/go.mod index 899b4a3..4ee3498 100644 --- a/go.mod +++ b/go.mod @@ -4,20 +4,24 @@ go 1.20 require ( cloud.google.com/go/monitoring v1.15.1 + cloud.google.com/go/pubsub v1.32.0 github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/fhmq/hmq v1.5.0 github.com/godbus/dbus/v5 v5.1.0 github.com/golang-jwt/jwt/v5 v5.0.0 github.com/google/go-cmp v0.5.9 github.com/pelletier/go-toml/v2 v2.0.9 + golang.org/x/oauth2 v0.10.0 google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 google.golang.org/grpc v1.56.2 google.golang.org/protobuf v1.31.0 ) require ( + cloud.google.com/go v0.110.4 // indirect cloud.google.com/go/compute v1.21.0 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect + cloud.google.com/go/iam v1.1.1 // indirect github.com/Shopify/sarama v1.38.1 // indirect github.com/bitly/go-simplejson v0.5.1 // indirect github.com/bytedance/sonic v1.9.2 // indirect @@ -71,7 +75,6 @@ require ( golang.org/x/arch v0.4.0 // indirect golang.org/x/crypto v0.11.0 // indirect golang.org/x/net v0.12.0 // indirect - golang.org/x/oauth2 v0.10.0 // indirect golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.10.0 // indirect golang.org/x/text v0.11.0 // indirect diff --git a/go.sum b/go.sum index fa554f2..f189f80 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,18 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.110.4 h1:1JYyxKMN9hd5dR2MYTPWkGUgcoxVVhg0LKNKEo0qvmk= +cloud.google.com/go v0.110.4/go.mod h1:+EYjdK8e5RME/VY/qLCAtuyALQ9q67dvuum8i+H5xsI= cloud.google.com/go/compute v1.21.0 h1:JNBsyXVoOoNJtTQcnEY5uYpZIbeCTYIeDe0Xh1bySMk= cloud.google.com/go/compute v1.21.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= +cloud.google.com/go/iam v1.1.1 h1:lW7fzj15aVIXYHREOqjRBV9PsH0Z6u8Y46a1YGvQP4Y= +cloud.google.com/go/iam v1.1.1/go.mod h1:A5avdyVL2tCppe4unb0951eI9jreack+RJ0/d+KUZOU= +cloud.google.com/go/kms v1.12.1 h1:xZmZuwy2cwzsocmKDOPu4BL7umg8QXagQx6fKVmf45U= cloud.google.com/go/monitoring v1.15.1 h1:65JhLMd+JiYnXr6j5Z63dUYCuOg770p8a/VC+gil/58= cloud.google.com/go/monitoring v1.15.1/go.mod h1:lADlSAlFdbqQuwwpaImhsJXu1QSdd3ojypXrFSMr2rM= +cloud.google.com/go/pubsub v1.32.0 h1:JOEkgEYBuUTHSyHS4TcqOFuWr+vD6qO/imsFqShUCp4= +cloud.google.com/go/pubsub v1.32.0/go.mod h1:f+w71I33OMyxf9VpMVcZbnG5KSUkCOUHYpFd5U1GdRc= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A= github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g= diff --git a/infra/main.tf b/infra/main.tf index 1a1f7f3..4a93d3d 100644 --- a/infra/main.tf +++ b/infra/main.tf @@ -63,6 +63,27 @@ resource "google_pubsub_topic" "measurements_topic" { depends_on = [google_project_service.service["pubsub"]] } +resource "google_service_account" "measurements-publisher" { + account_id = "measurements-publisher" + display_name = "Measurements Publisher" + description = "Service account used for pushing measurements via Pub/Sub" + + depends_on = [google_project_service.service["iam"], google_project_service.service["pubsub"]] +} + +data "google_iam_policy" "measurements_topic-policy" { + binding { + role = "roles/pubsub.publisher" + members = [google_service_account.measurements-publisher.member] + } +} + +resource "google_pubsub_topic_iam_policy" "measurements_topic-iam" { + project = google_pubsub_topic.measurements_topic.project + topic = google_pubsub_topic.measurements_topic.name + policy_data = data.google_iam_policy.measurements_topic-policy.policy_data +} + resource "google_cloudiot_registry" "sensors_registry" { name = "sensors" @@ -172,7 +193,7 @@ resource "google_service_account" "metricspusher" { resource "google_project_iam_member" "metricspusher-metrics-writer" { project = var.project role = "roles/monitoring.metricWriter" - member = "serviceAccount:${google_service_account.metricspusher.email}" + member = google_service_account.metricspusher.member } resource "google_cloud_run_service" "metricspusher-service" { @@ -209,7 +230,7 @@ resource "google_cloud_run_service" "metricspusher-service" { data "google_iam_policy" "metricspusher-service-policy" { binding { role = "roles/run.invoker" - members = ["serviceAccount:${google_service_account.metricspusher-invoker.email}"] + members = [google_service_account.metricspusher-invoker.member] } } diff --git a/pkg/config/BUILD.bazel b/pkg/config/BUILD.bazel index ba527c6..9279d03 100644 --- a/pkg/config/BUILD.bazel +++ b/pkg/config/BUILD.bazel @@ -11,6 +11,8 @@ go_library( deps = [ "@com_github_golang_jwt_jwt_v5//:go_default_library", "@com_github_pelletier_go_toml_v2//:go_default_library", + "@com_google_cloud_go_pubsub//:go_default_library", + "@org_golang_x_oauth2//google:go_default_library", ], ) @@ -23,5 +25,6 @@ go_test( "@com_github_golang_jwt_jwt_v5//:go_default_library", "@com_github_google_go_cmp//cmp:go_default_library", "@com_github_google_go_cmp//cmp/cmpopts:go_default_library", + "@org_golang_x_oauth2//google:go_default_library", ], ) diff --git a/pkg/config/config.go b/pkg/config/config.go index 3c24e08..9a4387d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -15,6 +15,7 @@ package config import ( + "context" "crypto/rsa" "crypto/tls" "crypto/x509" @@ -25,8 +26,10 @@ import ( "strings" "time" + "cloud.google.com/go/pubsub" "github.com/golang-jwt/jwt/v5" "github.com/pelletier/go-toml/v2" + "golang.org/x/oauth2/google" ) // Sink represents the configuration for Sink. @@ -74,6 +77,12 @@ type GCPSink struct { TLSConfig *tls.Config } +type CloudPubSubSink struct { + Name, Project, Topic, Device string + RateLimit *RateLimit + Creds *google.Credentials +} + // StdoutSink is configuration for sink.StdoutSink. type StdoutSink struct { Name string @@ -81,12 +90,13 @@ type StdoutSink struct { } var ( - projectIDRE, registryDeviceIDsRE, cloudIoTRegionRE, clientIDRE *regexp.Regexp + projectIDRE, registryDeviceIDsRE, cloudPubSubTopicRE, cloudIoTRegionRE, clientIDRE *regexp.Regexp ) func init() { projectIDRE = regexp.MustCompile(`[-a-z0-9]{6,30}`) registryDeviceIDsRE = regexp.MustCompile(`[a-zA-Z][-a-zA-Z0-9._+~%]{2,254}`) + cloudPubSubTopicRE = regexp.MustCompile(`[a-zA-Z][-a-zA-Z0-9._+~%]{2,254}`) cloudIoTRegionRE = regexp.MustCompile(`us-central1|europe-west1|asia-east1`) clientIDRE = regexp.MustCompile(`[0-9a-zA-Z]{0,23}`) } @@ -262,6 +272,56 @@ func parseGCPSink(basePath string, sinkID int, sink *fGCPSink) (*GCPSink, error) return res, nil } +func parseCloudPubSubSink(basePath string, sinkID int, sink *fCloudPubSubSink) (*CloudPubSubSink, error) { + ctx := context.Background() + res := &CloudPubSubSink{} + if sink.Name == "" { + res.Name = fmt.Sprintf("unnamed-cloud_pubsub-sink-%d", sinkID) + } else { + res.Name = sink.Name + } + + if !registryDeviceIDsRE.MatchString(sink.Device) { + return nil, fmt.Errorf("sink %s: Device must match %s, given: '%s'", registryDeviceIDsRE.String(), res.Name, sink.Device) + } + res.Device = sink.Device + + if !cloudPubSubTopicRE.MatchString(sink.Topic) { + return nil, fmt.Errorf("sink %s: Topic must meet requirements in https://cloud.google.com/pubsub/docs/create-topic#resource_names, given: '%s'", res.Name, sink.Topic) + } + res.Topic = sink.Topic + + if sink.Creds == nil { + creds, err := google.FindDefaultCredentials(ctx) + if err != nil { + return nil, fmt.Errorf("sink %s: No credentials specified, and failed to find default credentials: %v", sink.Name, err) + } + res.Creds = creds + } else if credsBytes, err := ioutil.ReadFile(joinPathWithAbs(basePath, *sink.Creds)); err != nil { + return nil, fmt.Errorf("sink %s: Failed to read '%s' creds file: %v", sink.Name, *sink.Creds, err) + } else if creds, err := google.CredentialsFromJSON(ctx, credsBytes, "https://www.googleapis.com/auth/pubsub"); err != nil { + return nil, fmt.Errorf("sink %s: Failed to parse '%s' PEM key file: %v", sink.Name, *sink.Creds, err) + } else { + res.Creds = creds + } + + if sink.Project == nil { + res.Project = pubsub.DetectProjectID + } else if !projectIDRE.MatchString(*sink.Project) { + return nil, fmt.Errorf("sink %s: Project ID must meet requirements in https://cloud.google.com/resource-manager/docs/creating-managing-projects#before_you_begin, given: '%s'", res.Name, *sink.Project) + } else { + res.Project = *sink.Project + } + + rateLimit, err := parseRateLimit(sink.RateLimit) + if err != nil { + return nil, fmt.Errorf("sink %s: Failed to parse rate limit: %v", sink.Name, err) + } + res.RateLimit = rateLimit + + return res, nil +} + func parseStdoutSink(sinkID int, sink *fStdoutSink) (*StdoutSink, error) { res := &StdoutSink{} if sink.Name == "" { @@ -305,21 +365,28 @@ func Read(configPath string) (*Config, error) { for i, sink := range fconfig.Sinks.MQTT { mqttSink, err := parseMQTTSink(path.Dir(configPath), i, sink) if err != nil { - return nil, fmt.Errorf("failed to parse MQTT sink conifg: %v", err) + return nil, fmt.Errorf("failed to parse MQTT sink config: %v", err) } config.Sinks = append(config.Sinks, mqttSink) } for i, sink := range fconfig.Sinks.GCP { gcpSink, err := parseGCPSink(path.Dir(configPath), i, sink) if err != nil { - return nil, fmt.Errorf("failed to parse GCP sink conifg: %v", err) + return nil, fmt.Errorf("failed to parse GCP sink config: %v", err) } config.Sinks = append(config.Sinks, gcpSink) } + for i, sink := range fconfig.Sinks.CloudPubSub { + cloudPubSubSink, err := parseCloudPubSubSink(path.Dir(configPath), i, sink) + if err != nil { + return nil, fmt.Errorf("failed to parse Cloud Pub/Sub sink config: %v", err) + } + config.Sinks = append(config.Sinks, cloudPubSubSink) + } for i, sink := range fconfig.Sinks.Stdout { stdoutSink, err := parseStdoutSink(i, sink) if err != nil { - return nil, fmt.Errorf("failed to parse stdout sink conifg: %v", err) + return nil, fmt.Errorf("failed to parse stdout sink config: %v", err) } config.Sinks = append(config.Sinks, stdoutSink) } diff --git a/pkg/config/config_format.go b/pkg/config/config_format.go index 645b492..12e872d 100644 --- a/pkg/config/config_format.go +++ b/pkg/config/config_format.go @@ -1,4 +1,4 @@ -// Copyright 2021 Google LLC +// Copyright 2021-2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -25,9 +25,10 @@ type fConfig struct { // Struct holds list of sinks for publications type fSinks struct { - MQTT []*fMQTTSink `toml:"mqtt"` - GCP []*fGCPSink `toml:"gcp"` - Stdout []*fStdoutSink `toml:"stdout"` + MQTT []*fMQTTSink `toml:"mqtt"` + GCP []*fGCPSink `toml:"gcp"` + CloudPubSub []*fCloudPubSubSink `toml:"cloud_pubsub"` + Stdout []*fStdoutSink `toml:"stdout"` } // Configruation for publishing to stdout @@ -106,6 +107,26 @@ type fGCPSink struct { TLS fTLSConfig `toml:"tls"` } +// Configuration for publishing to Google Cloud Pub/Sub +type fCloudPubSubSink struct { + // Optional name of sink + Name string `toml:"name"` + + RateLimit *fRateLimit `toml:"rate_limit"` + + // Device used to disambiguate multiple devices publishing to the same topic + Device string `toml:"device"` + + // Project Id + Project *string `toml:"project"` + + // Cloud Pub/Sub topic name + Topic string `toml:"topic"` + + // Path to service account credentials file + Creds *string `toml:"creds"` +} + // Configuration for publishing rate limitting type fRateLimit struct { // Specifies rate limit to publish max 1 publication in duration. diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 7344974..1f895e3 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -15,6 +15,7 @@ package config import ( + "context" "crypto/rsa" "crypto/tls" "crypto/x509" @@ -25,6 +26,7 @@ import ( "github.com/golang-jwt/jwt/v5" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "golang.org/x/oauth2/google" ) func readKey(t *testing.T, keyPath string) *rsa.PrivateKey { @@ -51,6 +53,18 @@ func readCACerts(t *testing.T, caCertsPath string) *x509.CertPool { return certpool } +func readGoogleCredentials(t *testing.T, credsPath string) *google.Credentials { + credsBytes, err := ioutil.ReadFile(credsPath) + if err != nil { + t.Fatalf("Failed to load file %s: %v", credsPath, err) + } + creds, err := google.CredentialsFromJSON(context.Background(), credsBytes) + if err != nil { + t.Fatalf("Failed to parse creds file %s: %v", credsPath, err) + } + return creds +} + func caCertsTrans(cp *x509.CertPool) [][]byte { if cp == nil { return [][]byte{} @@ -62,7 +76,10 @@ func cmpConfig(actual, expected *Config) string { return cmp.Diff(actual, expected, cmp.Transformer("CertPool", caCertsTrans), cmpopts.IgnoreUnexported(tls.Config{}), - cmpopts.IgnoreFields(tls.Config{}, "ClientSessionCache")) + cmpopts.IgnoreUnexported(google.Credentials{}), + cmpopts.IgnoreFields(tls.Config{}, "ClientSessionCache"), + cmpopts.IgnoreFields(google.Credentials{}, "TokenSource"), + ) } func TestParsingCorrect(t *testing.T) { @@ -103,6 +120,14 @@ func TestParsingCorrect(t *testing.T) { RootCAs: readCACerts(t, "testdata/test1/myCa.pem"), }, }, + &CloudPubSubSink{ + Name: "cloud pubsub sink 1", + Device: "device2", + Project: "project2", + Topic: "topic1", + Creds: readGoogleCredentials(t, "testdata/test1/creds.json"), + RateLimit: &RateLimit{Max1In: 120 * time.Second}, + }, &StdoutSink{ Name: "stdout sink 1", RateLimit: &RateLimit{Max1In: 90 * time.Second}, diff --git a/pkg/config/testdata/test1/config.toml b/pkg/config/testdata/test1/config.toml index 65182c7..2f84a6a 100644 --- a/pkg/config/testdata/test1/config.toml +++ b/pkg/config/testdata/test1/config.toml @@ -29,6 +29,14 @@ tls.ca_certs = "myCa.pem" tls.skip_verify = true tls.server_name = "tls_overriden.gcp.com" +[[sinks.cloud_pubsub]] +name = "cloud pubsub sink 1" +rate_limit.max_1_in = "120s" +device = "device2" +project = "project2" +topic = "topic1" +creds = "creds.json" + [[sinks.stdout]] name = "stdout sink 2" rate_limit.max_1_in = "10s" diff --git a/pkg/config/testdata/test1/creds.json b/pkg/config/testdata/test1/creds.json new file mode 100644 index 0000000..f08f67d --- /dev/null +++ b/pkg/config/testdata/test1/creds.json @@ -0,0 +1,9 @@ +{ + "type": "service_account", + "private_key_id": "abc", + "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDY3E8o1NEFcjMM\nHW/5ZfFJw29/8NEqpViNjQIx95Xx5KDtJ+nWn9+OW0uqsSqKlKGhAdAo+Q6bjx2c\nuXVsXTu7XrZUY5Kltvj94DvUa1wjNXs606r/RxWTJ58bfdC+gLLxBfGnB6CwK0YQ\nxnfpjNbkUfVVzO0MQD7UP0Hl5ZcY0Puvxd/yHuONQn/rIAieTHH1pqgW+zrH/y3c\n59IGThC9PPtugI9ea8RSnVj3PWz1bX2UkCDpy9IRh9LzJLaYYX9RUd7++dULUlat\nAaXBh1U6emUDzhrIsgApjDVtimOPbmQWmX1S60mqQikRpVYZ8u+NDD+LNw+/Eovn\nxCj2Y3z1AgMBAAECggEAWDBzoqO1IvVXjBA2lqId10T6hXmN3j1ifyH+aAqK+FVl\nGjyWjDj0xWQcJ9ync7bQ6fSeTeNGzP0M6kzDU1+w6FgyZqwdmXWI2VmEizRjwk+/\n/uLQUcL7I55Dxn7KUoZs/rZPmQDxmGLoue60Gg6z3yLzVcKiDc7cnhzhdBgDc8vd\nQorNAlqGPRnm3EqKQ6VQp6fyQmCAxrr45kspRXNLddat3AMsuqImDkqGKBmF3Q1y\nxWGe81LphUiRqvqbyUlh6cdSZ8pLBpc9m0c3qWPKs9paqBIvgUPlvOZMqec6x4S6\nChbdkkTRLnbsRr0Yg/nDeEPlkhRBhasXpxpMUBgPywKBgQDs2axNkFjbU94uXvd5\nznUhDVxPFBuxyUHtsJNqW4p/ujLNimGet5E/YthCnQeC2P3Ym7c3fiz68amM6hiA\nOnW7HYPZ+jKFnefpAtjyOOs46AkftEg07T9XjwWNPt8+8l0DYawPoJgbM5iE0L2O\nx8TU1Vs4mXc+ql9F90GzI0x3VwKBgQDqZOOqWw3hTnNT07Ixqnmd3dugV9S7eW6o\nU9OoUgJB4rYTpG+yFqNqbRT8bkx37iKBMEReppqonOqGm4wtuRR6LSLlgcIU9Iwx\nyfH12UWqVmFSHsgZFqM/cK3wGev38h1WBIOx3/djKn7BdlKVh8kWyx6uC8bmV+E6\nOoK0vJD6kwKBgHAySOnROBZlqzkiKW8c+uU2VATtzJSydrWm0J4wUPJifNBa/hVW\ndcqmAzXC9xznt5AVa3wxHBOfyKaE+ig8CSsjNyNZ3vbmr0X04FoV1m91k2TeXNod\njMTobkPThaNm4eLJMN2SQJuaHGTGERWC0l3T18t+/zrDMDCPiSLX1NAvAoGBAN1T\nVLJYdjvIMxf1bm59VYcepbK7HLHFkRq6xMJMZbtG0ryraZjUzYvB4q4VjHk2UDiC\nlhx13tXWDZH7MJtABzjyg+AI7XWSEQs2cBXACos0M4Myc6lU+eL+iA+OuoUOhmrh\nqmT8YYGu76/IBWUSqWuvcpHPpwl7871i4Ga/I3qnAoGBANNkKAcMoeAbJQK7a/Rn\nwPEJB+dPgNDIaboAsh1nZhVhN5cvdvCWuEYgOGCPQLYQF0zmTLcM+sVxOYgfy8mV\nfbNgPgsP5xmu6dw2COBKdtozw0HrWSRjACd1N4yGu75+wPCcX/gQarcjRcXXZeEa\nNtBLSfcqPULqD+h7br9lEJio\n-----END PRIVATE KEY-----\n", + "client_email": "123-abc@developer.gserviceaccount.com", + "client_id": "123-abc.apps.googleusercontent.com", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "http://localhost:8080/token" +} \ No newline at end of file diff --git a/pkg/sinks/BUILD.bazel b/pkg/sinks/BUILD.bazel index 9043f90..610192a 100644 --- a/pkg/sinks/BUILD.bazel +++ b/pkg/sinks/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "cloud_pubsub_sink.go", "gcp_sink.go", "mqtt_sink.go", "ratelimiter.go", @@ -17,6 +18,8 @@ go_library( "//pkg/config:go_default_library", "@com_github_eclipse_paho_mqtt_golang//:go_default_library", "@com_github_golang_jwt_jwt_v5//:go_default_library", + "@com_google_cloud_go_pubsub//:go_default_library", + "@org_golang_google_api//option:go_default_library", "@org_golang_google_protobuf//encoding/protojson:go_default_library", "@org_golang_google_protobuf//proto:go_default_library", ], diff --git a/pkg/sinks/cloud_pubsub_sink.go b/pkg/sinks/cloud_pubsub_sink.go new file mode 100644 index 0000000..e4bf9e4 --- /dev/null +++ b/pkg/sinks/cloud_pubsub_sink.go @@ -0,0 +1,82 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sinks + +import ( + "context" + "fmt" + "log" + "time" + + "cloud.google.com/go/pubsub" + api "github.com/p2004a/gbcsdpd/api" + "github.com/p2004a/gbcsdpd/pkg/config" + "google.golang.org/api/option" + "google.golang.org/protobuf/proto" +) + +// CloudPubSubSink publishes measurements to Google Cloud Pub/Sub. +type CloudPubSubSink struct { + config *config.CloudPubSubSink + client *pubsub.Client + topic *pubsub.Topic + rl *rateLimiter +} + +// Publish is used to push measurement for publication. +func (s *CloudPubSubSink) Publish(m *api.Measurement) { + s.rl.Publish(m) +} + +func (s *CloudPubSubSink) groupPublish(ms []*api.Measurement) { + ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second) + defer cancel() + pub := &api.MeasurementsPublication{Measurements: ms} + serPub, err := proto.Marshal(pub) + if err != nil { + log.Fatalf("Failed to binary encode measurement: %v", err) + } + // The attributes are named for compatibility with the IoT Core way of publishing. + _, err = s.topic.Publish(context.Background(), &pubsub.Message{ + Data: serPub, + Attributes: map[string]string{ + "deviceId": s.config.Device, + "deviceRegistryLocation": "global", + "projectId": s.client.Project(), + "subFolder": "v1", + }, + }).Get(ctx) + if err != nil { + log.Printf("Failed to publish measurement: %v", err) + } +} + +// NewCloudPubSubSink creates new CloudPubSubSink. +func NewCloudPubSubSink(config *config.CloudPubSubSink) (*CloudPubSubSink, error) { + ctx := context.Background() + client, err := pubsub.NewClient(ctx, config.Project, option.WithCredentials(config.Creds)) + if err != nil { + return nil, fmt.Errorf("failed to create pubsub client: %v", err) + } + topic := client.Topic(config.Topic) + topic.PublishSettings.Timeout = 30 * time.Second + s := &CloudPubSubSink{ + config: config, + topic: topic, + client: client, + } + s.rl = newRateLimiter(config.RateLimit, s.groupPublish) + return s, nil +} diff --git a/pkg/sinks/sinks.go b/pkg/sinks/sinks.go index b956795..1399994 100644 --- a/pkg/sinks/sinks.go +++ b/pkg/sinks/sinks.go @@ -31,6 +31,8 @@ func NewSink(sinkConfig config.Sink) (Sink, error) { switch s := sinkConfig.(type) { case *config.GCPSink: return NewGCPSink(s) + case *config.CloudPubSubSink: + return NewCloudPubSubSink(s) case *config.StdoutSink: return NewStdoutSink(s) case *config.MQTTSink: diff --git a/repositories.bzl b/repositories.bzl index bc64275..c54acd3 100644 --- a/repositories.bzl +++ b/repositories.bzl @@ -285,6 +285,13 @@ def go_repositories(): sum = "h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=", version = "v1.0.0", ) + go_repository( + name = "com_github_google_martian_v3", + importpath = "github.com/google/martian/v3", + sum = "h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw=", + version = "v3.3.2", + ) + go_repository( name = "com_github_google_s2a_go", importpath = "github.com/google/s2a-go", @@ -1195,6 +1202,13 @@ def go_repositories(): sum = "h1:KIV99afoYTJqA2qi8Cjbl5DpjSRzvqFgKcptGXg6kxw=", version = "v1.17.1", ) + go_repository( + name = "com_google_cloud_go_storage", + importpath = "cloud.google.com/go/storage", + sum = "h1:uOdMxAs8HExqBlnLtnQyP0YkvbiDpdGShGKtx6U/oNM=", + version = "v1.30.1", + ) + go_repository( name = "com_google_cloud_go_storagetransfer", importpath = "cloud.google.com/go/storagetransfer", @@ -1444,6 +1458,12 @@ def go_repositories(): sum = "h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=", version = "v0.11.0", ) + go_repository( + name = "org_golang_x_time", + importpath = "golang.org/x/time", + sum = "h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=", + version = "v0.3.0", + ) go_repository( name = "org_golang_x_tools", @@ -1454,8 +1474,8 @@ def go_repositories(): go_repository( name = "org_golang_x_xerrors", importpath = "golang.org/x/xerrors", - sum = "h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=", - version = "v0.0.0-20200804184101-5ec99f83aff1", + sum = "h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=", + version = "v0.0.0-20220907171357-04be3eba64a2", ) go_repository( name = "org_uber_go_atomic",