Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gcl timestamp overwrite #2331

Merged
merged 5 commits into from
Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
### New features

* axiom stdlib module
* gcl now supports the `timestamp` metadata overwrite

### Breaking Changes

* All google connectors now require `token` to be either set to `{"file": "<path to json>"}` or `{"json": {...}}`
Expand Down
20 changes: 10 additions & 10 deletions src/connectors/impls/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
//!
//! ## Configuration
//!
//! ```troy
//! ```tremor
//! # File: config.troy
//! define connector clickhouse from clickhouse
//! with
Expand Down Expand Up @@ -92,7 +92,7 @@
//!
//! The following Tremor values represent a valid `ClickHouse` string:
//!
//! ```
//! ```tremor
//! "Hello, world!"
//! "Grace Hopper"
//! ```
Expand Down Expand Up @@ -120,7 +120,7 @@
//!
//! The following Tremor values can represent any integer type:
//!
//! ```
//! ```tremor
//! 42
//! 101
//! 13
Expand All @@ -142,7 +142,7 @@
//!
//! The following Tremor values represent valid `ClickHouse` `DateTime`:
//!
//! ```
//! ```tremor
//! 1634400000
//! 954232020
//! ```
Expand All @@ -164,7 +164,7 @@
//!
//! The following Tremor values represent valid `ClickHouse` `DateTime64(0, Etc/UTC)` values.
//!
//! ```
//! ```tremor
//! 1634400000
//! 954232020
//! ```
Expand All @@ -186,7 +186,7 @@
//!
//! The following Tremor values represent valid `ClickHouse` `IPv4` values:
//!
//! ```
//! ```tremor
//! "192.168.1.1"
//! [192, 168, 1, 1]
//! ```
Expand All @@ -208,7 +208,7 @@
//!
//! The following Tremor values represent valid `ClickHouse` `IPv6` values:
//!
//! ```
//! ```tremor
//! "FE80:0000:0000:0000:0202:B3FF:FE1E:8329"
//! [254, 128, 0, 0, 0, 0, 0, 0, 2, 2, 179, 255, 254, 30, 131, 41]
//! ```
Expand All @@ -231,7 +231,7 @@
//!
//! The following Tremor values represent valid `ClickHouse` `UInt8` values:
//!
//! ```
//! ```tremor
//! 101
//! null
//! ```
Expand All @@ -253,7 +253,7 @@
//!
//! The following Tremor values represent valid `ClickHouse` `Array(UInt8)` values:
//!
//! ```
//! ```tremor
//! [101, 42, true]
//! [1, 2, false]
//! ```
Expand All @@ -275,7 +275,7 @@
//!
//! The following Tremor values represent valid `ClickHouse` `Uuid` values:
//!
//! ```
//! ```tremor
//! "123e4567-e89b-12d3-a456-426614174000"
//! [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
//! ```
Expand Down
9 changes: 3 additions & 6 deletions src/connectors/impls/gcl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@
//! | `log_name` | Overrides the default configured `log_name` for this event only |
//! | `log_severity` | Overrides the default log severity for this event only |
//! | `resource` | Overrides the default configured `resource`, if provided |
//! | `insert_id` | An optional unique identifier for the log entry. If you provide a value, then Logging considers other log entries in the same project, with the same timestamp, and with the same `insert_id` to be duplicates which are removed in a single query result. However, there are no guarantees of de-duplication in the export of logs |
//! | `insert_id` | An optional unique identifier for the log entry. If you provide a value, then Logging considers other log entries in the same project, with the same timestamp, and with the same `insert_id` to be duplicates which are removed in a single query result. However, there are no guarantees of de-duplication in the export of logs |
//! | `http_request` | Optional information about the HTTP request associated with this log entry, if applicable |
//! | `labels` | An optional map of system-defined or user-defined key-value string pairs related to the entry |
//! | `operation` | Optional information about an operation associated with the log entry, if applicable |
//! | `trace` | Optional. The REST resource name of the trace being written to [Cloud Trace](https://cloud.google.com/trace) in association with this log entry. For example, if your trace data is stored in the Cloud project "my-trace-project" and if the service that is creating the log entry receives a trace header that includes the trace ID "12345", then the service should use "projects/my-tracing-project/traces/12345". The trace field provides the link between logs and traces. By using this field, you can navigate from a log entry to a trace. |
//! | `span_id` | Optional. The ID of the Cloud Trace span associated with the current operation in which the log is being written. For example, if a span has the REST resource name of "projects/some-project/traces/some-trace/spans/some-span-id", then the spanId field is "some-span-id". A Span represents a single operation within a trace. Whereas a trace may involve multiple different microservices running on multiple different machines, a span generally corresponds to a single logical operation being performed in a single instance of a microservice on one specific machine. Spans are the nodes within the tree that is a trace. Applications that are instrumented for tracing will generally assign a new, unique span ID on each incoming request. It is also common to create and record additional spans corresponding to internal processing elements as well as issuing requests to dependencies. The span ID is expected to be a 16-character, hexadecimal encoding of an 8-byte array and should not be zero. It should be unique within the trace and should, ideally, be generated in a manner that is uniformly random. |
//! | `trace_sampled` | The sampling decision of the trace associated with the log entry. True means that the trace resource name in the trace field was sampled for storage in a trace backend. False means that the trace was not sampled for storage when this log entry was written, or the sampling decision was unknown at the time. A non-sampled trace value is still useful as a request correlation identifier. The default is False |
//! | `source_location` | Optional. Source code location information associated with the log entry, if any |
//! | `timestamp` | Optional. Overwrites the timestamp from ingest_ns to this value. the timestamp is provided in nanoseconds. |
//!
//!
//! #### HTTP Request metadata
Expand Down Expand Up @@ -139,7 +140,7 @@
//! C -->|gRPC LogEvent message| D{GCP Cloud Logging}
//! ```
//!
//! ```troy
//! ```tremor
//! define flow main
//! flow
//! use std::time::nanos;
Expand Down Expand Up @@ -169,10 +170,6 @@
//! "project_id": "my-project-id"
//! }
//! },
//! # This is not a test
//! # "partial_success": false,
//! # This is not a dry run
//! # "dry_run": false,
//!
//! # 500ms connection timeout
//! "connect_timeout": nanos::from_millis(500),
Expand Down
32 changes: 32 additions & 0 deletions src/connectors/impls/gcl/writer/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use googapis::google::logging::{
r#type::HttpRequest,
v2::{LogEntryOperation, LogEntrySourceLocation},
};
use prost_types::Timestamp;
use tremor_value::Value;
use value_trait::ValueAccess;

Expand All @@ -28,6 +29,21 @@ pub(crate) fn insert_id(meta: Option<&Value>) -> String {
get_or_default(meta, "insert_id")
}

#[allow(clippy::cast_possible_wrap, clippy::cast_precision_loss)]
pub(crate) fn timestamp(ingest_ns: u64, meta: Option<&Value>) -> Timestamp {
let timestamp = if let Some(timestamp) = meta.get_u64("timestamp") {
timestamp
} else {
ingest_ns
};
let mut timestamp = Timestamp {
seconds: (timestamp / 1_000_000_000) as i64,
nanos: (timestamp % 1_000_000_000) as i32,
};
timestamp.normalize();
timestamp
}

pub(crate) fn http_request(meta: Option<&Value>) -> Option<HttpRequest> {
// Override for a specific per event trace
let meta = meta?;
Expand Down Expand Up @@ -442,4 +458,20 @@ mod test {
sl
);
}

#[test]
fn timestamp_overrides() {
let meta = literal!({
"timestamp": 42
});
let ts = timestamp(0, Some(&meta));

let mut expected = Timestamp {
seconds: 0,
nanos: 42,
};
expected.normalize();

assert_eq!(expected, ts);
}
}
Loading