From f52e2d2237e457c9c48b5832fe67a0730d0bf3d1 Mon Sep 17 00:00:00 2001
From: Daniel Garcia
Date: Fri, 27 Aug 2021 16:12:40 -0500
Subject: [PATCH] fixes #75: allow labels to be hashed into topic partitions

---
 config.go         | 28 +++++++++++++++++++++++++---
 handlers.go       | 32 ++++++++++++++++++++++++++++----
 main.go           | 10 ++++++++++
 serializers.go    | 14 +++++++++++++-
 topic_metadata.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 124 insertions(+), 8 deletions(-)
 create mode 100644 topic_metadata.go

diff --git a/config.go b/config.go
index 08cb2cd7..c818f1ed 100644
--- a/config.go
+++ b/config.go
@@ -16,12 +16,14 @@ package main
 
 import (
 	"fmt"
-	dto "github.com/prometheus/client_model/go"
-	"github.com/prometheus/common/expfmt"
-	"gopkg.in/yaml.v2"
 	"os"
 	"strings"
 	"text/template"
+	"time"
+
+	dto "github.com/prometheus/client_model/go"
+	"github.com/prometheus/common/expfmt"
+	"gopkg.in/yaml.v2"
 
 	"github.com/sirupsen/logrus"
 )
@@ -29,6 +31,9 @@
 var (
 	kafkaBrokerList       = "kafka:9092"
 	kafkaTopic            = "metrics"
+	kafkaPartitionLabels  []string
+	kafkaMetadataTimeout  = time.Second * 10
+	kafkaMetadataInterval = time.Minute * 5
 	topicTemplate         *template.Template
 	match                 = make(map[string]*dto.MetricFamily, 0)
 	basicauth             = false
@@ -63,6 +68,23 @@ func init() {
 		kafkaTopic = value
 	}
 
+	if value := os.Getenv("KAFKA_PARTITION_LABELS"); value != "" {
+		kafkaPartitionLabels = strings.Split(value, ",")
+	}
+
+	if value := os.Getenv("KAFKA_METADATA_TIMEOUT"); value != "" {
+		d, err := time.ParseDuration(value)
+		if err != nil {
+			logrus.WithError(err).Errorf("KAFKA_METADATA_TIMEOUT parsing failed, using default")
+		} else {
+			if d < 0 {
+				logrus.Errorf("KAFKA_METADATA_TIMEOUT must not be negative, using default")
+			} else {
+				kafkaMetadataTimeout = d
+			}
+		}
+	}
+
 	if value := os.Getenv("BASIC_AUTH_USERNAME"); value != "" {
 		basicauth = true
 		basicauthUsername = value
diff --git a/handlers.go b/handlers.go
index 28d22225..52bae68f 100644
--- a/handlers.go
+++ b/handlers.go
@@ -16,8 +16,10 @@ package main
 
 import (
 	"fmt"
+	"hash/fnv"
 	"io/ioutil"
 	"net/http"
+	"strings"
 
 	"github.com/gin-gonic/gin"
 	"github.com/sirupsen/logrus"
@@ -62,11 +64,16 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
 			return
 		}
 
-		for topic, metrics := range metricsPerTopic {
-			t := topic
+		for topicAndHashKey, metrics := range metricsPerTopic {
+
+			topic, partitionID, err := getPartitionAndTopic(topicAndHashKey)
+			if err != nil {
+				continue
+			}
+
 			part := kafka.TopicPartition{
-				Partition: kafka.PartitionAny,
-				Topic:     &t,
+				Partition: partitionID,
+				Topic:     &topic,
 			}
 			for _, metric := range metrics {
 				objectsWritten.Add(float64(1))
@@ -87,3 +94,20 @@
 		}
 	}
 }
+
+func getPartitionAndTopic(topic string) (string, int32, error) {
+	parts := strings.Split(topic, "|")
+
+	if len(parts) == 1 {
+		return parts[0], kafka.PartitionAny, nil
+	}
+	h := fnv.New32a()
+	h.Write([]byte(parts[1]))
+
+	v, ok := topicPartitionCount.Load(parts[0])
+	if !ok {
+		logrus.WithField("topic", parts[0]).Error("did not find metadata for requested topic")
+		return topic, kafka.PartitionAny, fmt.Errorf("could not find partition count for topic %s", parts[0])
+	}
+	return parts[0], int32(h.Sum32() % uint32(v.(int))), nil
+}
diff --git a/main.go b/main.go
index a9173ed6..4724f6ac 100644
--- a/main.go
+++ b/main.go
@@ -15,6 +15,7 @@
 package main
 
 import (
+	"context"
 	"time"
 
 	"github.com/confluentinc/confluent-kafka-go/kafka"
@@ -27,6 +28,9 @@
 func main() {
 	logrus.Info("creating kafka producer")
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	kafkaConfig := kafka.ConfigMap{
 		"bootstrap.servers": kafkaBrokerList,
 		"compression.codec": kafkaCompression,
@@ -68,6 +72,12 @@
 		logrus.WithError(err).Fatal("couldn't create kafka producer")
 	}
 
+	if kafkaPartitionLabels != nil {
+		if err := syncTopicMetadata(ctx, producer); err != nil {
+			logrus.WithError(err).Fatal("couldn't fetch topic metadata")
+		}
+	}
+
 	r := gin.New()
 	r.Use(ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true), gin.Recovery())
 
diff --git a/serializers.go b/serializers.go
index e75e86dd..4a45ad6a 100644
--- a/serializers.go
+++ b/serializers.go
@@ -116,10 +116,22 @@ func NewAvroJSONSerializer(schemaPath string) (*AvroJSONSerializer, error) {
 }
 
 func topic(labels map[string]string) string {
-	var buf bytes.Buffer
+	var buf, buf2 bytes.Buffer
 	if err := topicTemplate.Execute(&buf, labels); err != nil {
 		return ""
 	}
+	for _, s := range kafkaPartitionLabels {
+		v, ok := labels[s]
+		if ok {
+			if _, err := buf2.WriteString(v); err != nil {
+				return ""
+			}
+		}
+	}
+	if buf2.Len() > 0 {
+		buf.WriteString("|")
+		buf.WriteString(buf2.String())
+	}
 	return buf.String()
 }
 
diff --git a/topic_metadata.go b/topic_metadata.go
new file mode 100644
index 00000000..adc7a922
--- /dev/null
+++ b/topic_metadata.go
@@ -0,0 +1,48 @@
+package main
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+	"github.com/sirupsen/logrus"
+)
+
+var topicPartitionCount sync.Map
+
+type metaDataFetcher interface {
+	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
+}
+
+func syncTopicMetadata(ctx context.Context, producer metaDataFetcher) error {
+
+	if err := processMetadata(producer); err != nil {
+		return err
+	}
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+
+			case <-time.After(kafkaMetadataInterval):
+				if err := processMetadata(producer); err != nil {
+					logrus.WithError(err).Error("could not fetch topic metadata")
+				}
+			}
+		}
+	}()
+	return nil
+}
+
+func processMetadata(producer metaDataFetcher) error {
+	metadata, err := producer.GetMetadata(nil, true, int(kafkaMetadataTimeout.Milliseconds()))
+	if err != nil {
+		return err
+	}
+	for name, topic := range metadata.Topics {
+		topicPartitionCount.Store(name, len(topic.Partitions))
+	}
+	return nil
+}
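
Not part of the patch itself: the short test sketch below illustrates the behaviour the patch introduces, i.e. that the serializer emits "topic|hashkey" strings and getPartitionAndTopic maps the hash key onto one of the topic's partitions via FNV-1a, so that all samples sharing the same partition-label values land in the same partition. The file name (handlers_test.go), the topic name "metrics", the hash key "instance-1" and the partition count of 6 are made-up example values, not anything defined by the patch.

package main

import (
	"testing"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func TestGetPartitionAndTopic(t *testing.T) {
	// Pretend the metadata sync has seen a "metrics" topic with 6 partitions.
	topicPartitionCount.Store("metrics", 6)

	// A plain topic name (no "|") leaves partition selection to the producer.
	topic, partition, err := getPartitionAndTopic("metrics")
	if err != nil || topic != "metrics" || partition != kafka.PartitionAny {
		t.Fatalf("plain topic: got %q, %d, %v", topic, partition, err)
	}

	// A "topic|hashkey" pair is hashed onto one of the known partitions.
	topic, partition, err = getPartitionAndTopic("metrics|instance-1")
	if err != nil || topic != "metrics" {
		t.Fatalf("hashed topic: got %q, %v", topic, err)
	}
	if partition < 0 || partition >= 6 {
		t.Fatalf("partition %d out of range [0,6)", partition)
	}

	// The same hash key always maps to the same partition, which is what keeps
	// a given label combination ordered within a single partition.
	_, again, _ := getPartitionAndTopic("metrics|instance-1")
	if again != partition {
		t.Fatalf("expected stable partition, got %d then %d", partition, again)
	}
}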