From 4f95efec12fe52f2f775ed5c1f4de8533d616a24 Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Sat, 13 Jan 2024 00:29:32 +0100 Subject: [PATCH] fix: publish to nats instead of jetstream --- platform/image_processor/src/processor/error.rs | 2 +- platform/image_processor/src/processor/job/mod.rs | 4 ++-- video/api/src/api/events/ack.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/platform/image_processor/src/processor/error.rs b/platform/image_processor/src/processor/error.rs index 6107aa68..1184e291 100644 --- a/platform/image_processor/src/processor/error.rs +++ b/platform/image_processor/src/processor/error.rs @@ -33,7 +33,7 @@ pub enum ProcessorError { S3Upload(s3::error::S3Error), #[error("publish to nats: {0}")] - NatsPublish(#[from] async_nats::jetstream::context::PublishError), + NatsPublish(#[from] async_nats::PublishError), #[error("image: {0}")] FileFormat(std::io::Error), diff --git a/platform/image_processor/src/processor/job/mod.rs b/platform/image_processor/src/processor/job/mod.rs index d271b2a9..c04d70ea 100644 --- a/platform/image_processor/src/processor/job/mod.rs +++ b/platform/image_processor/src/processor/job/mod.rs @@ -66,7 +66,7 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> { tracing::warn!(err = %e, "job failed"); tracing::debug!("publishing job failure event to {}", self.job.task.callback_subject); self.global - .jetstream() + .nats() .publish( self.job.task.callback_subject.clone(), pb::scuffle::platform::internal::events::ProcessedImage { @@ -167,7 +167,7 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> { // job completion tracing::debug!("publishing job completion event to {}", self.job.task.callback_subject); self.global - .jetstream() + .nats() .publish( self.job.task.callback_subject.clone(), pb::scuffle::platform::internal::events::ProcessedImage { diff --git a/video/api/src/api/events/ack.rs b/video/api/src/api/events/ack.rs index e880f813..c74833e1 100644 --- a/video/api/src/api/events/ack.rs +++ b/video/api/src/api/events/ack.rs @@ -62,7 +62,7 @@ impl ApiRequest for tonic::Request { None => return Err(tonic::Status::invalid_argument("missing action")), }; - global.jetstream().publish(reply, ack_kind.into()).await.map_err(|err| { + global.nats().publish(reply, ack_kind.into()).await.map_err(|err| { tracing::error!(err = %err, "failed to publish ack"); tonic::Status::internal("failed to publish ack") })?;