Skip to content

Commit

Permalink
Add support for passing custom context via a JobRegistry
Browse files Browse the repository at this point in the history
  • Loading branch information
Diggsey committed Jul 17, 2021
1 parent 1cc2262 commit fc4e909
Show file tree
Hide file tree
Showing 10 changed files with 273 additions and 154 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/toolchain.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ jobs:
test:
name: Test
runs-on: ubuntu-latest
env:
RUST_BACKTRACE: "1"
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
Expand All @@ -67,10 +69,13 @@ jobs:
- uses: actions-rs/cargo@v1
with:
command: test
args: -- --nocapture

test_nightly:
name: Test (Nightly)
runs-on: ubuntu-latest
env:
RUST_BACKTRACE: "1"
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
Expand All @@ -86,3 +91,4 @@ jobs:
- uses: actions-rs/cargo@v1
with:
command: test
args: -- --nocapture
4 changes: 4 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"rust-analyzer.checkOnSave.allFeatures": false,
"rust-analyzer.cargo.allFeatures": false
}
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sqlxmq"
version = "0.1.2"
version = "0.2.0"
authors = ["Diggory Blake <[email protected]>"]
edition = "2018"
license = "MIT OR Apache-2.0"
Expand All @@ -16,14 +16,15 @@ members = ["sqlxmq_macros", "sqlxmq_stress"]

[dependencies]
sqlx = { version = "0.5.2", features = ["postgres", "chrono", "uuid"] }
tokio = { version = "1.4.0", features = ["full"] }
tokio = { version = "=1.8.0", features = ["full"] }

This comment has been minimized.

Copy link
@agraven

agraven Jul 18, 2021

Contributor

Why the exact dependency? Just out of curiosity

This comment has been minimized.

Copy link
@Diggsey

Diggsey Jul 18, 2021

Author Owner

Unfortunately JoinHandle::abort is broken in tokio 1.8.1, and the breaking change has been backported to previous minor versions as well (eg 1.5.1). I will remove the exact dependency once the issue is resolved.

This comment has been minimized.

Copy link
@Diggsey

Diggsey Jul 18, 2021

Author Owner

This comment has been minimized.

Copy link
@agraven

agraven Jul 18, 2021

Contributor

Thanks for the response 👍

dotenv = "0.15.0"
chrono = "0.4.19"
uuid = { version = "0.8.2", features = ["v4"] }
log = "0.4.14"
serde_json = "1.0.64"
serde = "1.0.124"
sqlxmq_macros = { version = "0.1", path = "sqlxmq_macros" }
sqlxmq_macros = { version = "0.2.0", path = "sqlxmq_macros" }
anymap2 = "0.13.0"

[features]
default = ["runtime-tokio-native-tls"]
Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,17 @@ use sqlxmq::{job, CurrentJob};
// Arguments to the `#[job]` attribute allow setting default job options.
#[job(channel_name = "foo")]
async fn example_job(
// The first argument should always be the current job.
mut current_job: CurrentJob,
// Additional arguments are optional, but can be used to access context
// provided via `JobRegistry::set_context`.
message: &'static str,
) -> sqlx::Result<()> {
// Decode a JSON payload
let who: Option<String> = current_job.json()?;

// Do some work
println!("Hello, {}!", who.as_deref().unwrap_or("world"));
println!("{}, {}!", message, who.as_deref().unwrap_or("world"));

// Mark the job as complete
current_job.complete().await?;
Expand All @@ -160,6 +164,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Here is where you can configure the registry
// registry.set_error_handler(...)

// And add context
registry.set_context("Hello");

let runner = registry
// Create a job runner using the connection pool.
.runner(&pool)
Expand Down
2 changes: 1 addition & 1 deletion sqlxmq_macros/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sqlxmq_macros"
version = "0.1.2"
version = "0.2.0"
authors = ["Diggory Blake <[email protected]>"]
edition = "2018"
license = "MIT OR Apache-2.0"
Expand Down
36 changes: 26 additions & 10 deletions sqlxmq_macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,12 @@ fn interpret_job_arg(options: &mut JobOptions, arg: NestedMeta) -> Result<()> {

/// Marks a function as being a background job.
///
/// The function must take a single `CurrentJob` argument, and should
/// be async or return a future.
/// The first argument to the function must have type `CurrentJob`.
/// Additional arguments can be used to access context from the job
/// registry. Context is accessed based on the type of the argument.
/// Context arguments must be `Send + Sync + Clone + 'static`.
///
/// The function should be async or return a future.
///
/// The async result must be a `Result<(), E>` type, where `E` is convertible
/// to a `Box<dyn Error + Send + Sync + 'static>`, which is the case for most
Expand All @@ -103,7 +107,7 @@ fn interpret_job_arg(options: &mut JobOptions, arg: NestedMeta) -> Result<()> {
///
/// # Name
///
/// ```
/// ```ignore
/// #[job("example")]
/// #[job(name="example")]
/// ```
Expand All @@ -115,23 +119,23 @@ fn interpret_job_arg(options: &mut JobOptions, arg: NestedMeta) -> Result<()> {
///
/// # Channel name
///
/// ```
/// ```ignore
/// #[job(channel_name="foo")]
/// ```
///
/// This sets the default channel name on which the job will be spawned.
///
/// # Retries
///
/// ```
/// ```ignore
/// #[job(retries = 3)]
/// ```
///
/// This sets the default number of retries for the job.
///
/// # Retry backoff
///
/// ```
/// ```ignore
/// #[job(backoff_secs=1.5)]
/// #[job(backoff_secs=2)]
/// ```
Expand All @@ -140,7 +144,7 @@ fn interpret_job_arg(options: &mut JobOptions, arg: NestedMeta) -> Result<()> {
///
/// # Ordered
///
/// ```
/// ```ignore
/// #[job(ordered)]
/// #[job(ordered=true)]
/// #[job(ordered=false)]
Expand All @@ -150,7 +154,7 @@ fn interpret_job_arg(options: &mut JobOptions, arg: NestedMeta) -> Result<()> {
///
/// # Prototype
///
/// ```
/// ```ignore
/// fn my_proto<'a, 'b>(
/// builder: &'a mut JobBuilder<'b>
/// ) -> &'a mut JobBuilder<'b> {
Expand All @@ -170,7 +174,7 @@ fn interpret_job_arg(options: &mut JobOptions, arg: NestedMeta) -> Result<()> {
/// prototype will always be applied first so that explicit options can override it.
/// Each option can only be provided once in the attribute.
///
/// ```
/// ```ignore
/// #[job("my_job", proto(my_proto), retries=0, ordered)]
/// ```
///
Expand Down Expand Up @@ -223,6 +227,18 @@ pub fn job(attr: TokenStream, item: TokenStream) -> TokenStream {
});
}

let extract_ctx: Vec<_> = inner_fn
.sig
.inputs
.iter()
.skip(1)
.map(|_| {
quote! {
registry.context()
}
})
.collect();

let expanded = quote! {
#(#errors)*
#[allow(non_upper_case_globals)]
Expand All @@ -234,7 +250,7 @@ pub fn job(attr: TokenStream, item: TokenStream) -> TokenStream {
builder #(#chain)*
}),
sqlxmq::hidden::RunFn(|registry, current_job| {
registry.spawn_internal(#fq_name, inner(current_job));
registry.spawn_internal(#fq_name, inner(current_job #(, #extract_ctx)*));
}),
)
};
Expand Down
Loading

0 comments on commit fc4e909

Please sign in to comment.