From 6f7fed067250990888cc125b0971df81da85d590 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Tue, 5 Dec 2023 12:26:08 -0800 Subject: [PATCH] [EHN] Add topic creation for RendezvousAssignmentPolicy (#1463) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - This PR adds Pulsar topic creation for RendezvousAssignmentPolicy - New functionality - ... ## Test plan *How are these changes tested?* - [ ] Local testing with Pulsar ## Documentation Changes *Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?* --- go/coordinator/cmd/grpccoordinator/cmd.go | 1 + go/coordinator/go.mod | 12 +++++ go/coordinator/go.sum | 25 +++++++++++ .../internal/coordinator/assignment_policy.go | 4 +- .../internal/grpccoordinator/server.go | 9 +++- go/coordinator/internal/utils/pulsar_admin.go | 44 +++++++++++++++++++ 6 files changed, 92 insertions(+), 3 deletions(-) create mode 100644 go/coordinator/internal/utils/pulsar_admin.go diff --git a/go/coordinator/cmd/grpccoordinator/cmd.go b/go/coordinator/cmd/grpccoordinator/cmd.go index d3f76b0c6a4..170f60e26e1 100644 --- a/go/coordinator/cmd/grpccoordinator/cmd.go +++ b/go/coordinator/cmd/grpccoordinator/cmd.go @@ -31,6 +31,7 @@ func init() { Cmd.Flags().StringVar(&conf.DBName, "db-name", "", "MetaTable db name") Cmd.Flags().IntVar(&conf.MaxIdleConns, "max-idle-conns", 10, "MetaTable max idle connections") Cmd.Flags().IntVar(&conf.MaxOpenConns, "max-open-conns", 10, "MetaTable max open connections") + Cmd.Flags().StringVar(&conf.PulsarAdminURL, "web-service-url", "http://localhost:8080", "Pulsar web service url") Cmd.Flags().StringVar(&conf.PulsarTenant, "pulsar-tenant", "default", "Pulsar tenant") Cmd.Flags().StringVar(&conf.PulsarNamespace, "pulsar-namespace", "default", "Pulsar namespace") Cmd.Flags().StringVar(&conf.KubernetesNamespace, "kubernetes-namespace", "chroma", "Kubernetes namespace") diff --git a/go/coordinator/go.mod b/go/coordinator/go.mod index d33c6d9946b..15534f890ec 100644 --- a/go/coordinator/go.mod +++ b/go/coordinator/go.mod @@ -4,6 +4,7 @@ go 1.20 require ( ariga.io/atlas-provider-gorm v0.1.1 + github.com/apache/pulsar-client-go v0.9.1-0.20231030094548-620ecf4addfb github.com/google/uuid v1.3.1 github.com/pingcap/log v1.1.0 github.com/rs/zerolog v1.31.0 @@ -21,6 +22,17 @@ require ( pgregory.net/rapid v1.1.0 ) +require ( + github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect + github.com/99designs/keyring v1.2.1 // indirect + github.com/danieljoos/wincred v1.1.2 // indirect + github.com/dvsekhvalnov/jose2go v1.5.0 // indirect + github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect + github.com/golang-jwt/jwt v3.2.1+incompatible // indirect + github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect + github.com/mtibben/percent v0.2.1 // indirect +) + require ( ariga.io/atlas-go-sdk v0.1.1-0.20231001054405-7edfcfc14f1c // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go/coordinator/go.sum b/go/coordinator/go.sum index 0d994dd99df..48b0f6a1704 100644 --- a/go/coordinator/go.sum +++ b/go/coordinator/go.sum @@ -2,18 +2,29 @@ ariga.io/atlas-go-sdk v0.1.1-0.20231001054405-7edfcfc14f1c h1:jvi4KB/7DmYYT+Wy2T ariga.io/atlas-go-sdk v0.1.1-0.20231001054405-7edfcfc14f1c/go.mod h1:MLvZ9QwZx1KhI6+8XguxHPUPm0/PTTUr46S5GQAe9WI= ariga.io/atlas-provider-gorm v0.1.1 h1:Y0VsZCQkXJRYIJxenn2BM6sW2u9SkTca5mLvJumqrgE= ariga.io/atlas-provider-gorm v0.1.1/go.mod h1:jb8uYcN+ul8Nf7OVzi5Vd2y+SQXrI4dHYBEUCiCi/6Q= +github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs= +github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4= +github.com/99designs/keyring v1.2.1 h1:tYLp1ULvO7i3fI5vE21ReQuj99QFSs7lGm0xWyJo87o= +github.com/99designs/keyring v1.2.1/go.mod h1:fc+wB5KTk9wQ9sDx0kFXB3A0MaeGHM9AwRStKOQ5vOA= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/apache/pulsar-client-go v0.9.1-0.20231030094548-620ecf4addfb h1:8c0g4Cu5LHyKuRseT9mJDaCFQZOm2LBUjD3FVesdEJw= +github.com/apache/pulsar-client-go v0.9.1-0.20231030094548-620ecf4addfb/go.mod h1:Ea/yiZA7plgiaWRyOuO1B0k5/hjpl1thmiKig+D9PBQ= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0= +github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQxaLAeM= +github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84= github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -27,9 +38,13 @@ github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9 github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= +github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0= +github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= +github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= @@ -45,6 +60,8 @@ github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= +github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= @@ -84,8 +101,13 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= +github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.9.4 h1:xR7vG4IXt5RWx6FfIjyAtsoMAtnc3C/rFXBBd2AjZwE= github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= @@ -164,6 +186,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -201,6 +224,7 @@ google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= @@ -208,6 +232,7 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/go/coordinator/internal/coordinator/assignment_policy.go b/go/coordinator/internal/coordinator/assignment_policy.go index 2a8b22604cf..6976d6a9652 100644 --- a/go/coordinator/internal/coordinator/assignment_policy.go +++ b/go/coordinator/internal/coordinator/assignment_policy.go @@ -37,7 +37,7 @@ func createTopicName(tenantID string, topicNS string, log_name string) string { // For now it assumes there are 16 topics and uses the rendezvous hashing algorithm to // assign a collection to a topic. -var topics = [16]string{ +var Topics = [16]string{ "chroma_log_0", "chroma_log_1", "chroma_log_2", @@ -69,7 +69,7 @@ func NewRendezvousAssignmentPolicy(tenantID string, topicNS string) *RendezvousA } func (r *RendezvousAssignmentPolicy) AssignCollection(collectionID types.UniqueID) (string, error) { - assignment, error := utils.Assign(collectionID.String(), topics[:], utils.Murmur3Hasher) + assignment, error := utils.Assign(collectionID.String(), Topics[:], utils.Murmur3Hasher) if error != nil { return "", error } diff --git a/go/coordinator/internal/grpccoordinator/server.go b/go/coordinator/internal/grpccoordinator/server.go index fc4eb850b29..38cface04a8 100644 --- a/go/coordinator/internal/grpccoordinator/server.go +++ b/go/coordinator/internal/grpccoordinator/server.go @@ -12,6 +12,7 @@ import ( "github.com/chroma/chroma-coordinator/internal/proto/coordinatorpb" "github.com/chroma/chroma-coordinator/internal/utils" "github.com/pingcap/log" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/health" "gorm.io/gorm" @@ -34,6 +35,7 @@ type Config struct { MaxOpenConns int // Pulsar config + PulsarAdminURL string PulsarTenant string PulsarNamespace string @@ -83,7 +85,6 @@ func New(config Config) (*Server, error) { } else { return nil, errors.New("invalid system catalog provider, only memory and database are supported") } - } func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider, db *gorm.DB) (*Server, error) { @@ -98,6 +99,12 @@ func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider, db *gor assignmentPolicy = coordinator.NewSimpleAssignmentPolicy(config.PulsarTenant, config.PulsarNamespace) } else if config.AssignmentPolicy == "rendezvous" { log.Info("Using rendezvous assignment policy") + + err := utils.CreateTopics(config.PulsarAdminURL, config.PulsarTenant, config.PulsarNamespace, coordinator.Topics[:]) + if err != nil { + log.Error("Failed to create topics", zap.Error(err)) + return nil, err + } assignmentPolicy = coordinator.NewRendezvousAssignmentPolicy(config.PulsarTenant, config.PulsarNamespace) } else { return nil, errors.New("invalid assignment policy, only simple and rendezvous are supported") diff --git a/go/coordinator/internal/utils/pulsar_admin.go b/go/coordinator/internal/utils/pulsar_admin.go new file mode 100644 index 00000000000..f6a38f88267 --- /dev/null +++ b/go/coordinator/internal/utils/pulsar_admin.go @@ -0,0 +1,44 @@ +package utils + +import ( + "github.com/pingcap/log" + "go.uber.org/zap" + + "github.com/apache/pulsar-client-go/pulsaradmin" + pulsar_utils "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" +) + +// This function creates topics in Pulsar. It takes in a list of topics and creates them in pulsar. +// It assumes that the tenant and namespace already exist in Pulsar. +func CreateTopics(pulsarAdminURL string, tenant string, namespace string, topics []string) error { + cfg := &pulsaradmin.Config{ + WebServiceURL: pulsarAdminURL, + } + admin, err := pulsaradmin.NewClient(cfg) + if err != nil { + log.Error("Failed to create pulsar admin client", zap.Error(err)) + return err + } + + for _, topic := range topics { + topicSchema := "persistent://" + tenant + "/" + namespace + "/" + topic + topicName, err := pulsar_utils.GetTopicName(topicSchema) + if err != nil { + log.Error("Failed to get topic name", zap.Error(err)) + return err + } + metadata, err := admin.Topics().GetMetadata(*topicName) + if err != nil { + log.Info("Failed to get topic metadata, needs to create", zap.Error(err)) + } else { + log.Info("Topic already exists", zap.String("topic", topic), zap.Any("metadata", metadata)) + continue + } + err = admin.Topics().Create(*topicName, 1) + if err != nil { + log.Error("Failed to create topic", zap.Error(err)) + return err + } + } + return nil +}