feat(cyclotron): expose and adopt bulk job creation (#25150)
oliverb123 authored Sep 24, 2024
1 parent cf07fb3 commit 62ac774
Showing 9 changed files with 163 additions and 93 deletions.
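
The commit threads one feature through three layers: cyclotron-core's `bulk_create_jobs` now returns a plain `Result`, the Node bindings expose it as `bulkCreateJobs`, and the CDP consumer adopts it in place of N parallel `createJob` calls. A minimal caller-side sketch of the new Node API, assuming `CyclotronManager` is re-exported from the package root; the `Invocation` shape is illustrative, while the job fields mirror the adoption site in `cdp-consumers.ts` below:

```typescript
import { CyclotronManager } from '@posthog/cyclotron'

// Illustrative input shape for this sketch, not a real plugin-server type.
interface Invocation {
    teamId: number
    functionId: string
    priority: number
    state: Record<string, unknown>
}

// One batched call replaces a Promise.all over N single createJob round trips.
async function queueInvocations(manager: CyclotronManager, invocations: Invocation[]): Promise<void> {
    const jobs = invocations.map((inv) => ({
        teamId: inv.teamId,
        functionId: inv.functionId,
        queueName: 'hog',
        priority: inv.priority,
        vmState: inv.state, // serialized to JSON inside the binding
    }))
    await manager.bulkCreateJobs(jobs)
}
```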
plugin-server/package.json (2 changes: 1 addition & 1 deletion)
@@ -148,6 +148,6 @@
   },
   "cyclotron": {
     "//This is a short term workaround to ensure that cyclotron changes trigger a rebuild": true,
-    "version": "0.1.4"
+    "version": "0.1.5"
   }
 }

plugin-server/src/cdp/cdp-consumers.ts (21 changes: 10 additions & 11 deletions)
@@ -427,17 +427,16 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
         )
 
         // For the cyclotron ones we simply create the jobs
-        await Promise.all(
-            cyclotronInvocations.map((item) =>
-                this.cyclotronManager?.createJob({
-                    teamId: item.globals.project.id,
-                    functionId: item.hogFunction.id,
-                    queueName: 'hog',
-                    priority: item.priority,
-                    vmState: serializeHogFunctionInvocation(item),
-                })
-            )
-        )
+        const cyclotronJobs = cyclotronInvocations.map((item) => {
+            return {
+                teamId: item.globals.project.id,
+                functionId: item.hogFunction.id,
+                queueName: 'hog',
+                priority: item.priority,
+                vmState: serializeHogFunctionInvocation(item),
+            }
+        })
+        await this.cyclotronManager?.bulkCreateJobs(cyclotronJobs)
 
         if (kafkaInvocations.length) {
             // As we don't want to over-produce to kafka we invoke the hog functions and then queue the results

rust/cyclotron-core/src/lib.rs (1 change: 0 additions & 1 deletion)
@@ -5,7 +5,6 @@ mod ops;
 // Types
 mod types;
 pub use types::AggregatedDelete;
-pub use types::BulkInsertResult;
 pub use types::Bytes;
 pub use types::Job;
 pub use types::JobInit;

rust/cyclotron-core/src/manager.rs (48 changes: 13 additions & 35 deletions)
@@ -10,7 +10,7 @@ use crate::{
         manager::{bulk_create_jobs, create_job},
         meta::count_total_waiting_jobs,
     },
-    BulkInsertResult, JobInit, ManagerConfig, QueueError,
+    JobInit, ManagerConfig, QueueError,
 };
 
 pub struct Shard {
@@ -82,48 +82,26 @@ impl QueueManager {
         shard.create_job_blocking(init, timeout).await
     }
 
-    pub async fn bulk_create_jobs(&self, inits: Vec<JobInit>) -> BulkInsertResult {
+    pub async fn bulk_create_jobs(&self, inits: Vec<JobInit>) -> Result<(), QueueError> {
+        let next = self
+            .next_shard
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
         let shards = self.shards.read().await;
-        let chunk_size = inits.len() / shards.len();
-        let mut result = BulkInsertResult::new();
-        // TODO - at some point, we should dynamically re-acquire the lock each time, to allow
-        // for re-routing jobs away from a bad shard during a bulk insert, but right now, we
-        // don't even re-try inserts. Later work.
-        for chunk in inits.chunks(chunk_size) {
-            let next_shard = self
-                .next_shard
-                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-            let shard = &shards[next_shard % shards.len()];
-            let shard_result = shard.bulk_create_jobs(chunk).await;
-            if let Err(err) = shard_result {
-                result.add_failure(err, chunk.to_vec());
-            }
-        }
-
-        result
+        shards[next % shards.len()].bulk_create_jobs(&inits).await
     }
 
     pub async fn bulk_create_jobs_blocking(
         &self,
         inits: Vec<JobInit>,
         timeout: Option<Duration>,
-    ) -> BulkInsertResult {
+    ) -> Result<(), QueueError> {
+        let next = self
+            .next_shard
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
         let shards = self.shards.read().await;
-        let chunk_size = inits.len() / shards.len();
-        let mut result = BulkInsertResult::new();
-        for chunk in inits.chunks(chunk_size) {
-            let next_shard = self
-                .next_shard
-                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-            let shard = &shards[next_shard % shards.len()];
-            // TODO - we sequentially try each shard, but we could try to parallelize this.
-            let shard_result = shard.bulk_create_jobs_blocking(chunk, timeout).await;
-            if let Err(err) = shard_result {
-                result.add_failure(err, chunk.to_vec());
-            }
-        }
-
-        result
+        shards[next % shards.len()]
+            .bulk_create_jobs_blocking(&inits, timeout)
+            .await
     }
 }

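Note the semantic shift in `manager.rs`: a bulk insert no longer chunks across shards and collects per-chunk failures into `BulkInsertResult`; the whole batch goes to a single shard, picked round-robin via `next_shard.fetch_add`, and succeeds or fails as a unit. Callers that still want partial progress must now chunk for themselves. A hypothetical caller-side sketch (the helper name and chunk size are invented for illustration):

```typescript
// Chunked bulk creation: each chunk still succeeds or fails atomically on the
// server side, but one bad chunk no longer sinks the entire batch.
async function bulkCreateInChunks<T>(
    createChunk: (chunk: T[]) => Promise<void>,
    jobs: T[],
    chunkSize = 500
): Promise<T[][]> {
    const failed: T[][] = []
    for (let i = 0; i < jobs.length; i += chunkSize) {
        const chunk = jobs.slice(i, i + chunkSize)
        try {
            await createChunk(chunk)
        } catch {
            failed.push(chunk) // caller decides whether to retry or drop
        }
    }
    return failed
}
```

Called as `await bulkCreateInChunks((c) => manager.bulkCreateJobs(c), jobs)`, this recovers roughly the reporting that `BulkInsertResult.failures` used to provide, now at the call site.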
rust/cyclotron-core/src/types.rs (28 changes: 0 additions & 28 deletions)
@@ -4,8 +4,6 @@ use sqlx::postgres::{PgHasArrayType, PgTypeInfo};
 use std::str::FromStr;
 use uuid::Uuid;
 
-use crate::QueueError;
-
 pub type Bytes = Vec<u8>;
 
 #[derive(Debug, Deserialize, Serialize, sqlx::Type)]
@@ -120,32 +118,6 @@ impl JobUpdate {
     }
 }
 
-// Bulk inserts across multiple shards can partially succeed, so we need to track failures
-// and hand back failed job inits to the caller.
-pub struct BulkInsertResult {
-    pub failures: Vec<(QueueError, Vec<JobInit>)>,
-}
-
-impl BulkInsertResult {
-    pub fn new() -> Self {
-        Self { failures: vec![] }
-    }
-
-    pub fn add_failure(&mut self, err: QueueError, jobs: Vec<JobInit>) {
-        self.failures.push((err, jobs));
-    }
-
-    pub fn all_succeeded(&self) -> bool {
-        self.failures.is_empty()
-    }
-}
-
-impl Default for BulkInsertResult {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
 // Result of janitor's `delete_completed_and_failed_jobs`
 #[derive(sqlx::FromRow, Debug)]
 pub struct AggregatedDelete {

rust/cyclotron-core/tests/base_ops.rs (6 changes: 4 additions & 2 deletions)
@@ -237,8 +237,10 @@ pub async fn test_bulk_insert(db: PgPool) {
         })
         .collect::<Vec<_>>();
 
-    let result = manager.bulk_create_jobs(jobs).await;
-    assert!(result.all_succeeded());
+    manager
+        .bulk_create_jobs(jobs)
+        .await
+        .expect("failed to bulk insert jobs");
 
     let dequeue_jobs = worker
         .dequeue_jobs(&job_template.queue_name, 1000)

rust/cyclotron-node/package.json (2 changes: 1 addition & 1 deletion)
@@ -1,6 +1,6 @@
 {
     "name": "@posthog/cyclotron",
-    "version": "0.1.4",
+    "version": "0.1.5",
     "description": "Node bindings for cyclotron",
     "main": "dist/index.js",
     "types": "dist/index.d.ts",

rust/cyclotron-node/src/lib.rs (106 changes: 93 additions & 13 deletions)
@@ -10,7 +10,7 @@ use neon::{
     result::{JsResult, NeonResult},
     types::{
         buffer::TypedArray, JsArray, JsArrayBuffer, JsNull, JsNumber, JsObject, JsPromise,
-        JsString, JsUint8Array, JsUndefined, JsValue,
+        JsString, JsUint32Array, JsUint8Array, JsUndefined, JsValue,
     },
 };
 use once_cell::sync::OnceCell;
@@ -174,25 +174,15 @@ fn create_job(mut cx: FunctionContext) -> JsResult<JsPromise> {
         None
     } else {
         Some(
-            blob.downcast_or_throw::<JsArrayBuffer, _>(&mut cx)?
+            blob.downcast_or_throw::<JsUint8Array, _>(&mut cx)?
                 .as_slice(&cx)
                 .to_vec(),
         )
     };
 
     let js_job: JsJob = from_json_string(&mut cx, arg1)?;
 
-    let job = JobInit {
-        team_id: js_job.team_id,
-        queue_name: js_job.queue_name,
-        priority: js_job.priority,
-        scheduled: js_job.scheduled,
-        function_id: js_job.function_id,
-        vm_state: js_job.vm_state.map(|s| s.into_bytes()),
-        parameters: js_job.parameters.map(|s| s.into_bytes()),
-        metadata: js_job.metadata.map(|s| s.into_bytes()),
-        blob,
-    };
+    let job = js_job.to_job_init(blob);
 
     let (deferred, promise) = cx.promise();
     let channel = cx.channel();
@@ -220,6 +210,79 @@ fn create_job(mut cx: FunctionContext) -> JsResult<JsPromise> {
     Ok(promise)
 }
 
+fn bulk_create_jobs(mut cx: FunctionContext) -> JsResult<JsPromise> {
+    let jobs = cx.argument::<JsString>(0)?;
+    let jobs: Vec<JsJob> = from_json_string(&mut cx, jobs)?;
+
+    let blobs = cx.argument::<JsValue>(1)?;
+    let blob_lengths = cx.argument::<JsValue>(2)?;
+
+    let blobs = blobs
+        .downcast_or_throw::<JsUint8Array, _>(&mut cx)?
+        .as_slice(&cx)
+        .to_vec();
+
+    let blob_lengths: Vec<usize> = blob_lengths
+        .downcast_or_throw::<JsUint32Array, _>(&mut cx)?
+        .as_slice(&cx)
+        .iter()
+        .map(|&v| v as usize)
+        .collect();
+
+    if jobs.len() != blob_lengths.len() {
+        return cx.throw_error("jobs and blob_lengths must have the same length");
+    }
+
+    if blobs.len() != blob_lengths.iter().sum::<usize>() {
+        return cx.throw_error("blob_lengths must sum to the length of blobs");
+    }
+
+    let mut blob_offset: usize = 0;
+    let blobs: Vec<Option<Vec<u8>>> = blob_lengths
+        .iter()
+        .map(|&len| {
+            if len == 0 {
+                return None;
+            }
+            let blob = blobs[blob_offset..blob_offset + len].to_vec();
+            blob_offset += len;
+            Some(blob)
+        })
+        .collect();
+
+    let jobs: Vec<JobInit> = jobs
+        .into_iter()
+        .zip(blobs)
+        .map(|(job, blob)| job.to_job_init(blob))
+        .collect();
+
+    let (deferred, promise) = cx.promise();
+    let channel = cx.channel();
+    let runtime = runtime(&mut cx)?;
+
+    let fut = async move {
+        let manager = match MANAGER.get() {
+            Some(manager) => manager,
+            None => {
+                deferred.settle_with(&channel, |mut cx| {
+                    throw_null_err(&mut cx, "manager not initialized")
+                });
+                return;
+            }
+        };
+
+        let res = manager.bulk_create_jobs(jobs).await;
+        deferred.settle_with(&channel, move |mut cx| {
+            res.or_else(|e| cx.throw_error(format!("{}", e)))?;
+            Ok(cx.null())
+        });
+    };
+
+    runtime.spawn(fut);
+
+    Ok(promise)
+}
+
 fn dequeue_jobs(mut cx: FunctionContext) -> JsResult<JsPromise> {
     let queue_name = cx.argument::<JsString>(0)?.value(&mut cx);
 
@@ -645,6 +708,22 @@ fn jobs_to_js_array<'a>(cx: &mut TaskContext<'a>, jobs: Vec<Job>) -> JsResult<'a
     Ok(js_array)
 }
 
+impl JsJob {
+    fn to_job_init(&self, blob: Option<Vec<u8>>) -> JobInit {
+        JobInit {
+            team_id: self.team_id,
+            queue_name: self.queue_name.clone(),
+            priority: self.priority,
+            scheduled: self.scheduled,
+            function_id: self.function_id,
+            vm_state: self.vm_state.as_ref().map(|s| s.as_bytes().to_vec()),
+            parameters: self.parameters.as_ref().map(|s| s.as_bytes().to_vec()),
+            metadata: self.metadata.as_ref().map(|s| s.as_bytes().to_vec()),
+            blob,
+        }
+    }
+}
+
 #[neon::main]
 fn main(mut cx: ModuleContext) -> NeonResult<()> {
     cx.export_function("hello", hello)?;
@@ -653,6 +732,7 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
     cx.export_function("maybeInitWorker", maybe_init_worker)?;
     cx.export_function("maybeInitManager", maybe_init_manager)?;
     cx.export_function("createJob", create_job)?;
+    cx.export_function("bulkCreateJobs", bulk_create_jobs)?;
     cx.export_function("dequeueJobs", dequeue_jobs)?;
    cx.export_function("dequeueJobsWithVmState", dequeue_with_vm_state)?;
     cx.export_function("releaseJob", release_job)?;

rust/cyclotron-node/src/manager.ts (42 changes: 41 additions & 1 deletion)
@@ -35,6 +35,46 @@ export class CyclotronManager {
         }
 
         const json = JSON.stringify(jobInitInternal)
-        return await cyclotron.createJob(json, job.blob ? job.blob.buffer : undefined)
+        return await cyclotron.createJob(json, job.blob ? job.blob : undefined)
+    }
+
+    async bulkCreateJobs(jobs: CyclotronJobInit[]): Promise<void> {
+        const jobInitsInternal = jobs.map((job) => {
+            job.priority ??= 1
+            job.scheduled ??= new Date()
+
+            return {
+                team_id: job.teamId,
+                function_id: job.functionId,
+                queue_name: job.queueName,
+                priority: job.priority,
+                scheduled: job.scheduled,
+                vm_state: job.vmState ? serializeObject('vmState', job.vmState) : null,
+                parameters: job.parameters ? serializeObject('parameters', job.parameters) : null,
+                metadata: job.metadata ? serializeObject('metadata', job.metadata) : null,
+            }
+        })
+        const json = JSON.stringify(jobInitsInternal)
+
+        const totalBytes = jobs.reduce((total, job) => total + (job.blob ? job.blob.byteLength : 0), 0)
+
+        // The cyclotron API expects a single buffer with all the blobs concatenated, and an array of lengths.
+        // 0 lengths indicate that the job has no blob.
+        const blobs = new Uint8Array(totalBytes)
+        const blobLengths = new Uint32Array(jobs.length)
+
+        let offset = 0
+        for (let i = 0; i < jobs.length; i++) {
+            const blob = jobs[i].blob
+            if (blob) {
+                blobLengths[i] = blob.byteLength
+                blobs.set(blob, offset)
+                offset += blob.byteLength
+            } else {
+                blobLengths[i] = 0
+            }
+        }
+
+        return await cyclotron.bulkCreateJobs(json, blobs, blobLengths)
+    }
 }
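
The comment inside `bulkCreateJobs` above is the whole contract between the TypeScript and Rust sides: one `Uint8Array` holding every blob concatenated back to back, plus a `Uint32Array` of per-job lengths, where a length of 0 means the job has no blob. A standalone round-trip sketch of that convention; the real unpacking happens in Rust (`bulk_create_jobs` in `cyclotron-node/src/lib.rs`), so this TypeScript `unpackBlobs` is purely illustrative:

```typescript
function packBlobs(blobs: (Uint8Array | undefined)[]): { data: Uint8Array; lengths: Uint32Array } {
    const total = blobs.reduce((sum, b) => sum + (b ? b.byteLength : 0), 0)
    const data = new Uint8Array(total)
    const lengths = new Uint32Array(blobs.length)
    let offset = 0
    blobs.forEach((b, i) => {
        lengths[i] = b ? b.byteLength : 0 // 0 marks "this job has no blob"
        if (b) {
            data.set(b, offset)
            offset += b.byteLength
        }
    })
    return { data, lengths }
}

function unpackBlobs(data: Uint8Array, lengths: Uint32Array): (Uint8Array | undefined)[] {
    // Invariants checked on the Rust side before splitting: one length per job,
    // and the lengths sum to data.byteLength.
    let offset = 0
    return Array.from(lengths, (len) => {
        if (len === 0) {
            return undefined
        }
        const blob = data.subarray(offset, offset + len)
        offset += len
        return blob
    })
}
```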
