Fluent-plugin-openlineage, a plugin for Fluentd
fluent-plugin-openlineage is a Fluentd plugin that validates whether a JSON event conforms to the OpenLineage schema. It is intended to be used together with a Fluentd application.
fluent-plugin-prometheus | fluentd | ruby |
---|---|---|
1.x.y | >= v1.9.1 | >= 2.4 |
1.[0-7].y | >= v0.14.8 | >= 2.1 |
0.x.y | >= v0.12.0 | >= 1.9 |
Add this line to your application's Gemfile:
gem 'fluent-plugin-openlineage'
And then execute:
$ bundle
Or install it yourself using one of the following:
$ gem install fluent-plugin-openlineage
$ fluent-gem install fluent-plugin-openlineage
fluent-plugin-openlineage includes only one plugin:
- openlineage parser plugin
This is part of the OpenLineage Project repository at: https://github.com/OpenLineage/OpenLineage/tree/main/proxy/fluentd
Modern data collectors (Fluentd, Logstash, Vector, etc.) can be extremely useful when designing production-grade architectures for processing OpenLineage events.
They can be used for features such as:
- A server proxy in front of the OpenLineage backend (like Marquez) to handle load spikes and buffer incoming events when the backend is down (e.g., due to a maintenance window).
- The ability to copy the event to multiple backends such as HTTP, Kafka or cloud object storage. Data collectors implement that out-of-the-box.
They have great potential except for a single missing feature: the ability to parse and validate OpenLineage events at the point of HTTP input.
This is important because one would like to get a Bad Request response immediately when sending invalid OpenLineage events to an endpoint.
Fortunately, this missing feature can be implemented as a plugin.
We decided to implement an OpenLineage parser plugin for Fluentd because:
- Fluentd has a small footprint in terms of resource utilization and does not require a JVM to be installed,
- Fluentd plugins can be installed from local files (no need to register in a plugin repository).
As a side effect, the Fluentd integration can also be used as an OpenLineage HTTP validation backend for development purposes.
Some interesting Fluentd features are available according to the official documentation:
- Buffering/retrying parameters,
- Useful output plugins:
  - Output Kafka plugin,
  - Output S3 plugin,
  - Output copy plugin,
  - Output HTTP plugin with options such as retryable_response_codes to specify backend codes that should cause a retry,
- Buffer configuration,
- Embedding Ruby expressions in config files, for example to read environment variables.
The official Fluentd documentation does not mention guarantees about event ordering. However, receiving OpenLineage events and buffering them in a file or in memory should take only milliseconds, and no HTTP backend can guarantee ordering in such a case. On the other hand, the number of threads used to flush the buffer defaults to 1 and is configurable (flush_thread_count).
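To illustrate why the number of flush threads matters for ordering, here is a toy Ruby simulation. It is not Fluentd code, and flush_all is a hypothetical helper: it drains a queue of chunks with a configurable number of worker threads. With one worker the chunks leave in the order they were queued; with several workers that order is no longer guaranteed.

```ruby
# Toy simulation only: models "flush threads" as workers draining a shared queue.
# flush_all is a hypothetical helper, not part of Fluentd.
def flush_all(chunks, thread_count)
  pending = Queue.new
  chunks.each { |chunk| pending << chunk }
  pending.close # pop returns nil once the closed queue is empty

  delivered = Queue.new
  workers = Array.new(thread_count) do
    Thread.new do
      while (chunk = pending.pop)
        delivered << chunk # stands in for sending the chunk to a backend
      end
    end
  end
  workers.each(&:join)

  Array.new(delivered.size) { delivered.pop }
end

chunks = (1..20).to_a
puts flush_all(chunks, 1).inspect # a single worker preserves queue order
puts flush_all(chunks, 4).inspect # same chunks, order may differ between runs
```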
Please refer to the Dockerfile and fluent.conf to see how to build and install the plugin with the example usage scenario provided in docker-compose.yml. To run the example setup, go to the docker directory and execute the following command:
docker-compose up
After all the containers have started, send an HTTP request with an invalid event:
curl -X POST \
-d '{"test":"test"}' \
-H 'Content-Type: application/json' \
http://localhost:9880/api/v1/lineage
In response, you should see the following message:
Openlineage validation failed: path "/": "run" is a required property, path "/": "job" is a required property, path "/": "eventTime" is a required property, path "/": "producer" is a required property, path "/": "schemaURL" is a required property
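The message names each top-level property that the event is missing. The Ruby sketch below reproduces only that required-property check, to show what is being reported. It is not the plugin's implementation, which validates against the full OpenLineage JSON Schema; REQUIRED_TOP_LEVEL, missing_required and validation_message are hypothetical names.

```ruby
require 'json'

# Top-level properties named in the validation message above.
REQUIRED_TOP_LEVEL = %w[run job eventTime producer schemaURL].freeze

# Hypothetical helper: lists the required properties absent from a JSON object.
def missing_required(raw_json)
  event = JSON.parse(raw_json)
  REQUIRED_TOP_LEVEL.reject { |key| event.key?(key) }
end

# Hypothetical helper: formats the result in the style of the message above,
# or returns nil when nothing is missing.
def validation_message(raw_json)
  missing = missing_required(raw_json)
  return nil if missing.empty?

  details = missing.map { |key| %(path "/": "#{key}" is a required property) }
  "Openlineage validation failed: #{details.join(', ')}"
end

puts validation_message('{"test":"test"}')
```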
Next, send some valid requests:
curl -X POST \
-d "$(cat test-start.json)" \
-H 'Content-Type: application/json' \
http://localhost:9880/api/v1/lineage
curl -X POST \
-d "$(cat test-complete.json)" \
-H 'Content-Type: application/json' \
http://localhost:9880/api/v1/lineage
After that, you should see entities in Marquez (http://localhost:3000/) in the my-namespace namespace.
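The contents of test-start.json and test-complete.json are not reproduced here. As a rough guide to what such events look like, the Ruby sketch below builds a minimal START and COMPLETE pair carrying the five required top-level properties. The helper name minimal_event, the job name, the producer URL and the schemaURL value are all assumptions for illustration, and the output has not been checked against the plugin.

```ruby
require 'json'
require 'securerandom'
require 'time'

# Hypothetical helper: builds a minimal run event with the five required
# top-level properties. Job name, producer and schemaURL are placeholders.
def minimal_event(event_type, run_id, namespace: 'my-namespace', job_name: 'example-job')
  {
    'eventType' => event_type,
    'eventTime' => Time.now.utc.iso8601,
    'run' => { 'runId' => run_id },
    'job' => { 'namespace' => namespace, 'name' => job_name },
    'producer' => 'https://example.com/my-producer',
    'schemaURL' => 'https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent'
  }
end

run_id = SecureRandom.uuid # START and COMPLETE of one run share the same runId
puts JSON.pretty_generate(minimal_event('START', run_id))
puts JSON.pretty_generate(minimal_event('COMPLETE', run_id))
```

Each printed object can be saved to a file and posted with the same curl command shown above.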
To clean up, run:
docker-compose down
Although OpenLineage events are specified with JSON Schema, real-life validation varies, and backends like Marquez may validate certain types of facets less strictly.
For example, Marquez accepts an invalid DataQualityMetricsInputDatasetFacet.
To give more flexibility, the Fluentd parser accepts the following configuration parameters:
validate_input_dataset_facets => true/false
validate_output_dataset_facets => true/false
validate_dataset_facets => true/false
validate_run_facets => true/false
validate_job_facets => true/false
By default, only validate_run_facets and validate_job_facets are set to true.
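The defaults and overrides can be summarized with a small Ruby sketch. It merges user overrides onto the defaults described above and prints how the result might look inside a `<parse>` section of fluent.conf. The helper names are hypothetical, and the rendered section assumes the usual Fluentd "name value" parameter syntax with the parser type openlineage; check the example fluent.conf for the authoritative form.

```ruby
# Defaults as described above: only run and job facets are validated.
DEFAULT_VALIDATION = {
  validate_input_dataset_facets: false,
  validate_output_dataset_facets: false,
  validate_dataset_facets: false,
  validate_run_facets: true,
  validate_job_facets: true
}.freeze

# Hypothetical helper: merges overrides onto the defaults, rejecting unknown names.
def effective_validation(overrides = {})
  unknown = overrides.keys - DEFAULT_VALIDATION.keys
  raise ArgumentError, "unknown parameter(s): #{unknown.join(', ')}" unless unknown.empty?

  DEFAULT_VALIDATION.merge(overrides)
end

# Hypothetical helper: renders the settings as an assumed <parse> section.
def parse_section(overrides = {})
  lines = effective_validation(overrides).map { |name, value| "  #{name} #{value}" }
  (['<parse>', '  @type openlineage'] + lines + ['</parse>']).join("\n")
end

# Example: additionally validate input dataset facets.
puts parse_section(validate_input_dataset_facets: true)
```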
To build dependencies:
bundle install
bundle
To run the tests:
bundle exec rake test
The easiest way to install the plugin is to install Fluentd itself and, along with it, the following external packages (for example, in a Dockerfile):
- rusty_json_schema, a JSON Schema validation library backed by Rust,
- fluent-plugin-out-http, which allows non-bulk HTTP output requests (sending each OpenLineage event in a separate request).
fluent-gem install rusty_json_schema
fluent-gem install fluent-plugin-out-http
fluent-gem install fluent-plugin-openlineage
Monitoring with Prometheus, and other configurations you can try with a separate fluent.conf file
You may also want to monitor the Fluentd application itself with Prometheus. To do so, add the fluent-plugin-prometheus plugin (https://github.com/fluent/fluent-plugin-prometheus) and include the following setup in your prometheus.yml file:
global:
  scrape_interval: 10s # Set the scrape interval to every 10 seconds. Default is every 1 minute.
# A scrape configuration containing exactly one endpoint to scrape:
# Here it's the Fluentd metrics endpoint.
scrape_configs:
  - job_name: 'fluentd'
    static_configs:
      - targets: ['localhost:24231']
You may also want to add the following parameters to your fluent.conf file:
# source
<source>
  @type forward
  bind 0.0.0.0
  port 24224
</source>
# count the number of incoming records per tag
<filter company.*>
  @type prometheus
  <metric>
    name fluentd_input_status_num_records_total
    type counter
    desc The total number of incoming records
    <labels>
      tag ${tag}
      hostname ${hostname}
    </labels>
  </metric>
</filter>
# count the number of outgoing records per tag
<match company.*>
  @type copy
  <store>
    @type forward
    <server>
      name myserver1
      host 192.168.1.3
      port 24224
      weight 60
    </server>
  </store>
  <store>
    @type prometheus
    <metric>
      name fluentd_output_status_num_records_total
      type counter
      desc The total number of outgoing records
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </store>
</match>
# expose metrics in prometheus format
<source>
  @type prometheus
  bind 0.0.0.0
  port 24231
  metrics_path /metrics
</source>
<source>
  @type prometheus_output_monitor
  interval 10
  <labels>
    hostname ${hostname}
  </labels>
</source>
For additional information, see the official Fluentd documentation at https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus#example-prometheus-queries