Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore/migrate gcp cloudfunction sink #634

Merged
merged 21 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0ca708a
chore(github): added .vscode to ignore
paulobressan Jun 17, 2023
e45bc2f
feat(sinks): added rabbitmq sink
paulobressan Jun 17, 2023
972e8b6
feat(sinks/rabbitmq): changed bootstrap for channel reuse
paulobressan Jun 17, 2023
8848556
feat(sinks/rabbitmq): adjusted dependencies to use features flags
paulobressan Jun 17, 2023
e6a990d
chore(sinks/rabbitmq): removed rabbitmq error handler
paulobressan Jun 19, 2023
f620178
docs(contributing): improved docs how to run oura
paulobressan Jun 19, 2023
d65e781
chore(examples): added rabbitmq daemon
paulobressan Jun 19, 2023
875d96d
feat(docker): added docker compose to run RabbitMQ sink dependencies
paulobressan Jun 19, 2023
4a0ac10
Merge branch 'main' into main
paulobressan Jun 19, 2023
96b3f9f
Merge branch 'main' of github.com:txpipe/oura
paulobressan Jun 19, 2023
96d4dae
Merge branch 'main' of github.com:txpipe/oura
paulobressan Jun 20, 2023
62234d8
Merge branch 'main' of github.com:txpipe/oura
paulobressan Jun 21, 2023
186af44
Merge branch 'main' of github.com:txpipe/oura
paulobressan Jun 21, 2023
9b4b35f
Merge branch 'main' of github.com:txpipe/oura
paulobressan Jun 22, 2023
4bafe9b
Merge branch 'main' of github.com:txpipe/oura
paulobressan Jun 23, 2023
352f2b7
Merge branch 'main' of github.com:txpipe/oura
paulobressan Jun 27, 2023
0ff7a33
chore(sinks/gcp_cloudfunction): started migration
paulobressan Jun 27, 2023
330ee8a
chore(sinks/gcp_cloudfunction): migrated gcp functions
paulobressan Jun 27, 2023
a904bf5
feat(sinks/gcpp_cloudfunction): added refresh token
paulobressan Jun 29, 2023
a9fa047
chore(sinks/gcp_cloudfunction): adjusted token validation
paulobressan Jul 5, 2023
c09b302
Merge branch 'main' into chore/migrate-gcp-cloudfunction-sink
paulobressan Jul 5, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 4 additions & 27 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ gasket = { version = "^0.4", features = ["derive"] }
# gasket = { git = "https://github.com/construkts/gasket-rs.git", features = ["derive"] }

utxorpc = { version = "1.0.0-alpha.1" }

hex = "0.4.3"
net2 = "0.2.37"
bech32 = "0.9.1"
Expand All @@ -48,47 +47,24 @@ tracing-subscriber = "0.3.17"
file-rotate = "0.7.4"
tokio = { version = "1", features = ["rt"] }
async-trait = "0.1.68"

# feature: fingerprint
elasticsearch = { version = "8.5.0-alpha.1", optional = true }
murmur3 = { version = "0.5.2", optional = true }

# features: aws
aws-config = { version = "0.55.3", optional = true }
aws-types = { version = "0.55.3", optional = true }
aws-sdk-s3 = { version = "0.28.0", optional = true }

# required for CI to complete successfully
openssl = { version = "0.10", optional = true, features = ["vendored"] }

# features: sink-rabbitmq
lapin = { version = "2.2.1", optional = true }

# features: sink-webhook
reqwest = { version = "0.11", features = ["json"], optional = true }

# features: sink-kafka
reqwest = { version = "0.11", features = ["json", "multipart"], optional = true }
kafka = { version = "0.9.0", optional = true }

# features: sink-aws-sqs
aws-sdk-sqs = { version = "0.28.0", optional = true }

# features: sink-aws-lambda
aws-sdk-lambda = { version = "0.28.0", optional = true }

# features: sink-gcp-pubsub
google-cloud-pubsub = { version = "0.16.0", optional = true }
google-cloud-googleapis = { version = "0.10.0", optional = true }
google-cloud-default = { version = "0.4.0", optional = true, features = ["pubsub"] }

# features: sink-redis
r2d2_redis = { version = "0.14.0", optional = true }

