Merge pull request #22 from Snyssfx/splunk_output_plugin
Plugins: add splunk output plugin
vitkovskii authored Sep 1, 2021
2 parents 00a41f5 + a2dfa78 commit 3af07ab
Showing 9 changed files with 258 additions and 4 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -31,9 +31,9 @@ TBD: throughput on production servers.

**Input**: [dmesg](plugin/input/dmesg/README.md), [fake](plugin/input/fake/README.md), [file](plugin/input/file/README.md), [http](plugin/input/http/README.md), [journalctl](plugin/input/journalctl/README.md), [k8s](plugin/input/k8s/README.md), [kafka](plugin/input/kafka/README.md)

**Action**: [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [json_decode](plugin/action/json_decode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [modify](plugin/action/modify/README.md), [parse_es](plugin/action/parse_es/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [throttle](plugin/action/throttle/README.md)
**Action**: [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [json_decode](plugin/action/json_decode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [modify](plugin/action/modify/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [throttle](plugin/action/throttle/README.md)

**Output**: [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [gelf](plugin/output/gelf/README.md), [kafka](plugin/output/kafka/README.md), [stdout](plugin/output/stdout/README.md)
**Output**: [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [gelf](plugin/output/gelf/README.md), [kafka](plugin/output/kafka/README.md), [splunk](plugin/output/splunk/README.md), [stdout](plugin/output/stdout/README.md)

## What's next
* [Quick start](/docs/quick-start.md)
2 changes: 2 additions & 0 deletions _sidebar.md
@@ -32,6 +32,7 @@
- [keep_fields](plugin/action/keep_fields/README.md)
- [modify](plugin/action/modify/README.md)
- [parse_es](plugin/action/parse_es/README.md)
- [parse_re2](plugin/action/parse_re2/README.md)
- [remove_fields](plugin/action/remove_fields/README.md)
- [rename](plugin/action/rename/README.md)
- [throttle](plugin/action/throttle/README.md)
@@ -41,6 +42,7 @@
- [elasticsearch](plugin/output/elasticsearch/README.md)
- [gelf](plugin/output/gelf/README.md)
- [kafka](plugin/output/kafka/README.md)
- [splunk](plugin/output/splunk/README.md)
- [stdout](plugin/output/stdout/README.md)


5 changes: 3 additions & 2 deletions cmd/file.d.go
@@ -37,11 +37,12 @@ import (
_ "github.com/ozonru/file.d/plugin/input/kafka"
_ "github.com/ozonru/file.d/plugin/output/devnull"
_ "github.com/ozonru/file.d/plugin/output/elasticsearch"
_ "github.com/ozonru/file.d/plugin/output/file"
_ "github.com/ozonru/file.d/plugin/output/gelf"
_ "github.com/ozonru/file.d/plugin/output/kafka"
_ "github.com/ozonru/file.d/plugin/output/stdout"
_ "github.com/ozonru/file.d/plugin/output/file"
_ "github.com/ozonru/file.d/plugin/output/s3"
_ "github.com/ozonru/file.d/plugin/output/splunk"
_ "github.com/ozonru/file.d/plugin/output/stdout"
)

var (
8 changes: 8 additions & 0 deletions plugin/README.md
@@ -203,6 +203,10 @@ It parses HTTP input using Elasticsearch `/_bulk` API format. It converts source
> Check out the details in [Elastic Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html).

[More details...](plugin/action/parse_es/README.md)
## parse_re2
It parses a string from the event field using an re2 expression with named subgroups and merges the result with the event root (see the sketch below).

[More details...](plugin/action/parse_re2/README.md)
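
For reference, the sketch below shows how RE2 named subgroups map onto field names using Go's `regexp` package (Go's regexp engine is RE2). It is a standalone illustration with a made-up pattern and input, not the plugin's actual code:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// an RE2 expression with two named subgroups
	re := regexp.MustCompile(`(?P<user>\w+)@(?P<host>[\w.-]+)`)

	match := re.FindStringSubmatch("login failed for alice@db-1.prod")

	// collect named subgroups into a field map, mirroring how the
	// plugin would merge them into the event root
	fields := map[string]string{}
	for i, name := range re.SubexpNames() {
		if i > 0 && name != "" && i < len(match) {
			fields[name] = match[i]
		}
	}
	fmt.Println(fields) // map[host:db-1.prod user:alice]
}
```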
## remove_fields
It removes the list of the event fields and keeps others.

@@ -273,6 +277,10 @@ Allowed characters in field names are letters, numbers, underscores, dashes, and
It sends the event batches to kafka brokers using `sarama` lib.

[More details...](plugin/output/kafka/README.md)
## splunk
It sends events to Splunk via the HTTP Event Collector (HEC).

[More details...](plugin/output/splunk/README.md)
## stdout
It writes events to stdout (also known as the console).

4 changes: 4 additions & 0 deletions plugin/action/README.md
@@ -112,6 +112,10 @@ It parses HTTP input using Elasticsearch `/_bulk` API format. It converts source
> Check out the details in [Elastic Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html).
[More details...](plugin/action/parse_es/README.md)
## parse_re2
It parses a string from the event field using an re2 expression with named subgroups and merges the result with the event root.

[More details...](plugin/action/parse_re2/README.md)
## remove_fields
It removes the list of the event fields and keeps others.

4 changes: 4 additions & 0 deletions plugin/output/README.md
@@ -32,6 +32,10 @@ Allowed characters in field names are letters, numbers, underscores, dashes, and
It sends the event batches to kafka brokers using `sarama` lib.

[More details...](plugin/output/kafka/README.md)
## splunk
It sends events to Splunk via the HTTP Event Collector (HEC).

[More details...](plugin/output/splunk/README.md)
## stdout
It writes events to stdout (also known as the console).

5 changes: 5 additions & 0 deletions plugin/output/splunk/README.idoc.md
@@ -0,0 +1,5 @@
# splunk HTTP Event Collector output
@introduction

### Config params
@config-params|description
42 changes: 42 additions & 0 deletions plugin/output/splunk/README.md
@@ -0,0 +1,42 @@
# splunk HTTP Event Collector output
It sends events to Splunk via the HTTP Event Collector (HEC).

### Config params
**`endpoint`** *`string`* *`required`*

A full URI address of the Splunk HEC endpoint. Format: `http://127.0.0.1:8088/services/collector`.

<br>

**`token`** *`string`* *`required`*

Token used to authenticate with the HEC endpoint.

<br>

**`workers_count`** *`cfg.Expression`* *`default=gomaxprocs*4`*

How many workers will be instantiated to send batches.

<br>

**`request_timeout`** *`cfg.Duration`* *`default=1s`*

Client timeout when sending requests to the HTTP Event Collector.

<br>

**`batch_size`** *`cfg.Expression`* *`default=capacity/4`*

The maximum number of events to pack into one batch.

<br>

**`batch_flush_timeout`** *`cfg.Duration`* *`default=200ms`*

After this timeout the batch will be sent even if it isn't full.

<br>
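
### Example

A minimal pipeline sketch showing this output in the usual file.d config layout. The `file` input and its `watching_dir` parameter are assumptions borrowed from the other plugin docs; the `type: splunk` block uses only the parameters documented above, and the token is a placeholder:

```yaml
pipelines:
  to_splunk:
    input:
      type: file                 # assumption: any configured input works here
      watching_dir: /var/log
    output:
      type: splunk
      endpoint: http://127.0.0.1:8088/services/collector
      token: my-hec-token        # placeholder: your HEC token
      workers_count: 4           # optional, see defaults above
      batch_size: 128
      batch_flush_timeout: 200ms
```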


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
188 changes: 188 additions & 0 deletions plugin/output/splunk/splunk.go
@@ -0,0 +1,188 @@
// Package splunk is an output plugin that sends events to Splunk via the HTTP Event Collector (HEC).
package splunk

import (
"bytes"
"context"
"crypto/tls"
"io"
"net/http"
"time"

"github.com/ozonru/file.d/cfg"
"github.com/ozonru/file.d/fd"
"github.com/ozonru/file.d/pipeline"
"github.com/pkg/errors"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
)

/*{ introduction
It sends events to Splunk via the HTTP Event Collector (HEC).
}*/

type Plugin struct {
config *Config
logger *zap.SugaredLogger
avgLogSize int
batcher *pipeline.Batcher
controller pipeline.OutputPluginController
requestTimeout time.Duration
}

//! config-params
//^ config-params
type Config struct {
//> @3@4@5@6
//>
//> A full URI address of the Splunk HEC endpoint. Format: `http://127.0.0.1:8088/services/collector`.
Endpoint string `json:"endpoint" required:"true"` //*

//> @3@4@5@6
//>
//> Token used to authenticate with the HEC endpoint.
Token string `json:"token" required:"true"` //*

//> @3@4@5@6
//>
//> How many workers will be instantiated to send batches.
WorkersCount cfg.Expression `json:"workers_count" default:"gomaxprocs*4" parse:"expression"` //*
WorkersCount_ int

//> @3@4@5@6
//>
//> Client timeout when sending requests to the HTTP Event Collector.
RequestTimeout cfg.Duration `json:"request_timeout" default:"1s" parse:"duration"` //*
RequestTimeout_ time.Duration

//> @3@4@5@6
//>
//> The maximum number of events to pack into one batch.
BatchSize cfg.Expression `json:"batch_size" default:"capacity/4" parse:"expression"` //*
BatchSize_ int

//> @3@4@5@6
//>
//> After this timeout the batch will be sent even if it isn't full.
BatchFlushTimeout cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` //*
BatchFlushTimeout_ time.Duration
}

type data struct {
outBuf []byte
}

func init() {
fd.DefaultPluginRegistry.RegisterOutput(&pipeline.PluginStaticInfo{
Type: "splunk",
Factory: Factory,
})
}

func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
return &Plugin{}, &Config{}
}

func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
p.controller = params.Controller
p.logger = params.Logger
p.avgLogSize = params.PipelineSettings.AvgLogSize
p.config = config.(*Config)

p.batcher = pipeline.NewBatcher(
params.PipelineName,
"splunk",
p.out,
p.maintenance,
p.controller,
p.config.WorkersCount_,
p.config.BatchSize_,
p.config.BatchFlushTimeout_,
0,
)
p.batcher.Start()
}

func (p *Plugin) Stop() {
}

func (p *Plugin) Out(event *pipeline.Event) {
p.batcher.Add(event)
}

func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
if *workerData == nil {
*workerData = &data{
outBuf: make([]byte, 0, p.config.BatchSize_*p.avgLogSize),
}
}

data := (*workerData).(*data)
// shrink the buffer if it has grown too large, to limit memory consumption
if cap(data.outBuf) > p.config.BatchSize_*p.avgLogSize {
data.outBuf = make([]byte, 0, p.config.BatchSize_*p.avgLogSize)
}

outBuf := data.outBuf[:0]
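// wrap each event as {"event": ...}: the JSON envelope expected by the Splunk HEC endpoint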
for _, event := range batch.Events {
root := insaneJSON.Spawn()
root.AddField("event").MutateToNode(event.Root.Node)
outBuf = root.Encode(outBuf)
insaneJSON.Release(root) // return the spawned root to the pool so it isn't leaked
}
data.outBuf = outBuf

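// retry forever: keep resending the batch until splunk accepts it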
for {
err := p.send(outBuf, p.config.RequestTimeout_)
if err != nil {
p.logger.Errorf("can't send data to splunk address=%s: %s", p.config.Endpoint, err.Error())
time.Sleep(time.Second)

continue
}

break
}
}

func (p *Plugin) maintenance(workerData *pipeline.WorkerData) {}

func (p *Plugin) send(data []byte, timeout time.Duration) error {
c := http.Client{
Timeout: timeout,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
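// note: server certificate verification is disabled, so self-signed HEC certificates are accepted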
InsecureSkipVerify: true,
},
},
}

r := bytes.NewReader(data)
req, err := http.NewRequestWithContext(context.Background(), "POST", p.config.Endpoint, r)
if err != nil {
return errors.Wrap(err, "can't create request")
}

req.Header.Set("Authorization", "Splunk "+p.config.Token)
resp, err := c.Do(req)
if err != nil {
return errors.Wrap(err, "can't send request")
}
defer resp.Body.Close()

b, err := io.ReadAll(resp.Body)
if err != nil {
return errors.Wrap(err, "can't read response")
}

root, err := insaneJSON.DecodeBytes(b)
if err != nil {
return errors.Wrap(err, "can't decode response")
}
defer insaneJSON.Release(root) // release the parsed response back to the pool

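// HEC responds with JSON like {"text":"Success","code":0}; a non-zero code signals an error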
code := root.Dig("code").AsInt()
if code > 0 {
return errors.Errorf("error while sending to splunk: %s", string(b))
}

return nil
}
