-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into APIGOV-26621
- Loading branch information
Showing
16 changed files
with
252 additions
and
182 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,97 +1,107 @@ | ||
package beater | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"io/ioutil" | ||
"log" | ||
"io" | ||
"net/http" | ||
|
||
traceabilityconfig "github.com/Axway/agents-kong/pkg/config/traceability" | ||
|
||
"github.com/Axway/agents-kong/pkg/processor" | ||
"github.com/elastic/beats/v7/libbeat/beat" | ||
"github.com/elastic/beats/v7/libbeat/common" | ||
"github.com/google/uuid" | ||
|
||
agentErrors "github.com/Axway/agent-sdk/pkg/util/errors" | ||
hc "github.com/Axway/agent-sdk/pkg/util/healthcheck" | ||
"github.com/elastic/beats/v7/libbeat/beat" | ||
"github.com/elastic/beats/v7/libbeat/common" | ||
"github.com/elastic/beats/v7/libbeat/logp" | ||
"github.com/Axway/agent-sdk/pkg/util/log" | ||
|
||
config "github.com/Axway/agents-kong/pkg/config/traceability" | ||
"github.com/Axway/agents-kong/pkg/processor" | ||
) | ||
|
||
type customLogBeater struct { | ||
type httpLogBeater struct { | ||
done chan struct{} | ||
eventProcessor *processor.EventProcessor | ||
client beat.Client | ||
eventChannel chan string | ||
logger log.FieldLogger | ||
} | ||
|
||
// New creates an instance of kong_traceability_agent. | ||
func New(*beat.Beat, *common.Config) (beat.Beater, error) { | ||
bt := &customLogBeater{ | ||
b := &httpLogBeater{ | ||
done: make(chan struct{}), | ||
eventChannel: make(chan string), | ||
logger: log.NewFieldLogger().WithComponent("httpLogBeater").WithPackage("beater"), | ||
} | ||
|
||
bt.eventProcessor = processor.NewEventProcessor() | ||
b.eventProcessor = processor.NewEventProcessor() | ||
|
||
// Validate that all necessary services are up and running. If not, return error | ||
if hc.RunChecks() != hc.OK { | ||
b.logger.Error("not all services are running") | ||
return nil, agentErrors.ErrInitServicesNotReady | ||
} | ||
|
||
return bt, nil | ||
return b, nil | ||
} | ||
|
||
// Run starts kong_traceability_agent. | ||
func (bt *customLogBeater) Run(b *beat.Beat) error { | ||
logp.Info("kong_traceability_agent is running! Hit CTRL-C to stop it.") | ||
func (b *httpLogBeater) Run(beater *beat.Beat) error { | ||
b.logger.Info("kong_traceability_agent is running! Hit CTRL-C to stop it.") | ||
|
||
var err error | ||
bt.client, err = b.Publisher.Connect() | ||
b.client, err = beater.Publisher.Connect() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
http.HandleFunc(traceabilityconfig.GetAgentConfig().HttpLogPluginConfig.Path, | ||
http.HandleFunc(config.GetAgentConfig().HttpLogPluginConfig.Path, | ||
func(w http.ResponseWriter, r *http.Request) { | ||
if r.Method != http.MethodPost { | ||
b.logger.Trace("received a non post request") | ||
w.WriteHeader(http.StatusMethodNotAllowed) | ||
return | ||
} | ||
|
||
body, err := ioutil.ReadAll(r.Body) | ||
ctx := context.WithValue(context.Background(), processor.CtxTransactionID, uuid.NewString()) | ||
logData, err := io.ReadAll(r.Body) | ||
defer r.Body.Close() | ||
|
||
if err != nil { | ||
logp.Error(fmt.Errorf("error while reading request body: %s", err)) | ||
b.logger.WithError(err).Error("reading request body") | ||
} | ||
|
||
w.WriteHeader(200) | ||
bt.processAndDispatchEvent(string(body)) | ||
}) | ||
go b.processAndDispatchEvent(ctx, logData) | ||
}, | ||
) | ||
|
||
/* Start a new HTTP server in a separate Go routine that will be the target | ||
for the HTTP Log plugin. It should write events it gets to eventChannel */ | ||
go func() { | ||
if err := http.ListenAndServe(fmt.Sprintf(":%d", traceabilityconfig.GetAgentConfig().HttpLogPluginConfig.Port), | ||
nil); err != nil { | ||
log.Fatalf("Unable to start the HTTP Server: %s", err) | ||
err := http.ListenAndServe(fmt.Sprintf(":%d", config.GetAgentConfig().HttpLogPluginConfig.Port), nil) | ||
if err != nil { | ||
b.logger.WithError(err).Fatalf("unable to start the HTTP Server") | ||
} | ||
fmt.Printf("Started HTTP server on port %d to receive request logs", traceabilityconfig.GetAgentConfig().HttpLogPluginConfig.Port) | ||
b.logger.WithField("port", config.GetAgentConfig().HttpLogPluginConfig.Port).WithField("path", config.GetAgentConfig().HttpLogPluginConfig.Path).Info("started HTTP server") | ||
}() | ||
|
||
<-bt.done | ||
<-b.done | ||
return nil | ||
} | ||
|
||
// Stop stops kong_traceability_agent. | ||
func (bt *customLogBeater) Stop() { | ||
func (bt *httpLogBeater) Stop() { | ||
bt.client.Close() | ||
close(bt.done) | ||
} | ||
|
||
func (bt *customLogBeater) processAndDispatchEvent(logEvent string) { | ||
eventsToPublish := bt.eventProcessor.ProcessRaw([]byte(logEvent)) | ||
func (bt *httpLogBeater) processAndDispatchEvent(ctx context.Context, logData []byte) { | ||
log := log.UpdateLoggerWithContext(ctx, bt.logger) | ||
log.WithField("data", logData).Trace("handling log data") | ||
eventsToPublish := bt.eventProcessor.ProcessRaw(ctx, logData) | ||
if eventsToPublish != nil { | ||
log.Trace("finished handling data") | ||
bt.client.PublishAll(eventsToPublish) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.