Skip to content

Commit

Permalink
feat: introduce wasm filter using Extism (#761)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Mar 17, 2024
1 parent 3e5e1ca commit 29fed27
Show file tree
Hide file tree
Showing 12 changed files with 1,702 additions and 86 deletions.
1,501 changes: 1,466 additions & 35 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ authors = ["Santiago Carmuega <[email protected]>"]

[features]
deno = ["deno_runtime"]
wasm = ["extism"]
sink-file-rotate = ["file-rotate"]
sink-webhook = ["reqwest"]
sink-rabbitmq = ["lapin"]
Expand Down Expand Up @@ -86,3 +87,6 @@ aws-types = { version = "^1.1", optional = true }
aws-sdk-s3 = { version = "^1.1", optional = true }
aws-sdk-sqs = { version = "^1.1", optional = true }
aws-sdk-lambda = { version = "^1.1", optional = true }

# wasm
extism = { version = "1.2.0", optional = true }
35 changes: 35 additions & 0 deletions examples/wasm_basic/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# WASM Basic

This example shows how to build a custom Oura plugin using Golang.

The code under `./extract_fee` contains the Golang source code for the plugin. The scope of this very basic plugin is to extract the fee value of each transaction.

## Requirements

- [tinygo](https://tinygo.org/getting-started/install/)

> While the core Go toolchain has support to target WebAssembly, we find tinygo to work well for plug-in code.
## Procedure

1. Build the plugin

Run the following command from inside the `./extract_fee` directory to compile the Golang source into a WASM module using tinigo.

```sh
tinygo build -o plugin.wasm -target wasi main.go
```

The Golang code relies on a plugin system called [extism](https://github.com/extism) that provides several extra features which are not reflected in this example. To read more about how to use Extism in go, refer to the [official docs](https://github.com/extism/go-pdk).

1. Run Oura using the plugin

Run Oura using the `daemon.toml` config in this example that already points to the compiled WASM module generated in the previous step.

```sh
cargo run --features wasm --bin oura -- daemon --config ./daemon.toml
```

> Note that wasm plugins require the Cargo feature flag named `wasm`
You should notice that the events piped in stdout show numbers that represent the fees of each transaction processed.
20 changes: 20 additions & 0 deletions examples/wasm_basic/daemon.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[source]
type = "N2N"
peers = ["relays-new.cardano-mainnet.iohk.io:3001"]

[intersect]
type = "Point"
value = [4493860, "ce7f821d2140419fea1a7900cf71b0c0a0e94afbb1f814a6717cff071c3b6afc"]

[[filters]]
type = "SplitBlock"

[[filters]]
type = "ParseCbor"

[[filters]]
type = "WasmPlugin"
path = "./extract_fee/plugin.wasm"

[sink]
type = "Stdout"
5 changes: 5 additions & 0 deletions examples/wasm_basic/extract_fee/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module oura-plugin-basic

go 1.23

require github.com/extism/go-pdk v1.0.2
2 changes: 2 additions & 0 deletions examples/wasm_basic/extract_fee/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github.com/extism/go-pdk v1.0.2 h1:UB7oTW3tw2zoMlsUdBEDAAbhQg9OudzgNeyCwQYZ730=
github.com/extism/go-pdk v1.0.2/go.mod h1:Gz+LIU/YCKnKXhgge8yo5Yu1F/lbv7KtKFkiCSzW/P4=
35 changes: 35 additions & 0 deletions examples/wasm_basic/extract_fee/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"github.com/extism/go-pdk"
)

//export map_u5c_tx
func map_u5c_tx() int32 {
// unmarshal the U5C Tx data provided by the host
var param map[string]interface{}
err := pdk.InputJSON(&param)

if err != nil {
pdk.SetError(err)
return 1
}

//pdk.Log(pdk.LogInfo, fmt.Sprintf("%v", param))

// Here is where you get to do something interesting with the data. In this example, we just extract the fee data from the Tx
fee := param["fee"].(interface{})

// Use this method to return the mapped value back to the Oura pipeline.
err = pdk.OutputJSON(fee)

if err != nil {
pdk.SetError(err)
return 1
}

// return 0 for a successful operation and 1 for failure.
return 0
}

func main() {}
Binary file added examples/wasm_basic/extract_fee/plugin.wasm
Binary file not shown.
28 changes: 21 additions & 7 deletions src/filters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ pub mod noop;
pub mod parse_cbor;
pub mod select;
pub mod split_block;
pub mod wasm;

#[cfg(feature = "wasm")]
pub mod wasm_plugin;

#[cfg(feature = "deno")]
pub mod deno;
Expand All @@ -21,10 +23,12 @@ pub enum Bootstrapper {
Dsl(dsl::Stage),
Json(json::Stage),
LegacyV1(legacy_v1::Stage),
Wasm(wasm::Stage),
ParseCbor(parse_cbor::Stage),
Select(select::Stage),

#[cfg(feature = "wasm")]
WasmPlugin(wasm_plugin::Stage),

#[cfg(feature = "deno")]
Deno(deno::Stage),
}
Expand All @@ -37,10 +41,12 @@ impl Bootstrapper {
Bootstrapper::Dsl(p) => &mut p.input,
Bootstrapper::Json(p) => &mut p.input,
Bootstrapper::LegacyV1(p) => &mut p.input,
Bootstrapper::Wasm(p) => &mut p.input,
Bootstrapper::ParseCbor(p) => &mut p.input,
Bootstrapper::Select(p) => &mut p.input,

#[cfg(feature = "wasm")]
Bootstrapper::WasmPlugin(p) => &mut p.input,

#[cfg(feature = "deno")]
Bootstrapper::Deno(p) => &mut p.input,
}
Expand All @@ -53,10 +59,12 @@ impl Bootstrapper {
Bootstrapper::Dsl(p) => &mut p.output,
Bootstrapper::Json(p) => &mut p.output,
Bootstrapper::LegacyV1(p) => &mut p.output,
Bootstrapper::Wasm(p) => &mut p.output,
Bootstrapper::ParseCbor(p) => &mut p.output,
Bootstrapper::Select(p) => &mut p.output,

#[cfg(feature = "wasm")]
Bootstrapper::WasmPlugin(p) => &mut p.output,

#[cfg(feature = "deno")]
Bootstrapper::Deno(p) => &mut p.output,
}
Expand All @@ -69,10 +77,12 @@ impl Bootstrapper {
Bootstrapper::Dsl(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::Json(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::LegacyV1(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::Wasm(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::ParseCbor(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::Select(x) => gasket::runtime::spawn_stage(x, policy),

#[cfg(feature = "wasm")]
Bootstrapper::WasmPlugin(x) => gasket::runtime::spawn_stage(x, policy),

#[cfg(feature = "deno")]
Bootstrapper::Deno(x) => gasket::runtime::spawn_stage(x, policy),
}
Expand All @@ -87,10 +97,12 @@ pub enum Config {
Dsl(dsl::Config),
Json(json::Config),
LegacyV1(legacy_v1::Config),
Wasm(wasm::Config),
ParseCbor(parse_cbor::Config),
Select(select::Config),

#[cfg(feature = "wasm")]
WasmPlugin(wasm_plugin::Config),

#[cfg(feature = "deno")]
Deno(deno::Config),
}
Expand All @@ -103,10 +115,12 @@ impl Config {
Config::Dsl(c) => Ok(Bootstrapper::Dsl(c.bootstrapper(ctx)?)),
Config::Json(c) => Ok(Bootstrapper::Json(c.bootstrapper(ctx)?)),
Config::LegacyV1(c) => Ok(Bootstrapper::LegacyV1(c.bootstrapper(ctx)?)),
Config::Wasm(c) => Ok(Bootstrapper::Wasm(c.bootstrapper(ctx)?)),
Config::ParseCbor(c) => Ok(Bootstrapper::ParseCbor(c.bootstrapper(ctx)?)),
Config::Select(c) => Ok(Bootstrapper::Select(c.bootstrapper(ctx)?)),

#[cfg(feature = "wasm")]
Config::WasmPlugin(c) => Ok(Bootstrapper::WasmPlugin(c.bootstrapper(ctx)?)),

#[cfg(feature = "deno")]
Config::Deno(c) => Ok(Bootstrapper::Deno(c.bootstrapper(ctx)?)),
}
Expand Down
40 changes: 0 additions & 40 deletions src/filters/wasm.rs

This file was deleted.

110 changes: 110 additions & 0 deletions src/filters/wasm_plugin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
//! A filter that maps records by calling custom WASM plugins

use gasket::framework::*;
use serde::Deserialize;

use crate::framework::*;

#[derive(Stage)]
#[stage(name = "filter-wasm", unit = "ChainEvent", worker = "Worker")]
pub struct Stage {
pub input: FilterInputPort,
pub output: FilterOutputPort,

plugin: extism::Plugin,

#[metric]
ops_count: gasket::metrics::Counter,
}

impl Stage {
fn map_record(&mut self, r: Record) -> Result<Vec<Record>, Error> {
let extism::convert::Json::<serde_json::Value>(output) = match r {
Record::CborBlock(x) => self.plugin.call("map_cbor_block", x).unwrap(),
Record::CborTx(x) => self.plugin.call("map_cbor_tx", x).unwrap(),
Record::ParsedTx(x) => self
.plugin
.call("map_u5c_tx", extism::convert::Json(x))
.unwrap(),
Record::ParsedBlock(x) => self
.plugin
.call("map_u5c_block", extism::convert::Json(x))
.unwrap(),
Record::GenericJson(x) => self
.plugin
.call("map_json", extism::convert::Json(x))
.unwrap(),
Record::OuraV1Event(x) => self
.plugin
.call("map_json", extism::convert::Json(x))
.unwrap(),
};

let output = match output {
serde_json::Value::Null => vec![],
serde_json::Value::Array(x) => x.into_iter().map(Record::GenericJson).collect(),
x @ _ => vec![Record::GenericJson(x)],

Check failure on line 46 in src/filters/wasm_plugin.rs

View workflow job for this annotation

GitHub Actions / Lint Rust

the `x @ _` pattern can be written as just `x`
};

Ok(output)
}
}

#[derive(Default)]
pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Self
}
}

#[async_trait::async_trait(?Send)]
impl gasket::framework::Worker<Stage> for Worker {
async fn bootstrap(_: &Stage) -> Result<Self, WorkerError> {
Ok(Default::default())
}

async fn schedule(
&mut self,
stage: &mut Stage,
) -> Result<WorkSchedule<ChainEvent>, WorkerError> {
let msg = stage.input.recv().await.or_panic()?;

Ok(WorkSchedule::Unit(msg.payload))
}

async fn execute(&mut self, unit: &ChainEvent, stage: &mut Stage) -> Result<(), WorkerError> {
let output = unit
.clone()
.try_map_record_to_many(|x| stage.map_record(x))
.or_panic()?;

for unit in output {
stage.output.send(unit.clone().into()).await.or_panic()?;
stage.ops_count.inc(1);
}

Ok(())
}
}

#[derive(Default, Deserialize)]
pub struct Config {
path: String,
}

impl Config {
pub fn bootstrapper(self, _ctx: &Context) -> Result<Stage, Error> {
let wasm = extism::Wasm::file(self.path);
let manifest = extism::Manifest::new([wasm]);
let plugin = extism::Plugin::new(&manifest, [], true).map_err(Error::custom)?;

Check failure on line 101 in src/filters/wasm_plugin.rs

View workflow job for this annotation

GitHub Actions / Lint Rust

the borrowed expression implements the required traits

Ok(Stage {
input: Default::default(),
output: Default::default(),
ops_count: Default::default(),
plugin,
})
}
}
8 changes: 4 additions & 4 deletions src/framework/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ impl ChainEvent {
Ok(out)
}

pub fn try_map_record_to_many<E>(
self,
f: fn(Record) -> Result<Vec<Record>, E>,
) -> Result<Vec<Self>, E> {
pub fn try_map_record_to_many<F, E>(self, f: F) -> Result<Vec<Self>, E>
where
F: FnOnce(Record) -> Result<Vec<Record>, E>,
{
let out = match self {
Self::Apply(p, x) => f(x)?
.into_iter()
Expand Down

0 comments on commit 29fed27

Please sign in to comment.