Skip to content

Commit

Permalink
Add encoding selector option (base64 or binary) (#39)
Browse files Browse the repository at this point in the history
Adds encoding set option to choose the encoding of Avro payload.
By default it's set to base64.

Co-authored-by: @pascalgulikers
Co-authored-by: Karen Metts <[email protected]>
Co-authored-by: Ry Biesemeyer <[email protected]>
  • Loading branch information
3 people authored Apr 28, 2022
1 parent 0f19816 commit 019a5ee
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.4.0
- Add `encoding` option to select the encoding of Avro payload, could be `binary` or `base64` [#39](https://github.com/logstash-plugins/logstash-codec-avro/pull/39)

## 3.3.1
- Pin avro gem to 1.10.x, as 1.11+ requires ruby 2.6+ [#37](https://github.com/logstash-plugins/logstash-codec-avro/pull/37)

Expand Down
12 changes: 12 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ output {
[cols="<,<,<",options="header",]
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-encoding>> | <<string,string>>, one of `["binary", "base64"]`|No
| <<plugins-{type}s-{plugin}-ecs_compatibility>> | <<string,string>>|No
| <<plugins-{type}s-{plugin}-schema_uri>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-tag_on_failure>> |<<boolean,boolean>>|No
Expand All @@ -99,6 +100,17 @@ output {

Controls this plugin's compatibility with the {ecs-ref}[Elastic Common Schema (ECS)].

[id="plugins-{type}s-{plugin}-encoding"]
===== `encoding`

* Value can be any of: `binary`, `base64`
* Default value is `base64`

Set encoding for Avro's payload.
Use `base64` (default) to indicate that this codec sends or expects to receive base64-encoded bytes.

Set this option to `binary` to indicate that this codec sends or expects to receive binary Avro data.


[id="plugins-{type}s-{plugin}-schema_uri"]
===== `schema_uri`
Expand Down
20 changes: 18 additions & 2 deletions lib/logstash/codecs/avro.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ class LogStash::Codecs::Avro < LogStash::Codecs::Base

include LogStash::PluginMixins::EventSupport::EventFactoryAdapter

BINARY_ENCODING = "binary".freeze
BASE64_ENCODING = "base64".freeze

# Set encoding for Avro's payload.
# Use `base64` (default) encoding to convert the raw binary bytes to a `base64` encoded string.
# Set this option to `binary` to use the plain binary bytes.
config :encoding, :validate => [BINARY_ENCODING, BASE64_ENCODING], :default => BASE64_ENCODING

# schema path to fetch the schema from.
# This can be a 'http' or 'file' scheme URI
# example:
Expand Down Expand Up @@ -92,7 +100,11 @@ def register

public
def decode(data)
datum = StringIO.new(Base64.strict_decode64(data)) rescue StringIO.new(data)
if encoding == BASE64_ENCODING
datum = StringIO.new(Base64.strict_decode64(data)) rescue StringIO.new(data)
else
datum = StringIO.new(data)
end
decoder = Avro::IO::BinaryDecoder.new(datum)
datum_reader = Avro::IO::DatumReader.new(@schema)
event = targeted_event_factory.new_event(datum_reader.read(decoder))
Expand All @@ -113,6 +125,10 @@ def encode(event)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
dw.write(event.to_hash, encoder)
@on_event.call(event, Base64.strict_encode64(buffer.string))
if encoding == BASE64_ENCODING
@on_event.call(event, Base64.strict_encode64(buffer.string))
else
@on_event.call(event, buffer.string)
end
end
end
2 changes: 1 addition & 1 deletion logstash-codec-avro.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-codec-avro'
s.version = '3.3.1'
s.version = '3.4.0'
s.platform = 'java'
s.licenses = ['Apache-2.0']
s.summary = "Reads serialized Avro records as Logstash events"
Expand Down
51 changes: 51 additions & 0 deletions spec/codecs/avro_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,35 @@
end
end

context "with binary encoding" do
let (:avro_config) { super().merge('encoding' => 'binary') }

it "should return an LogStash::Event from raw and base64 encoded avro data" do
schema = Avro::Schema.parse(avro_config['schema_uri'])
dw = Avro::IO::DatumWriter.new(schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
dw.write(test_event.to_hash, encoder)

subject.decode(buffer.string) do |event|
expect(event).to be_a_kind_of(LogStash::Event)
expect(event.get("foo")).to eq(test_event.get("foo"))
expect(event.get("bar")).to eq(test_event.get("bar"))
expect(event.get('[event][original]')).to eq(buffer.string) if ecs_compatibility != :disabled
end
end

it "should raise an error if base64 encoded data is provided" do
schema = Avro::Schema.parse(avro_config['schema_uri'])
dw = Avro::IO::DatumWriter.new(schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
dw.write(test_event.to_hash, encoder)

expect {subject.decode(Base64.strict_encode64(buffer.string))}.to raise_error
end
end

context "#decode with tag_on_failure" do
let (:avro_config) {{ 'schema_uri' => '
{"type": "record", "name": "Test",
Expand Down Expand Up @@ -111,6 +140,28 @@
insist {got_event}
end

context "with binary encoding" do
let (:avro_config) { super().merge('encoding' => 'binary') }

it "should return avro data from a LogStash::Event not base64 encoded" do
got_event = false
subject.on_event do |event, data|
schema = Avro::Schema.parse(avro_config['schema_uri'])
datum = StringIO.new(data)
decoder = Avro::IO::BinaryDecoder.new(datum)
datum_reader = Avro::IO::DatumReader.new(schema)
record = datum_reader.read(decoder)

expect(event).to be_a_kind_of(LogStash::Event)
expect(event.get("foo")).to eq(test_event.get("foo"))
expect(event.get("bar")).to eq(test_event.get("bar"))
got_event = true
end
subject.encode(test_event)
expect(got_event).to be true
end
end

context "binary data" do

let (:avro_config) {{ 'schema_uri' => '{"namespace": "com.systems.test.data",
Expand Down

0 comments on commit 019a5ee

Please sign in to comment.