Skip to content

Commit

Permalink
Implement Cloud PubSub Sink
Browse files Browse the repository at this point in the history
This sink allows publishing directly to Cloud Pub/Sub topic and not
via the Cloud IoT Core MQTT bridge.
  • Loading branch information
p2004a committed Jul 16, 2023
1 parent 4c199b6 commit 456e4be
Show file tree
Hide file tree
Showing 16 changed files with 290 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/*.pem
/bazel-*
/config.toml
/*creds.json
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

gbcsdpd is a simple Go Bluetooth Climate Sensor Data Publisher Daemon: it
listens for BLE advertisements containing measurements from Bluetooth sensors,
parses them, and publishes via [MQTT](https://mqtt.org/) protocol.
parses them, and publishes via [MQTT](https://mqtt.org/) protocol or directly
to Cloud Pub/Sub.

Currently, it supports only [RuuviTag](https://ruuvi.com/ruuvitag/) sensors but
it should be easy to add support for more.
Expand Down
4 changes: 3 additions & 1 deletion cmd/gbcsdpd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@ and you can start `gbcsdpd` and load it: `./gbcsdpd -config config.toml`.
The only top-level setting is the Bluetooth adapter name and the rest of the
configuration consists of a list of sinks to push publications to. There can be
multiple sinks of the same and different types in the same configuration. There
are currently 3 types of sinks implemented:
are currently 4 types of sinks implemented:

- Stdout: useful for debugging, prints measurements on stdout.
- MQTT: generic MQTT target allowing to specify username, password, topic,
format, etc.
- GCP: MQTT sink which implements custom authorization scheme required by Cloud
IoT. Internally it's a simple wrapper over generic MQTT implementation.
- Cloud Pub/Sub: sink directly pushing to Google Cloud Pub/Sub topic skipping
the Cloud IoT bridge.

Data to MQTT servers is published as
[gbcsdpd.api.v1.MeasurementsPublication](../../api/climate.proto) Protobuf
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,24 @@ go 1.20

require (
cloud.google.com/go/monitoring v1.15.1
cloud.google.com/go/pubsub v1.32.0
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/fhmq/hmq v1.5.0
github.com/godbus/dbus/v5 v5.1.0
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/google/go-cmp v0.5.9
github.com/pelletier/go-toml/v2 v2.0.9
golang.org/x/oauth2 v0.10.0
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98
google.golang.org/grpc v1.56.2
google.golang.org/protobuf v1.31.0
)

require (
cloud.google.com/go v0.110.4 // indirect
cloud.google.com/go/compute v1.21.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.1 // indirect
github.com/Shopify/sarama v1.38.1 // indirect
github.com/bitly/go-simplejson v0.5.1 // indirect
github.com/bytedance/sonic v1.9.2 // indirect
Expand Down Expand Up @@ -71,7 +75,6 @@ require (
golang.org/x/arch v0.4.0 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/oauth2 v0.10.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.110.4 h1:1JYyxKMN9hd5dR2MYTPWkGUgcoxVVhg0LKNKEo0qvmk=
cloud.google.com/go v0.110.4/go.mod h1:+EYjdK8e5RME/VY/qLCAtuyALQ9q67dvuum8i+H5xsI=
cloud.google.com/go/compute v1.21.0 h1:JNBsyXVoOoNJtTQcnEY5uYpZIbeCTYIeDe0Xh1bySMk=
cloud.google.com/go/compute v1.21.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM=
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
cloud.google.com/go/iam v1.1.1 h1:lW7fzj15aVIXYHREOqjRBV9PsH0Z6u8Y46a1YGvQP4Y=
cloud.google.com/go/iam v1.1.1/go.mod h1:A5avdyVL2tCppe4unb0951eI9jreack+RJ0/d+KUZOU=
cloud.google.com/go/kms v1.12.1 h1:xZmZuwy2cwzsocmKDOPu4BL7umg8QXagQx6fKVmf45U=
cloud.google.com/go/monitoring v1.15.1 h1:65JhLMd+JiYnXr6j5Z63dUYCuOg770p8a/VC+gil/58=
cloud.google.com/go/monitoring v1.15.1/go.mod h1:lADlSAlFdbqQuwwpaImhsJXu1QSdd3ojypXrFSMr2rM=
cloud.google.com/go/pubsub v1.32.0 h1:JOEkgEYBuUTHSyHS4TcqOFuWr+vD6qO/imsFqShUCp4=
cloud.google.com/go/pubsub v1.32.0/go.mod h1:f+w71I33OMyxf9VpMVcZbnG5KSUkCOUHYpFd5U1GdRc=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A=
github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g=
Expand Down
25 changes: 23 additions & 2 deletions infra/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,27 @@ resource "google_pubsub_topic" "measurements_topic" {
depends_on = [google_project_service.service["pubsub"]]
}

resource "google_service_account" "measurements-publisher" {
account_id = "measurements-publisher"
display_name = "Measurements Publisher"
description = "Service account used for pushing measurements via Pub/Sub"

depends_on = [google_project_service.service["iam"], google_project_service.service["pubsub"]]
}

data "google_iam_policy" "measurements_topic-policy" {
binding {
role = "roles/pubsub.publisher"
members = [google_service_account.measurements-publisher.member]
}
}

resource "google_pubsub_topic_iam_policy" "measurements_topic-iam" {
project = google_pubsub_topic.measurements_topic.project
topic = google_pubsub_topic.measurements_topic.name
policy_data = data.google_iam_policy.measurements_topic-policy.policy_data
}

resource "google_cloudiot_registry" "sensors_registry" {
name = "sensors"

Expand Down Expand Up @@ -172,7 +193,7 @@ resource "google_service_account" "metricspusher" {
resource "google_project_iam_member" "metricspusher-metrics-writer" {
project = var.project
role = "roles/monitoring.metricWriter"
member = "serviceAccount:${google_service_account.metricspusher.email}"
member = google_service_account.metricspusher.member
}

resource "google_cloud_run_service" "metricspusher-service" {
Expand Down Expand Up @@ -209,7 +230,7 @@ resource "google_cloud_run_service" "metricspusher-service" {
data "google_iam_policy" "metricspusher-service-policy" {
binding {
role = "roles/run.invoker"
members = ["serviceAccount:${google_service_account.metricspusher-invoker.email}"]
members = [google_service_account.metricspusher-invoker.member]
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ go_library(
deps = [
"@com_github_golang_jwt_jwt_v5//:go_default_library",
"@com_github_pelletier_go_toml_v2//:go_default_library",
"@com_google_cloud_go_pubsub//:go_default_library",
"@org_golang_x_oauth2//google:go_default_library",
],
)

Expand All @@ -23,5 +25,6 @@ go_test(
"@com_github_golang_jwt_jwt_v5//:go_default_library",
"@com_github_google_go_cmp//cmp:go_default_library",
"@com_github_google_go_cmp//cmp/cmpopts:go_default_library",
"@org_golang_x_oauth2//google:go_default_library",
],
)
75 changes: 71 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package config

import (
"context"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
Expand All @@ -25,8 +26,10 @@ import (
"strings"
"time"

"cloud.google.com/go/pubsub"
"github.com/golang-jwt/jwt/v5"
"github.com/pelletier/go-toml/v2"
"golang.org/x/oauth2/google"
)

// Sink represents the configuration for Sink.
Expand Down Expand Up @@ -74,19 +77,26 @@ type GCPSink struct {
TLSConfig *tls.Config
}

type CloudPubSubSink struct {
Name, Project, Topic, Device string
RateLimit *RateLimit
Creds *google.Credentials
}

// StdoutSink is configuration for sink.StdoutSink.
type StdoutSink struct {
Name string
RateLimit *RateLimit
}

var (
projectIDRE, registryDeviceIDsRE, cloudIoTRegionRE, clientIDRE *regexp.Regexp
projectIDRE, registryDeviceIDsRE, cloudPubSubTopicRE, cloudIoTRegionRE, clientIDRE *regexp.Regexp
)

func init() {
projectIDRE = regexp.MustCompile(`[-a-z0-9]{6,30}`)
registryDeviceIDsRE = regexp.MustCompile(`[a-zA-Z][-a-zA-Z0-9._+~%]{2,254}`)
cloudPubSubTopicRE = regexp.MustCompile(`[a-zA-Z][-a-zA-Z0-9._+~%]{2,254}`)
cloudIoTRegionRE = regexp.MustCompile(`us-central1|europe-west1|asia-east1`)
clientIDRE = regexp.MustCompile(`[0-9a-zA-Z]{0,23}`)
}
Expand Down Expand Up @@ -262,6 +272,56 @@ func parseGCPSink(basePath string, sinkID int, sink *fGCPSink) (*GCPSink, error)
return res, nil
}

func parseCloudPubSubSink(basePath string, sinkID int, sink *fCloudPubSubSink) (*CloudPubSubSink, error) {
ctx := context.Background()
res := &CloudPubSubSink{}
if sink.Name == "" {
res.Name = fmt.Sprintf("unnamed-cloud_pubsub-sink-%d", sinkID)
} else {
res.Name = sink.Name
}

if !registryDeviceIDsRE.MatchString(sink.Device) {
return nil, fmt.Errorf("sink %s: Device must match %s, given: '%s'", registryDeviceIDsRE.String(), res.Name, sink.Device)
}
res.Device = sink.Device

if !cloudPubSubTopicRE.MatchString(sink.Topic) {
return nil, fmt.Errorf("sink %s: Topic must meet requirements in https://cloud.google.com/pubsub/docs/create-topic#resource_names, given: '%s'", res.Name, sink.Topic)
}
res.Topic = sink.Topic

if sink.Creds == nil {
creds, err := google.FindDefaultCredentials(ctx)
if err != nil {
return nil, fmt.Errorf("sink %s: No credentials specified, and failed to find default credentials: %v", sink.Name, err)
}
res.Creds = creds
} else if credsBytes, err := ioutil.ReadFile(joinPathWithAbs(basePath, *sink.Creds)); err != nil {
return nil, fmt.Errorf("sink %s: Failed to read '%s' creds file: %v", sink.Name, *sink.Creds, err)
} else if creds, err := google.CredentialsFromJSON(ctx, credsBytes, "https://www.googleapis.com/auth/pubsub"); err != nil {
return nil, fmt.Errorf("sink %s: Failed to parse '%s' PEM key file: %v", sink.Name, *sink.Creds, err)
} else {
res.Creds = creds
}

if sink.Project == nil {
res.Project = pubsub.DetectProjectID
} else if !projectIDRE.MatchString(*sink.Project) {
return nil, fmt.Errorf("sink %s: Project ID must meet requirements in https://cloud.google.com/resource-manager/docs/creating-managing-projects#before_you_begin, given: '%s'", res.Name, *sink.Project)
} else {
res.Project = *sink.Project
}

rateLimit, err := parseRateLimit(sink.RateLimit)
if err != nil {
return nil, fmt.Errorf("sink %s: Failed to parse rate limit: %v", sink.Name, err)
}
res.RateLimit = rateLimit

return res, nil
}

func parseStdoutSink(sinkID int, sink *fStdoutSink) (*StdoutSink, error) {
res := &StdoutSink{}
if sink.Name == "" {
Expand Down Expand Up @@ -305,21 +365,28 @@ func Read(configPath string) (*Config, error) {
for i, sink := range fconfig.Sinks.MQTT {
mqttSink, err := parseMQTTSink(path.Dir(configPath), i, sink)
if err != nil {
return nil, fmt.Errorf("failed to parse MQTT sink conifg: %v", err)
return nil, fmt.Errorf("failed to parse MQTT sink config: %v", err)
}
config.Sinks = append(config.Sinks, mqttSink)
}
for i, sink := range fconfig.Sinks.GCP {
gcpSink, err := parseGCPSink(path.Dir(configPath), i, sink)
if err != nil {
return nil, fmt.Errorf("failed to parse GCP sink conifg: %v", err)
return nil, fmt.Errorf("failed to parse GCP sink config: %v", err)
}
config.Sinks = append(config.Sinks, gcpSink)
}
for i, sink := range fconfig.Sinks.CloudPubSub {
cloudPubSubSink, err := parseCloudPubSubSink(path.Dir(configPath), i, sink)
if err != nil {
return nil, fmt.Errorf("failed to parse Cloud Pub/Sub sink config: %v", err)
}
config.Sinks = append(config.Sinks, cloudPubSubSink)
}
for i, sink := range fconfig.Sinks.Stdout {
stdoutSink, err := parseStdoutSink(i, sink)
if err != nil {
return nil, fmt.Errorf("failed to parse stdout sink conifg: %v", err)
return nil, fmt.Errorf("failed to parse stdout sink config: %v", err)
}
config.Sinks = append(config.Sinks, stdoutSink)
}
Expand Down
29 changes: 25 additions & 4 deletions pkg/config/config_format.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 Google LLC
// Copyright 2021-2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -25,9 +25,10 @@ type fConfig struct {

// Struct holds list of sinks for publications
type fSinks struct {
MQTT []*fMQTTSink `toml:"mqtt"`
GCP []*fGCPSink `toml:"gcp"`
Stdout []*fStdoutSink `toml:"stdout"`
MQTT []*fMQTTSink `toml:"mqtt"`
GCP []*fGCPSink `toml:"gcp"`
CloudPubSub []*fCloudPubSubSink `toml:"cloud_pubsub"`
Stdout []*fStdoutSink `toml:"stdout"`
}

// Configruation for publishing to stdout
Expand Down Expand Up @@ -106,6 +107,26 @@ type fGCPSink struct {
TLS fTLSConfig `toml:"tls"`
}

// Configuration for publishing to Google Cloud Pub/Sub
type fCloudPubSubSink struct {
// Optional name of sink
Name string `toml:"name"`

RateLimit *fRateLimit `toml:"rate_limit"`

// Device used to disambiguate multiple devices publishing to the same topic
Device string `toml:"device"`

// Project Id
Project *string `toml:"project"`

// Cloud Pub/Sub topic name
Topic string `toml:"topic"`

// Path to service account credentials file
Creds *string `toml:"creds"`
}

// Configuration for publishing rate limitting
type fRateLimit struct {
// Specifies rate limit to publish max 1 publication in duration.
Expand Down
27 changes: 26 additions & 1 deletion pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package config

import (
"context"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/golang-jwt/jwt/v5"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"golang.org/x/oauth2/google"
)

func readKey(t *testing.T, keyPath string) *rsa.PrivateKey {
Expand All @@ -51,6 +53,18 @@ func readCACerts(t *testing.T, caCertsPath string) *x509.CertPool {
return certpool
}

func readGoogleCredentials(t *testing.T, credsPath string) *google.Credentials {
credsBytes, err := ioutil.ReadFile(credsPath)
if err != nil {
t.Fatalf("Failed to load file %s: %v", credsPath, err)
}
creds, err := google.CredentialsFromJSON(context.Background(), credsBytes)
if err != nil {
t.Fatalf("Failed to parse creds file %s: %v", credsPath, err)
}
return creds
}

func caCertsTrans(cp *x509.CertPool) [][]byte {
if cp == nil {
return [][]byte{}
Expand All @@ -62,7 +76,10 @@ func cmpConfig(actual, expected *Config) string {
return cmp.Diff(actual, expected,
cmp.Transformer("CertPool", caCertsTrans),
cmpopts.IgnoreUnexported(tls.Config{}),
cmpopts.IgnoreFields(tls.Config{}, "ClientSessionCache"))
cmpopts.IgnoreUnexported(google.Credentials{}),
cmpopts.IgnoreFields(tls.Config{}, "ClientSessionCache"),
cmpopts.IgnoreFields(google.Credentials{}, "TokenSource"),
)
}

func TestParsingCorrect(t *testing.T) {
Expand Down Expand Up @@ -103,6 +120,14 @@ func TestParsingCorrect(t *testing.T) {
RootCAs: readCACerts(t, "testdata/test1/myCa.pem"),
},
},
&CloudPubSubSink{
Name: "cloud pubsub sink 1",
Device: "device2",
Project: "project2",
Topic: "topic1",
Creds: readGoogleCredentials(t, "testdata/test1/creds.json"),
RateLimit: &RateLimit{Max1In: 120 * time.Second},
},
&StdoutSink{
Name: "stdout sink 1",
RateLimit: &RateLimit{Max1In: 90 * time.Second},
Expand Down
Loading

0 comments on commit 456e4be

Please sign in to comment.