Skip to content

Commit

Permalink
x-pack/filebeat/input/{websocket=>streaming}: rename package (#40421)
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 authored Aug 7, 2024
1 parent 2060383 commit 30d0cfe
Show file tree
Hide file tree
Showing 15 changed files with 68 additions and 53 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Split split httpmon out of x-pack/filebeat/input/internal/httplog. {pull}36385[36385]
- Beats publishing pipeline does not propagate the close signal to its clients any more. It's responsibility of the user to close the pipeline client. {issue}38197[38197] {pull}38556[38556]
- Debug log entries from the acker (`stateful ack ...` or `stateless ack ...`) removed. {pull}39672[39672]
- Rename x-pack/filebeat websocket input to streaming. {issue}40264[40264] {pull}40421[40421]

==== Bugfixes

Expand Down
6 changes: 3 additions & 3 deletions filebeat/docs/filebeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ You can configure {beatname_uc} to use the following inputs:
* <<{beatname_lc}-input-redis>>
* <<{beatname_lc}-input-salesforce>>
* <<{beatname_lc}-input-stdin>>
* <<{beatname_lc}-input-streaming>>
* <<{beatname_lc}-input-syslog>>
* <<{beatname_lc}-input-tcp>>
* <<{beatname_lc}-input-udp>>
* <<{beatname_lc}-input-websocket>>

include::multiline.asciidoc[]

Expand Down Expand Up @@ -148,12 +148,12 @@ include::../../x-pack/filebeat/docs/inputs/input-salesforce.asciidoc[]

include::inputs/input-stdin.asciidoc[]

include::../../x-pack/filebeat/docs/inputs/input-streaming.asciidoc[]

include::inputs/input-syslog.asciidoc[]

include::inputs/input-tcp.asciidoc[]

include::inputs/input-udp.asciidoc[]

include::inputs/input-unix.asciidoc[]

include::../../x-pack/filebeat/docs/inputs/input-websocket.asciidoc[]
6 changes: 4 additions & 2 deletions x-pack/agentbeat/agentbeat.spec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,10 @@ inputs:
platforms: *platforms
outputs: *outputs
command: *filebeat_command
- name: websocket
description: "Websocket"
- name: streaming
aliases:
- websocket
description: "Streaming"
platforms: *platforms
outputs: *outputs
command: *filebeat_command
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
[role="xpack"]

:type: websocket
:type: streaming
:mito_version: v1.8.0
:mito_docs: https://pkg.go.dev/github.com/elastic/mito@{mito_version}

[id="{beatname_lc}-input-{type}"]
=== Websocket Input
=== Streaming Input
experimental[]

++++
<titleabbrev>Websocket</titleabbrev>
<titleabbrev>Streaming</titleabbrev>
++++

The `websocket` input reads messages from a websocket server or api endpoint. This input uses the `CEL engine` and the `mito` library interally to parse and process the messages. Having support for `CEL` allows you to parse and process the messages in a more flexible way. It has many similarities with the `cel` input as to how the `CEL` programs are written but deviates in the way the messages are read and processed. The `websocket` input is a `streaming` input and can only be used to read messages from a websocket server or api endpoint.
The `streaming` input reads messages from a streaming data source, for example a websocket server. This input uses the `CEL engine` and the `mito` library interally to parse and process the messages. Having support for `CEL` allows you to parse and process the messages in a more flexible way. It has many similarities with the `cel` input as to how the `CEL` programs are written but deviates in the way the messages are read and processed. Currently only websocket server or API endpoints are supported.

This input supports:

Expand All @@ -21,7 +21,7 @@ This input supports:
** Bearer
** Custom

NOTE: The `websocket` input as of now does not support XML messages. Auto-reconnects are also not supported at the moment so reconnection will occur on input restart.
NOTE: The `streaming` input websocket handler does not currently support XML messages. Auto-reconnects are also not supported at the moment so reconnection will occur on input restart.

==== Execution

Expand All @@ -38,7 +38,7 @@ On start the `state` will be something like this:
...
}
----
The `websocket` input creates a `response` field in the state map and attaches the websocket message to this field. All `CEL` programs written should act on this `response` field. Additional fields may be present at the root of the object and if the program tolerates it, the cursor value may be absent. Only the cursor is persisted over restarts, but all fields in state are retained between iterations of the processing loop except for the produced events array, see below.
The `streaming` input websocket handler creates a `response` field in the state map and attaches the websocket message to this field. All `CEL` programs written should act on this `response` field. Additional fields may be present at the root of the object and if the program tolerates it, the cursor value may be absent. Only the cursor is persisted over restarts, but all fields in state are retained between iterations of the processing loop except for the produced events array, see below.

