Skip to content

Commit

Permalink
Merge pull request #1055 from neon-bindings/kv/async-runtime
Browse files Browse the repository at this point in the history
feat(neon): Add tokio async runtime support
  • Loading branch information
kjvalencik authored Sep 20, 2024
2 parents 6b541a2 + df091a2 commit 68c48ef
Show file tree
Hide file tree
Showing 17 changed files with 515 additions and 80 deletions.
8 changes: 4 additions & 4 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[alias]
# Neon defines mutually exclusive feature flags which prevents using `cargo clippy --all-features`
# The following aliases simplify linting the entire workspace
neon-check = " check --all --all-targets --features napi-experimental,futures,external-buffers,serde"
neon-clippy = "clippy --all --all-targets --features napi-experimental,futures,external-buffers,serde -- -A clippy::missing_safety_doc"
neon-test = " test --all --features=doc-dependencies,doc-comment,napi-experimental,futures,external-buffers,serde"
neon-doc = " rustdoc -p neon --features=doc-dependencies,napi-experimental,futures,external-buffers,sys,serde -- --cfg docsrs"
neon-check = " check --all --all-targets --features napi-experimental,external-buffers,serde,tokio"
neon-clippy = "clippy --all --all-targets --features napi-experimental,external-buffers,serde,tokio -- -A clippy::missing_safety_doc"
neon-test = " test --all --features=doc-dependencies,doc-comment,napi-experimental,external-buffers,serde,tokio"
neon-doc = " rustdoc -p neon --features=doc-dependencies,napi-experimental,external-buffers,sys,serde,tokio -- --cfg docsrs"
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 33 additions & 4 deletions crates/neon-macros/src/export/function/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pub(crate) struct Meta {

#[derive(Default)]
pub(super) enum Kind {
Async,
AsyncFn,
#[default]
Normal,
Task,
Expand All @@ -28,7 +30,8 @@ impl Meta {

fn force_context(&mut self, meta: syn::meta::ParseNestedMeta) -> syn::Result<()> {
match self.kind {
Kind::Normal => {}
Kind::Normal | Kind::AsyncFn => {}
Kind::Async => return Err(meta.error(super::ASYNC_CX_ERROR)),
Kind::Task => return Err(meta.error(super::TASK_CX_ERROR)),
}

Expand All @@ -37,6 +40,16 @@ impl Meta {
Ok(())
}

fn make_async(&mut self, meta: syn::meta::ParseNestedMeta) -> syn::Result<()> {
if matches!(self.kind, Kind::AsyncFn) {
return Err(meta.error(super::ASYNC_FN_ERROR));
}

self.kind = Kind::Async;

Ok(())
}

fn make_task(&mut self, meta: syn::meta::ParseNestedMeta) -> syn::Result<()> {
if self.context {
return Err(meta.error(super::TASK_CX_ERROR));
Expand All @@ -48,13 +61,25 @@ impl Meta {
}
}

pub(crate) struct Parser;
pub(crate) struct Parser(syn::ItemFn);

impl Parser {
pub(crate) fn new(item: syn::ItemFn) -> Self {
Self(item)
}
}

impl syn::parse::Parser for Parser {
type Output = Meta;
type Output = (syn::ItemFn, Meta);

fn parse2(self, tokens: proc_macro2::TokenStream) -> syn::Result<Self::Output> {
let Self(item) = self;
let mut attr = Meta::default();

if item.sig.asyncness.is_some() {
attr.kind = Kind::AsyncFn;
}

let parser = syn::meta::parser(|meta| {
if meta.path.is_ident("name") {
return attr.set_name(meta);
Expand All @@ -68,6 +93,10 @@ impl syn::parse::Parser for Parser {
return attr.force_context(meta);
}

if meta.path.is_ident("async") {
return attr.make_async(meta);
}

if meta.path.is_ident("task") {
return attr.make_task(meta);
}
Expand All @@ -77,6 +106,6 @@ impl syn::parse::Parser for Parser {

parser.parse2(tokens)?;

Ok(attr)
Ok((item, attr))
}
}
28 changes: 21 additions & 7 deletions crates/neon-macros/src/export/function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::export::function::meta::Kind;

pub(crate) mod meta;

static ASYNC_CX_ERROR: &str = "`FunctionContext` is not allowed in async functions";
static ASYNC_FN_ERROR: &str = "`async` attribute should not be used with an `async fn`";
static TASK_CX_ERROR: &str = "`FunctionContext` is not allowed with `task` attribute";

pub(super) fn export(meta: meta::Meta, input: syn::ItemFn) -> proc_macro::TokenStream {
Expand Down Expand Up @@ -40,19 +42,19 @@ pub(super) fn export(meta: meta::Meta, input: syn::ItemFn) -> proc_macro::TokenS
.unwrap_or_else(|| quote::quote!(#name))
});

// Import the value or JSON trait for conversion
let result_trait_name = if meta.json {
quote::format_ident!("NeonExportReturnJson")
// Tag whether we should JSON wrap results
let return_tag = if meta.json {
quote::format_ident!("NeonJsonTag")
} else {
quote::format_ident!("NeonExportReturnValue")
quote::format_ident!("NeonValueTag")
};

// Convert the result
// N.B.: Braces are intentionally included to avoid leaking trait to function body
let result_extract = quote::quote!({
use neon::macro_internal::#result_trait_name;
use neon::macro_internal::{ToNeonMarker, #return_tag as NeonReturnTag};

res.try_neon_export_return(&mut cx)
(&res).to_neon_marker::<NeonReturnTag>().neon_into_js(&mut cx, res)
});

// Default export name as identity unless a name is provided
Expand All @@ -63,6 +65,17 @@ pub(super) fn export(meta: meta::Meta, input: syn::ItemFn) -> proc_macro::TokenS

// Generate the call to the original function
let call_body = match meta.kind {
Kind::Async | Kind::AsyncFn => quote::quote!(
let (#(#tuple_fields,)*) = cx.args()?;
let fut = #name(#context_arg #(#args),*);
let fut = {
use neon::macro_internal::{ToNeonMarker, NeonValueTag};

(&fut).to_neon_marker::<NeonValueTag>().into_neon_result(&mut cx, fut)?
};

neon::macro_internal::spawn(&mut cx, fut, |mut cx, res| #result_extract)
),
Kind::Normal => quote::quote!(
let (#(#tuple_fields,)*) = cx.args()?;
let res = #name(#context_arg #(#args),*);
Expand Down Expand Up @@ -160,7 +173,8 @@ fn has_context_arg(meta: &meta::Meta, sig: &syn::Signature) -> syn::Result<bool>

// Context is only allowed for normal functions
match meta.kind {
Kind::Normal => {}
Kind::Normal | Kind::Async => {}
Kind::AsyncFn => return Err(syn::Error::new(first.span(), ASYNC_CX_ERROR)),
Kind::Task => return Err(syn::Error::new(first.span(), TASK_CX_ERROR)),
}

Expand Down
3 changes: 2 additions & 1 deletion crates/neon-macros/src/export/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ pub(crate) fn export(
match item {
// Export a function
syn::Item::Fn(item) => {
let meta = syn::parse_macro_input!(attr with function::meta::Parser);
let parser = function::meta::Parser::new(item);
let (item, meta) = syn::parse_macro_input!(attr with parser);

function::export(meta, item)
}
Expand Down
8 changes: 7 additions & 1 deletion crates/neon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ exclude = ["neon.jpg", "doc/**/*"]
edition = "2021"

[dev-dependencies]
itertools = "0.10.5"
semver = "1.0.20"
psd = "0.3.4" # used for a doc example
anyhow = "1.0.75" # used for a doc example
Expand Down Expand Up @@ -56,12 +57,17 @@ external-buffers = []

# Experimental Rust Futures API
# https://github.com/neon-bindings/rfcs/pull/46
futures = ["tokio"]
futures = ["dep:tokio"]

# Enable low-level system APIs. The `sys` API allows augmenting the Neon API
# from external crates.
sys = []

# Enable async runtime
tokio = ["tokio-rt-multi-thread"] # Shorter alias
tokio-rt = ["futures", "tokio/rt"]
tokio-rt-multi-thread = ["tokio-rt", "tokio/rt-multi-thread"]

# Default N-API version. Prefer to select a minimum required version.
# DEPRECATED: This is an alias that should be removed
napi-runtime = ["napi-8"]
Expand Down
2 changes: 2 additions & 0 deletions crates/neon/src/context/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub trait ContextInternal<'cx>: Sized {
}

fn default_main(mut cx: ModuleContext) -> NeonResult<()> {
#[cfg(feature = "tokio-rt-multi-thread")]
crate::executor::tokio::init(&mut cx)?;
crate::registered().export(&mut cx)
}

Expand Down
59 changes: 59 additions & 0 deletions crates/neon/src/executor/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::{future::Future, pin::Pin};

use crate::{context::Cx, thread::LocalKey};

#[cfg(feature = "tokio-rt")]
pub(crate) mod tokio;

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

pub(crate) static RUNTIME: LocalKey<Box<dyn Runtime>> = LocalKey::new();

pub trait Runtime: Send + Sync + 'static {
fn spawn(&self, fut: BoxFuture);
}

/// Register a [`Future`] executor runtime globally to the addon.
///
/// Returns `Ok(())` if a global executor has not been set and `Err(runtime)` if it has.
///
/// If the `tokio` feature flag is enabled and the addon does not provide a
/// [`#[neon::main]`](crate::main) function, a multithreaded tokio runtime will be
/// automatically registered.
///
/// **Note**: Each instance of the addon will have its own runtime. It is recommended
/// to initialize the async runtime once in a process global and share it across instances.
///
/// ```
/// # #[cfg(feature = "tokio-rt-multi-thread")]
/// # fn example() {
/// # use neon::prelude::*;
/// use once_cell::sync::OnceCell;
/// use tokio::runtime::Runtime;
///
/// static RUNTIME: OnceCell<Runtime> = OnceCell::new();
///
/// #[neon::main]
/// fn main(mut cx: ModuleContext) -> NeonResult<()> {
/// let runtime = RUNTIME
/// .get_or_try_init(Runtime::new)
/// .or_else(|err| cx.throw_error(err.to_string()))?;
///
/// let _ = neon::set_global_executor(&mut cx, runtime);
///
/// Ok(())
/// }
/// # }
/// ```
pub fn set_global_executor<R>(cx: &mut Cx, runtime: R) -> Result<(), R>
where
R: Runtime,
{
if RUNTIME.get(cx).is_some() {
return Err(runtime);
}

RUNTIME.get_or_init(cx, || Box::new(runtime));

Ok(())
}
66 changes: 66 additions & 0 deletions crates/neon/src/executor/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::sync::Arc;

use super::{BoxFuture, Runtime};

impl Runtime for tokio::runtime::Runtime {
fn spawn(&self, fut: BoxFuture) {
spawn(self.handle(), fut);
}
}

impl Runtime for Arc<tokio::runtime::Runtime> {
fn spawn(&self, fut: BoxFuture) {
spawn(self.handle(), fut);
}
}

impl Runtime for &'static tokio::runtime::Runtime {
fn spawn(&self, fut: BoxFuture) {
spawn(self.handle(), fut);
}
}

impl Runtime for tokio::runtime::Handle {
fn spawn(&self, fut: BoxFuture) {
spawn(self, fut);
}
}

impl Runtime for &'static tokio::runtime::Handle {
fn spawn(&self, fut: BoxFuture) {
spawn(self, fut);
}
}

fn spawn(handle: &tokio::runtime::Handle, fut: BoxFuture) {
#[allow(clippy::let_underscore_future)]
let _ = handle.spawn(fut);
}

#[cfg(feature = "tokio-rt-multi-thread")]
pub(crate) fn init(cx: &mut crate::context::ModuleContext) -> crate::result::NeonResult<()> {
use once_cell::sync::OnceCell;
use tokio::runtime::{Builder, Runtime};

use crate::context::Context;

static RUNTIME: OnceCell<Runtime> = OnceCell::new();

super::RUNTIME.get_or_try_init(cx, |cx| {
let runtime = RUNTIME
.get_or_try_init(|| {
#[cfg(feature = "tokio-rt-multi-thread")]
let mut builder = Builder::new_multi_thread();

#[cfg(not(feature = "tokio-rt-multi-thread"))]
let mut builder = Builder::new_current_thread();

builder.enable_all().build()
})
.or_else(|err| cx.throw_error(err.to_string()))?;

Ok(Box::new(runtime))
})?;

Ok(())
}
Loading

0 comments on commit 68c48ef

Please sign in to comment.