Skip to content

Commit

Permalink
chore(sinks): migrated elasticsearch sink
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan committed Jun 26, 2023
1 parent 4a16ada commit d01a0db
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
3 changes: 2 additions & 1 deletion examples/elasticsearch/daemon.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ type = "ParseCbor"
[sink]
type = "ElasticSearch"
url = "http://localhost:9200"
index = "oura.sink.v2"
index = "oura.sink.v6"
idempotency = true
10 changes: 8 additions & 2 deletions src/sinks/elasticsearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,15 @@ impl gasket::framework::Worker<Stage> for Worker {
return Ok(());
}

let slot = point.slot_or_default().to_string();
let mut parts = IndexParts::Index(&stage.config.index);
if stage.config.idempotency {
parts = IndexParts::IndexId(&stage.config.index, &slot);
}

let timestamp = stage.genesis.slot_to_wallclock(point.slot_or_default());
let payload = ESRecord::new(record.unwrap(), timestamp);

let parts = IndexParts::Index(&stage.config.index);

self.client
.index(parts)
.body(payload)
Expand Down Expand Up @@ -124,6 +128,8 @@ pub struct Config {
pub url: String,
pub index: String,
pub credentials: Option<CredentialsConfig>,
#[serde(default)]
pub idempotency: bool,
}

impl Config {
Expand Down

0 comments on commit d01a0db

Please sign in to comment.