If the cursor is present the program should process or filter out responses based on its value. If cursor is not present all responses should be processed as per the program's logic.

Expand All @@ -59,7 +59,7 @@ After completion of a program's execution it should return a single object with
----

<1> The `events` field must be present, but may be empty or null. If it is not empty, it must only have objects as elements.
The field could be an array or a single object that will be treated as an array with a single element. This depends completely on the websocket server or api endpoint. The `events` field is the array of events to be published to the output. Each event must be a JSON object.
The field could be an array or a single object that will be treated as an array with a single element. This depends completely on the streaming data source. The `events` field is the array of events to be published to the output. Each event must be a JSON object.

<2> If `cursor` is present it must be either be a single object or an array with the same length as events; each element _i_ of the `cursor` will be the details for obtaining the events at and beyond event _i_ in the `events` array. If the `cursor` is a single object, it will be the details for obtaining events after the last event in the `events` array and will only be retained on successful publication of all the events in the `events` array.

Expand All @@ -70,7 +70,7 @@ Example configuration:
----
filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: websocket
- type: streaming
url: ws://localhost:443/v1/stream
program: |
bytes(state.response).decode_json().as(inner_body,{
Expand All @@ -83,7 +83,7 @@ filebeat.inputs:
==== Debug state logging

The Websocket input will log the complete state when logging at the DEBUG level before and after CEL evaluation.
This will include any sensitive or secret information kept in the `state` object, and so DEBUG level logging should not be used in production when sensitive information is retained in the `state` object. See <<websocket-state-redact,`redact`>> configuration parameters for settings to exclude sensitive fields from DEBUG logs.
This will include any sensitive or secret information kept in the `state` object, and so DEBUG level logging should not be used in production when sensitive information is retained in the `state` object. See <<streaming-state-redact,`redact`>> configuration parameters for settings to exclude sensitive fields from DEBUG logs.

==== Authentication
The Websocket input supports authentication via Basic token authentication, Bearer token authentication and authentication via a custom auth config. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively.
Expand All @@ -93,23 +93,23 @@ Example configurations with authentication:
["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
- type: streaming
auth.basic_token: "dXNlcjpwYXNzd29yZA=="
url: wss://localhost:443/_stream
----

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
- type: streaming
auth.bearer_token: "dXNlcjpwYXNzd29yZA=="
url: wss://localhost:443/_stream
----

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
- type: streaming
auth.custom:
header: "x-api-key"
value: "dXNlcjpwYXNzd29yZA=="
Expand All @@ -119,25 +119,25 @@ filebeat.inputs:
["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
- type: streaming
auth.custom:
header: "Auth"
value: "Bearer dXNlcjpwYXNzd29yZA=="
url: wss://localhost:443/_stream
----

[[input-state-websocket]]
[[input-state-streaming]]
==== Input state

The `websocket` input keeps a runtime state between every message received. This state can be accessed by the CEL program and may contain arbitrary objects.
The `streaming` input keeps a runtime state between every message received. This state can be accessed by the CEL program and may contain arbitrary objects.
The state must contain a `response` map and may contain any object the user wishes to store in it. All objects are stored at runtime, except `cursor`, which has values that are persisted between restarts.

==== Configuration options

The `websocket` input supports the following configuration options plus the
The `streaming` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.

[[program-websocket]]
[[program-streaming]]
[float]
==== `program`

Expand All @@ -153,11 +153,11 @@ program: |
})
----

[[input-url-program-websocket]]
[[input-url-program-streaming]]
[float]
==== `url_program`

If present, this CEL program is executed before the websocket connection is established using the `state` object, including any stored cursor value. It must evaluate to a valid URL. The returned URL is used to make the websocket connection for processing. The program may use cursor values or other state defined values to customize the URL at runtime.
If present, this CEL program is executed before the streaming connection is established using the `state` object, including any stored cursor value. It must evaluate to a valid URL. The returned URL is used to make the streaming connection for processing. The program may use cursor values or other state defined values to customize the URL at runtime.

["source","yaml",subs="attributes"]
----
Expand All @@ -177,13 +177,13 @@ program: |
})
----

[[state-websocket]]
[[state-streaming]]
[float]
==== `state`

`state` is an optional object that is passed to the CEL program on the first execution. It is available to the executing program as the `state` variable. Except for the `state.cursor` field, `state` does not persist over restarts.

[[cursor-websocket]]
[[cursor-streaming]]
[float]
==== `state.cursor`

Expand All @@ -193,7 +193,7 @@ The cursor is an object available as `state.cursor` where arbitrary values may b
----
filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: websocket
- type: streaming
url: ws://localhost:443/v1/stream
program: |
bytes(state.response).as(body, {
Expand All @@ -207,7 +207,7 @@ filebeat.inputs:
})
----