# features: sink-elasticsearch
elasticsearch = { version = "8.5.0-alpha.1", optional = true }

# features: deno
deno_core = { version = "0.188.0", optional = true }
deno_runtime = { version = "0.114.0", optional = true }
jsonwebtoken = { version = "8.3.0", optional = true }

[features]
default = ["deno"]
Expand All @@ -99,5 +75,6 @@ sink-kafka = ["kafka"]
sink-aws-sqs = ["aws-config", "aws-types", "aws-sdk-sqs"]
sink-aws-lambda = ["aws-config", "aws-types", "aws-sdk-lambda"]
sink-gcp-pubsub = ["google-cloud-pubsub", "google-cloud-googleapis", "google-cloud-default"]
sink-gcp-cloudfunction = ["reqwest", "jsonwebtoken"]
sink-redis = ["r2d2_redis"]
sink-elasticsearch = ["elasticsearch"]
16 changes: 9 additions & 7 deletions book/src/sinks/gcp_cloudfunction.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ A sink that sends each event to a cloud function. Each event is json-encoded and
```toml
[sink]
type = "GcpCloudFunction"
name = "oura"
project_id = "XXX"
region = "us-west-2"
url = "https://REGION-PROJECT_ID.cloudfunctions.net/FUNCTION_NAME"
timeout = 30000
error_policy = "Continue"
authorization = "user:pass"
authorization = true

[sink.headers]
extra_header_1 = "abc"
Expand All @@ -28,11 +26,15 @@ max_backoff = 100000
### Section: `sink`

- `type`: the literal value `GcpCloudFunction`
- `name`: the name of the cloud function
- `project_id`: the google cloud project id that the function exists in
- `region`: the region that the function was created in
- `url`: Your function url
- `timeout` (optional): the timeout value for the HTTP response in milliseconds. Default value is `30000`.
- `authorization` (optional): value to add as the 'Authorization' HTTP header
- `headers` (optional): key-value map of extra headers to pass in each HTTP call
- `error_policy` (optional): either `Continue` or `Exit`. Default value is `Exit`.
- [retry_policy](../advanced/retry_policy.md)

### GCP Authentication

The GCP authentication process relies on the following conventions:

- If the `GOOGLE_APPLICATION_CREDENTIALS` environmental variable is specified, the value will be used as the file path to retrieve the JSON file with the credentials.
17 changes: 17 additions & 0 deletions examples/gcp_cloudfunction/daemon.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[source]
type = "N2N"
peers = ["relays-new.cardano-mainnet.iohk.io:3001"]

[intersect]
type = "Tip"

[[filters]]
type = "SplitBlock"

[[filters]]
type = "ParseCbor"

[sink]
type = "GcpCloudFunction"
url = "https://REGION-PROJECT_ID.cloudfunctions.net/FUNCTION_NAME"
authentication = true
3 changes: 0 additions & 3 deletions src/sinks/_pending/gcp_cloudfunction/mod.rs

This file was deleted.

53 changes: 0 additions & 53 deletions src/sinks/_pending/gcp_cloudfunction/setup.rs

This file was deleted.

2 changes: 2 additions & 0 deletions src/sinks/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#[cfg(any(feature = "sink-webhook", feature = "sink-gcp-cloudfunction"))]
pub mod web;
34 changes: 34 additions & 0 deletions src/sinks/common/web.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::collections::HashMap;

use reqwest::header;

use crate::framework::Error;

pub static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));

pub fn build_headers_map(
authorization: Option<&String>,
extra: Option<&HashMap<String, String>>,
) -> Result<header::HeaderMap, Error> {
let mut headers = header::HeaderMap::new();

headers.insert(
header::CONTENT_TYPE,
header::HeaderValue::try_from("application/json").map_err(Error::config)?,
);

if let Some(auth_value) = &authorization {
let auth_value = header::HeaderValue::try_from(*auth_value).map_err(Error::config)?;
headers.insert(header::AUTHORIZATION, auth_value);
}

if let Some(custom) = &extra {
for (name, value) in custom.iter() {
let name = header::HeaderName::try_from(name).map_err(Error::config)?;
let value = header::HeaderValue::try_from(value).map_err(Error::config)?;
headers.insert(name, value);
}
}

Ok(headers)
}
Loading