diff --git a/bigquery/client.go b/bigquery/client.go index 49241cc..af7e50d 100644 --- a/bigquery/client.go +++ b/bigquery/client.go @@ -10,6 +10,7 @@ import ( "cloud.google.com/go/bigquery" "github.com/pkg/errors" "google.golang.org/api/googleapi" + "google.golang.org/api/option" "storj.io/eventkit/pb" ) @@ -23,8 +24,8 @@ type BigQueryClient struct { schemeChangeLock sync.Locker } -func NewBigQueryClient(ctx context.Context, project string, datasetName string) (*BigQueryClient, error) { - client, err := bigquery.NewClient(ctx, project) +func NewBigQueryClient(ctx context.Context, project, datasetName string, options ...option.ClientOption) (*BigQueryClient, error) { + client, err := bigquery.NewClient(ctx, project, options...) if err != nil { return nil, errors.WithStack(err) } diff --git a/bigquery/config.go b/bigquery/config.go index 3d6e29f..577d82c 100644 --- a/bigquery/config.go +++ b/bigquery/config.go @@ -7,6 +7,7 @@ import ( "time" "github.com/zeebo/errs/v2" + "google.golang.org/api/option" "storj.io/eventkit" "storj.io/eventkit/destination" @@ -19,6 +20,7 @@ import ( // bigquery:app=...,project=...,dataset=... // bigquery:app=...,project=...,dataset=...|batch:queueSize=111,flashSize=111,flushInterval=111 // bigquery:app=...,project=...,dataset=...|parallel:runners=10|batch:queueSize=111,flashSize=111,flushInterval=111 +// bigquery:app=...,project=...,dataset=...,credentialsPath=/path/to/my/service-account.json|parallel:runners=10|batch:queueSize=111 func CreateDestination(ctx context.Context, config string) (eventkit.Destination, error) { layers := strings.Split(config, "|") var lastLayer func() (eventkit.Destination, error) @@ -33,7 +35,7 @@ func CreateDestination(ctx context.Context, config string) (eventkit.Destination } switch typeName { case "bigquery", "bq": - var appName, project, dataset string + var appName, project, dataset, credentialsPath string for _, param := range strings.Split(params, ",") { key, value, found := strings.Cut(param, "=") if !found { @@ -46,13 +48,19 @@ func CreateDestination(ctx context.Context, config string) (eventkit.Destination project = value case "dataset": dataset = value + case "credentialsPath": + credentialsPath = value default: return nil, errs.Errorf("Unknown parameter for bigquery destination %s. Please use appName/project/dataset", key) } } lastLayer = func() (eventkit.Destination, error) { - return NewBigQueryDestination(ctx, appName, project, dataset) + var options []option.ClientOption + if credentialsPath != "" { + options = append(options, option.WithCredentialsFile(credentialsPath)) + } + return NewBigQueryDestination(ctx, appName, project, dataset, options...) } case "parallel": var workers int diff --git a/bigquery/destination.go b/bigquery/destination.go index 43e79c3..0f5c214 100644 --- a/bigquery/destination.go +++ b/bigquery/destination.go @@ -9,6 +9,8 @@ import ( "os" "time" + "google.golang.org/api/option" + "storj.io/eventkit" "storj.io/eventkit/pb" ) @@ -22,8 +24,8 @@ type BigQueryDestination struct { var _ eventkit.Destination = &BigQueryDestination{} -func NewBigQueryDestination(ctx context.Context, appName string, project string, dataset string) (*BigQueryDestination, error) { - c, err := NewBigQueryClient(ctx, project, dataset) +func NewBigQueryDestination(ctx context.Context, appName, project, dataset string, options ...option.ClientOption) (*BigQueryDestination, error) { + c, err := NewBigQueryClient(ctx, project, dataset, options...) if err != nil { return nil, err } diff --git a/eventkitd-bigquery/bigquery/sink.go b/eventkitd-bigquery/bigquery/sink.go index 032fd1a..0ae1628 100644 --- a/eventkitd-bigquery/bigquery/sink.go +++ b/eventkitd-bigquery/bigquery/sink.go @@ -4,6 +4,8 @@ import ( "context" "time" + "google.golang.org/api/option" + "storj.io/eventkit/bigquery" "storj.io/eventkit/eventkitd/listener" "storj.io/eventkit/pb" @@ -14,8 +16,8 @@ type BigQuerySink struct { client *bigquery.BigQueryClient } -func NewBigQuerySink(ctx context.Context, project string, dataset string) (*BigQuerySink, error) { - c, err := bigquery.NewBigQueryClient(ctx, project, dataset) +func NewBigQuerySink(ctx context.Context, project, dataset string, options ...option.ClientOption) (*BigQuerySink, error) { + c, err := bigquery.NewBigQueryClient(ctx, project, dataset, options...) if err != nil { return nil, err } diff --git a/tools/eventkit-save-bq/main.go b/tools/eventkit-save-bq/main.go index f60f889..5f7b983 100644 --- a/tools/eventkit-save-bq/main.go +++ b/tools/eventkit-save-bq/main.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" "github.com/spf13/cobra" + "google.golang.org/api/option" "storj.io/eventkit" "storj.io/eventkit/bigquery" @@ -23,8 +24,9 @@ func main() { name := c.Flags().StringP("name", "n", "test", "Name of the event sending out") project := c.Flags().StringP("project", "p", "", "GCP project to use") dataset := c.Flags().StringP("dataset", "d", "eventkitd", "GCP dataset to use") + credentialsPath := c.Flags().StringP("credentialsPath", "c", "", "GCP credentials path, defaults to GOOGLE_APPLICATION_CREDENTIALS if not provided") c.RunE = func(cmd *cobra.Command, args []string) error { - return send(*project, *dataset, *name, args) + return send(*project, *dataset, *credentialsPath, *name, args) } err := c.Execute() if err != nil { @@ -32,8 +34,13 @@ func main() { } } -func send(project string, dataset string, name string, args []string) error { - d, err := bigquery.NewBigQueryDestination(context.Background(), "evenkit-save", project, dataset) +func send(project, dataset, credentialsPath, name string, args []string) error { + var options []option.ClientOption + if credentialsPath != "" { + options = append(options, option.WithCredentialsFile(credentialsPath)) + } + + d, err := bigquery.NewBigQueryDestination(context.Background(), "evenkit-save", project, dataset, options...) if err != nil { return errors.WithStack(err) } diff --git a/tools/eventkit-time/main.go b/tools/eventkit-time/main.go index 5dd0588..7af43ff 100644 --- a/tools/eventkit-time/main.go +++ b/tools/eventkit-time/main.go @@ -14,6 +14,7 @@ import ( "github.com/spf13/viper" "github.com/zeebo/errs/v2" "golang.org/x/sync/errgroup" + "google.golang.org/api/option" "storj.io/eventkit" "storj.io/eventkit/bigquery" @@ -29,6 +30,7 @@ func main() { _ = c.Flags().StringSliceP("tag", "t", []string{}, "Custom tags to add to the events") _ = c.Flags().StringP("instance", "i", "", "Instance name of the eventkitd monitoring (default: hostname)") _ = c.Flags().StringP("scope", "s", "eventkit-time", "Scope to use for events") + _ = c.Flags().StringP("credentialsPath", "c", "", "GCP credentials path, defaults to GOOGLE_APPLICATION_CREDENTIALS if not provided") version := c.Flags().BoolP("version", "v", false, "Scope to use for events") viper.SetConfigName("eventkit-time") viper.SetEnvPrefix("EVENTKIT") @@ -57,7 +59,15 @@ func main() { } } - return execute(viper.GetString("destination"), viper.GetString("name"), args, viper.GetStringSlice("tag"), viper.GetString("scope"), viper.GetString("instance")) + return execute( + viper.GetString("destination"), + viper.GetString("name"), + viper.GetString("credentialsPath"), + args, + viper.GetStringSlice("tag"), + viper.GetString("scope"), + viper.GetString("instance"), + ) } err = c.Execute() if err != nil { @@ -65,7 +75,7 @@ func main() { } } -func execute(dest string, name string, args []string, customTags []string, scope string, instance string) error { +func execute(dest, name, credentialsPath string, args []string, customTags []string, scope, instance string) error { ek := eventkit.DefaultRegistry.Scope(scope) if instance == "" { instance, _ = os.Hostname() @@ -82,7 +92,11 @@ func execute(dest string, name string, args []string, customTags []string, scope var err error dest = strings.TrimPrefix(dest, "bq:") parts := strings.Split(dest, "/") - client, err = bigquery.NewBigQueryDestination(destCtx, "eventkit-time", parts[0], parts[1]) + var options []option.ClientOption + if credentialsPath != "" { + options = append(options, option.WithCredentialsFile(credentialsPath)) + } + client, err = bigquery.NewBigQueryDestination(destCtx, "eventkit-time", parts[0], parts[1], options...) if err != nil { return err }