[[regexp-websocket]]
[[regexp-streaming]]
[float]
==== `regexp`

Expand All @@ -216,14 +216,14 @@ A set of named regular expressions that may be used during a CEL program's execu
["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
- type: streaming
# Define two regular expressions, 'products' and 'solutions' for use during CEL program execution.
regexp:
products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)'
solutions: '(?i)(Search|Observability|Security)'
----

[[websocket-state-redact]]
[[streaming-state-redact]]
[float]
==== `redact`

Expand All @@ -233,7 +233,7 @@ In the case of no-required redaction an empty `redact.fields` configuration shou

["source","yaml",subs="attributes"]
----
- type: websocket
- type: streaming
redact:
fields: ~
----
Expand All @@ -243,7 +243,7 @@ As an example, if a user-constructed Basic Authentication request is used in a C
["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
- type: streaming
url: ws://localhost:443/_stream
state:
user: [email protected]
Expand Down Expand Up @@ -290,11 +290,11 @@ observe the activity of the input.

==== Developer tools

A stand-alone CEL environment that implements the majority of the websocket input's Comment Expression Language functionality is available in the https://github.com/elastic/mito[Elastic Mito] repository. This tool may be used to help develop CEL programs to be used by the input. Installation is available from source by running `go install github.com/elastic/mito/cmd/mito@latest` and requires a Go toolchain.
A stand-alone CEL environment that implements the majority of the streaming input's Comment Expression Language functionality is available in the https://github.com/elastic/mito[Elastic Mito] repository. This tool may be used to help develop CEL programs to be used by the input. Installation is available from source by running `go install github.com/elastic/mito/cmd/mito@latest` and requires a Go toolchain.

[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

NOTE: The `websocket` input is currently tagged as experimental and might have bugs and other issues. Please report any issues on the https://github.com/elastic/beats[Github] repository.
NOTE: The `streaming` input is currently tagged as experimental and might have bugs and other issues. Please report any issues on the https://github.com/elastic/beats[Github] repository.

:type!:
:type!:
5 changes: 3 additions & 2 deletions x-pack/filebeat/input/default-inputs/inputs_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow"
"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
"github.com/elastic/beats/v7/x-pack/filebeat/input/salesforce"
"github.com/elastic/beats/v7/x-pack/filebeat/input/websocket"
"github.com/elastic/beats/v7/x-pack/filebeat/input/streaming"
"github.com/elastic/elastic-agent-libs/logp"
)

Expand All @@ -44,7 +44,8 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
awscloudwatch.Plugin(),
lumberjack.Plugin(),
salesforce.Plugin(log, store),
websocket.Plugin(log, store),
streaming.Plugin(log, store),
streaming.PluginWebsocketAlias(log, store),
netflow.Plugin(log),
benchmark.Plugin(),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package websocket
package streaming

import (
"compress/gzip"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package websocket
package streaming

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package websocket
package streaming

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package websocket
package streaming

import (
"context"
Expand Down Expand Up @@ -38,7 +38,7 @@ type StreamFollower interface {
}

const (
inputName string = "websocket"
inputName string = "streaming"
root string = "state"
)

Expand All @@ -47,8 +47,19 @@ func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin {
Name: inputName,
Stability: feature.Experimental,
Deprecated: false,
Info: "Streaming Input",
Doc: "Collect data from streaming data sources",
Manager: NewInputManager(log, store),
}
}

func PluginWebsocketAlias(log *logp.Logger, store inputcursor.StateStore) v2.Plugin {
return v2.Plugin{
Name: "websocket",
Stability: feature.Experimental,
Deprecated: false,
Info: "Websocket Input",
Doc: "Collect data from websocket api endpoints",
Doc: "Collect data from websocket data sources",
Manager: NewInputManager(log, store),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package websocket
package streaming

import (
"github.com/elastic/go-concert/unison"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package websocket
package streaming

import (
"context"
Expand Down Expand Up @@ -472,8 +472,8 @@ func TestURLEval(t *testing.T) {
}

name := input{}.Name()
if name != "websocket" {
t.Errorf(`unexpected input name: got:%q want:"websocket"`, name)
if name != "streaming" {
t.Errorf(`unexpected input name: got:%q want:"streaming"`, name)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -524,8 +524,8 @@ func TestInput(t *testing.T) {
}

name := input{}.Name()
if name != "websocket" {
t.Errorf(`unexpected input name: got:%q want:"websocket"`, name)
if name != "streaming" {
t.Errorf(`unexpected input name: got:%q want:"streaming"`, name)
}
src := &source{conf}
err = input{}.Test(src, v2.TestContext{})
Expand Down
Loading

0 comments on commit 30d0cfe

Please sign in to comment.