Skip to content

Commit

Permalink
modify extractHeaders
Browse files Browse the repository at this point in the history
  • Loading branch information
yotamloe committed Oct 14, 2024
1 parent 3153f5d commit 3445340
Showing 1 changed file with 39 additions and 17 deletions.
56 changes: 39 additions & 17 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ const (
cloudRegionAtt = "cloud.region"
)

type handlerConfig struct {
LogzioToken string
EnvId string
url string
RequestId string
}

func initLogger(ctx context.Context, request events.APIGatewayProxyRequest, token string) *zap.Logger {
awsRequestId, account, logzioIdentifier := "", "", ""
if lambdaContext, ok := lambdacontext.FromContext(ctx); ok {
Expand Down Expand Up @@ -110,38 +117,53 @@ func getListenerUrl(log *zap.Logger) string {
}
}

func extractHeaders(request events.APIGatewayProxyRequest) (string, string, string) {
func extractHeaders(request events.APIGatewayProxyRequest) handlerConfig {
config := handlerConfig{}
requestId := request.Headers["X-Amz-Firehose-Request-Id"]
if requestId == "" {
requestId = request.Headers["x-amz-firehose-request-id"]
}
config.RequestId = requestId
logzioToken := request.Headers["X-Amz-Firehose-Access-Key"]
if logzioToken == "" {
logzioToken = request.Headers["x-amz-firehose-access-key"]
}
config.LogzioToken = logzioToken
commonAttributes := request.Headers["X-Amz-Firehose-Common-Attributes"]
if commonAttributes == "" {
commonAttributes = request.Headers["x-amz-firehose-common-attributes"]
}
var commonAttributesMap map[string]interface{}
if err := json.Unmarshal([]byte(commonAttributes), &commonAttributesMap); err != nil {
return requestId, logzioToken, ""
err := json.Unmarshal([]byte(commonAttributes), &commonAttributesMap)
if err != nil {
return config
}
envID := commonAttributesMap["commonAttributes"].(map[string]interface{})["p8s_logzio_name"].(string)
fmt.Println("Common attributes: ", commonAttributesMap)
return requestId, logzioToken, envID
envID, ok := commonAttributesMap["commonAttributes"].(map[string]interface{})["P8S_LOGZIO_NAME"].(string)
if !ok {
envID = ""
}
config.EnvId = envID
url, ok := commonAttributesMap["commonAttributes"].(map[string]interface{})["CUSTOM_LISTENER"].(string)
if !ok {
url = ""
}
config.url = url
return config
}

func createPrometheusRemoteWriteExporter(log *zap.Logger, LogzioToken, envId string) (exporter.Metrics, error) {
if envId == "" {
envId = "logzio-otlp-metrics-stream"
func createPrometheusRemoteWriteExporter(log *zap.Logger, handlerCfg handlerConfig) (exporter.Metrics, error) {
if handlerCfg.EnvId == "" {
handlerCfg.EnvId = "logzio-otlp-metrics-stream"
}
if handlerCfg.url == "" {
handlerCfg.url = getListenerUrl(log)
}
cfg := &prometheusremotewriteexporter.Config{
ExternalLabels: map[string]string{"p8s_logzio_name": envId},
ExternalLabels: map[string]string{"p8s_logzio_name": handlerCfg.EnvId},
ClientConfig: confighttp.ClientConfig{
Endpoint: getListenerUrl(log),
Endpoint: handlerCfg.url,
Timeout: 5 * time.Second,
Headers: map[string]configopaque.String{"Authorization": configopaque.String(fmt.Sprintf("Bearer %s", LogzioToken))},
Headers: map[string]configopaque.String{"Authorization": configopaque.String(fmt.Sprintf("Bearer %s", handlerCfg.LogzioToken))},
},
TargetInfo: &prometheusremotewriteexporter.TargetInfo{
Enabled: false,
Expand Down Expand Up @@ -278,16 +300,16 @@ func processRecord(protoBuffer *proto.Buffer, log *zap.Logger) (pmetric.Metrics,
func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
metricCount, dataPointCount := 0, 0
shippingErrors := new(internal.ErrorCollector)
requestId, LogzioToken, envid := extractHeaders(request)
log := initLogger(ctx, request, LogzioToken)
firehoseResponseClient := internal.NewResponseClient(requestId, log)
handlerCfg := extractHeaders(request)
log := initLogger(ctx, request, handlerCfg.LogzioToken)
firehoseResponseClient := internal.NewResponseClient(handlerCfg.RequestId, log)
defer log.Sync()
if LogzioToken == "" {
if handlerCfg.LogzioToken == "" {
accessKeyErr := errors.New("cant find access key in 'X-Amz-Firehose-Access-Key' or 'x-amz-firehose-access-key' headers")
log.Error(accessKeyErr.Error())
return firehoseResponseClient.GenerateValidFirehoseResponse(400, "Error while getting access keys:", accessKeyErr), nil
}
metricsExporter, err := createPrometheusRemoteWriteExporter(log, LogzioToken, envid)
metricsExporter, err := createPrometheusRemoteWriteExporter(log, handlerCfg)
if err != nil {
return firehoseResponseClient.GenerateValidFirehoseResponse(500, "Error while creating metrics exporter:", err), nil
}
Expand Down

0 comments on commit 3445340

Please sign in to comment.