Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Troy/image processor ext #205

Merged
merged 21 commits into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,376 changes: 934 additions & 2,442 deletions Cargo.lock

Large diffs are not rendered by default.

43 changes: 22 additions & 21 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
[workspace]

members = [
"platform/api",
"platform/image_processor",
"video/edge",
"video/ingest",
"video/transcoder",
"video/lib/*",
"video/api",
"video/player",
"video/player_types",
"video/common",
"video/cli",
"binary-helper",
"utils",
"proto",
"config",
"config/derive",
# "platform/api",
"image-processor",
"image-processor/proto",
# "video/edge",
# "video/ingest",
# "video/transcoder",
# "video/lib/*",
# "video/api",
# "video/player",
# "video/player_types",
# "video/common",
# "video/cli",
# "binary-helper",
# "utils",
# "proto",
# "config",
# "config/derive",
"ffmpeg",
"foundations",
"foundations/macros",
Expand Down Expand Up @@ -52,7 +53,7 @@ h265 = { path = "video/lib/h265" }
mp4 = { path = "video/lib/mp4" }
rtmp = { path = "video/lib/rtmp" }
transmuxer = { path = "video/lib/transmuxer" }
utils = { path = "utils", default-features = false, package = "scuffle-utils" }
scuffle-utils = { path = "utils", default-features = false }
config = { path = "config", package = "scuffle-config" }
pb = { path = "proto" }
video-common = { path = "video/common" }
Expand All @@ -62,12 +63,12 @@ video-edge = { path = "video/edge" }
video-ingest = { path = "video/ingest" }
video-transcoder = { path = "video/transcoder" }
binary-helper = { path = "binary-helper" }
ffmpeg = { path = "ffmpeg" }
scuffle-ffmpeg = { path = "ffmpeg" }

# These patches are pending PRs to the upstream crates
# TODO: Remove these once the PRs are merged
[patch.crates-io]
# https://github.com/remkop22/postgres-from-row/pull/9
postgres-from-row = { git = "https://github.com/ScuffleTV/postgres-from-row.git", branch = "troy/from_fn" }
# https://github.com/madonoharu/tsify/pull/32
tsify = { git = "https://github.com/ScuffleTV/tsify.git", branch = "sisou/comments" }
# postgres-from-row = { git = "https://github.com/ScuffleTV/postgres-from-row.git", branch = "troy/from_fn" }
# # https://github.com/madonoharu/tsify/pull/32
# tsify = { git = "https://github.com/ScuffleTV/tsify.git", branch = "sisou/comments" }
11 changes: 6 additions & 5 deletions binary-helper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ tracing = "0.1"
thiserror = "1.0"
tokio = { version = "1.36", features = ["full"] }
serde = { version = "1.0.1", features = ["derive"] }
async-nats = "0.33"
async-nats = "0.34"
ulid = "1.1"
async-trait = "0.1"
tonic = { version = "0.11", features = ["tls"] }
anyhow = "1.0"
tower-layer = "0.3"
async-stream = "0.3"
futures-util = "0.3"
rustls = "0.22"
rustls = "0.23"
rustls-pemfile = "2.0"
fred = { version = "8.0.0", features = ["enable-rustls", "sentinel-client", "dns"] }
tokio-postgres-rustls = "0.11"
tokio-postgres-rustls = "0.12"
tracing-subscriber = { features = ["env-filter", "fmt", "json"], version = "0.3" }
once_cell = "1.19"
aws-config = { version = "1.1" }
Expand All @@ -31,13 +31,14 @@ http-body = { version = "1.0.0"}
hyper = "1"
bytes = "1.0"
pin-project = "1"
tokio-rustls = "0.25"

tokio-postgres = { version = "0.7" }
postgres-types = { version = "0.2", features = ["with-serde_json-1", "with-chrono-0_4", "derive"] }
deadpool-postgres = { version = "0.12" }
deadpool-postgres = { version = "0.13" }
postgres-from-row = { version = "0.5" }
prost = { version = "0.12" }

config = { workspace = true }
utils = { workspace = true, features = ["all"] }
scuffle-utils = { workspace = true, features = ["all"] }
pb = { workspace = true }
38 changes: 20 additions & 18 deletions binary-helper/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use fred::interfaces::ClientLike;
use fred::types::ServerConfig;
use hyper::StatusCode;
use rustls::RootCertStore;
use utils::database::deadpool_postgres::{ManagerConfig, PoolConfig, RecyclingMethod, Runtime};
use utils::database::tokio_postgres::NoTls;
use utils::database::Pool;
use utils::http::RouteError;
use scuffle_utils::database::deadpool_postgres::{ManagerConfig, PoolConfig, RecyclingMethod, Runtime};
use scuffle_utils::database::tokio_postgres::NoTls;
use scuffle_utils::database::Pool;
use scuffle_utils::http::RouteError;

use crate::config::{DatabaseConfig, NatsConfig, RedisConfig};

Expand Down Expand Up @@ -40,7 +40,7 @@ macro_rules! impl_global_traits {

impl binary_helper::global::GlobalDb for $struct {
#[inline(always)]
fn db(&self) -> &Arc<utils::database::Pool> {
fn db(&self) -> &Arc<scuffle_utils::database::Pool> {
&self.db
}
}
Expand All @@ -50,7 +50,7 @@ macro_rules! impl_global_traits {
}

pub trait GlobalCtx {
fn ctx(&self) -> &utils::context::Context;
fn ctx(&self) -> &scuffle_utils::context::Context;
}

pub trait GlobalConfig {
Expand Down Expand Up @@ -124,16 +124,16 @@ pub async fn setup_nats(
Ok((nats, jetstream))
}

pub async fn setup_database(config: &DatabaseConfig) -> anyhow::Result<Arc<utils::database::Pool>> {
pub async fn setup_database(config: &DatabaseConfig) -> anyhow::Result<Arc<scuffle_utils::database::Pool>> {
let mut pg_config = config
.uri
.parse::<utils::database::tokio_postgres::Config>()
.parse::<scuffle_utils::database::tokio_postgres::Config>()
.context("invalid database uri")?;

pg_config.ssl_mode(if config.tls.is_some() {
utils::database::tokio_postgres::config::SslMode::Require
scuffle_utils::database::tokio_postgres::config::SslMode::Require
} else {
utils::database::tokio_postgres::config::SslMode::Disable
scuffle_utils::database::tokio_postgres::config::SslMode::Disable
});

let manager = if let Some(tls) = &config.tls {
Expand Down Expand Up @@ -164,15 +164,15 @@ pub async fn setup_database(config: &DatabaseConfig) -> anyhow::Result<Arc<utils
.with_client_auth_cert(certs, key)
.context("failed to create redis tls config")?;

utils::database::deadpool_postgres::Manager::from_config(
scuffle_utils::database::deadpool_postgres::Manager::from_config(
pg_config,
tokio_postgres_rustls::MakeRustlsConnect::new(tls),
ManagerConfig {
recycling_method: RecyclingMethod::Fast,
},
)
} else {
utils::database::deadpool_postgres::Manager::from_config(
scuffle_utils::database::deadpool_postgres::Manager::from_config(
pg_config,
NoTls,
ManagerConfig {
Expand Down Expand Up @@ -230,7 +230,7 @@ pub async fn setup_redis(config: &RedisConfig) -> anyhow::Result<Arc<fred::clien

let certs = rustls_pemfile::certs(&mut io::BufReader::new(io::Cursor::new(cert))).collect::<Result<Vec<_>, _>>()?;

let mut cert_store = RootCertStore::empty();
let mut cert_store = tokio_rustls::rustls::RootCertStore::empty();
if let Some(ca_cert) = &tls.ca_cert {
let ca_cert = tokio::fs::read(ca_cert).await.context("failed to read redis ca cert")?;
let ca_certs =
Expand All @@ -240,11 +240,13 @@ pub async fn setup_redis(config: &RedisConfig) -> anyhow::Result<Arc<fred::clien
}
}

Some(fred::types::TlsConfig::from(fred::types::TlsConnector::from(
rustls::ClientConfig::builder()
.with_root_certificates(cert_store)
.with_client_auth_cert(certs, key)
.context("failed to create redis tls config")?,
Some(fred::types::TlsConfig::from(fred::types::TlsConnector::Rustls(
tokio_rustls::TlsConnector::from(Arc::new(
tokio_rustls::rustls::ClientConfig::builder()
.with_root_certificates(cert_store)
.with_client_auth_cert(certs, key)
.context("failed to create redis tls config")?,
)),
)))
} else {
None
Expand Down
4 changes: 2 additions & 2 deletions binary-helper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::Context as _;
use scuffle_utils::context::Context;
use scuffle_utils::signal;
use tokio::signal::unix::SignalKind;
use tokio::{select, time};
use tonic::transport::{Certificate, Identity, Server, ServerTlsConfig};
pub use traits::{Config, Global};
use utils::context::Context;
use utils::signal;

use self::config::GrpcConfig;

Expand Down
2 changes: 1 addition & 1 deletion binary-helper/src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use utils::context::Context;
use scuffle_utils::context::Context;

pub trait Config {
fn parse() -> anyhow::Result<Self>
Expand Down
37 changes: 11 additions & 26 deletions dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ version: "3.1"
name: "db-scuffle-dev"

services:
cockroach:
image: ghcr.io/scuffletv/ci/cockroach:latest
mongo:
image: mongo:latest
pull_policy: "always"
command: start-single-node --insecure --advertise-addr=0.0.0.0
volumes:
- cockroach:/cockroach/cockroach-data
ports:
- "127.0.0.1:5432:26257"
- "127.0.0.1:8080:8080"
- "27111:27017"
volumes:
- mongo:/data/db

nats:
image: ghcr.io/scuffletv/ci/nats:latest
Expand All @@ -33,8 +31,8 @@ services:
- "127.0.0.1:9000:9000"
- "127.0.0.1:9001:9001"
environment:
- "MINIO_ACCESS_KEY=root"
- "MINIO_SECRET_KEY=scuffle123"
- "MINIO_ACCESS_KEY=minioadmin"
- "MINIO_SECRET_KEY=minioadmin"
volumes:
- minio:/data
command: server /data --console-address ":9001"
Expand All @@ -47,26 +45,13 @@ services:
entrypoint: >
/bin/sh -c "
set -eux;
/usr/bin/mc config host add myminio http://minio:9000 root scuffle123;
/usr/bin/mc rb --force myminio/scuffle-video || true;
/usr/bin/mc rb --force myminio/scuffle-image-processor || true;
/usr/bin/mc rb --force myminio/scuffle-image-processor-public || true;
/usr/bin/mc mb myminio/scuffle-video;
/usr/bin/mc mb myminio/scuffle-image-processor;
/usr/bin/mc mb myminio/scuffle-image-processor-public;
/usr/bin/mc anonymous set download myminio/scuffle-video;
/usr/bin/mc anonymous set download myminio/scuffle-image-processor-public;
/usr/bin/mc config host add myminio http://minio:9000 minioadmin minioadmin;
/usr/bin/mc mb myminio/image-processor;
/usr/bin/mc anonymous set download myminio/image-processor;
exit 0;
"

redis:
image: ghcr.io/scuffletv/ci/redis:latest
pull_policy: "always"
ports:
- "127.0.0.1:6379:6379"

volumes:
cockroach:
nats:
minio:
redis:
mongo:
6 changes: 2 additions & 4 deletions ffmpeg/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
[package]
name = "ffmpeg"
name = "scuffle-ffmpeg"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"

[dependencies]
ffmpeg-sys-next = "6.1"
ffmpeg-sys-next = "7"
libc = "0.2"
bytes = { optional = true, version = "1" }
tokio = { optional = true, version = "1" }
crossbeam-channel = { optional = true, version = "0.5" }
tracing = { optional = true, version = "0.1" }
utils = { workspace = true, optional = true }

[features]
default = []
task-abort = ["dep:utils"]
channel = ["dep:bytes"]
tokio-channel = ["channel", "dep:tokio"]
crossbeam-channel = ["channel", "dep:crossbeam-channel"]
Expand Down
9 changes: 0 additions & 9 deletions ffmpeg/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,6 @@ impl GenericDecoder {
}

pub fn send_packet(&mut self, packet: &Packet) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _guard = utils::task::AbortGuard::new();

// Safety: `packet` is a valid pointer, and `self.decoder` is a valid pointer.
let ret = unsafe { avcodec_send_packet(self.decoder.as_mut_ptr(), packet.as_ptr()) };

Expand All @@ -165,9 +162,6 @@ impl GenericDecoder {
}

pub fn send_eof(&mut self) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _guard = utils::task::AbortGuard::new();

// Safety: `self.decoder` is a valid pointer.
let ret = unsafe { avcodec_send_packet(self.decoder.as_mut_ptr(), std::ptr::null()) };

Expand All @@ -178,9 +172,6 @@ impl GenericDecoder {
}

pub fn receive_frame(&mut self) -> Result<Option<VideoFrame>, FfmpegError> {
#[cfg(feature = "task-abort")]
let _guard = utils::task::AbortGuard::new();

let mut frame = Frame::new()?;

// Safety: `frame` is a valid pointer, and `self.decoder` is a valid pointer.
Expand Down
15 changes: 0 additions & 15 deletions ffmpeg/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,6 @@ impl Encoder {
outgoing_time_base: AVRational,
settings: impl Into<EncoderSettings>,
) -> Result<Self, FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();

if codec.as_ptr().is_null() {
return Err(FfmpegError::NoEncoder);
}
Expand Down Expand Up @@ -489,9 +486,6 @@ impl Encoder {
}

pub fn send_eof(&mut self) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();

// Safety: `self.encoder` is a valid pointer.
let ret = unsafe { avcodec_send_frame(self.encoder.as_mut_ptr(), std::ptr::null()) };
if ret == 0 {
Expand All @@ -502,9 +496,6 @@ impl Encoder {
}

pub fn send_frame(&mut self, frame: &Frame) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();

// Safety: `self.encoder` and `frame` are valid pointers.
let ret = unsafe { avcodec_send_frame(self.encoder.as_mut_ptr(), frame.as_ptr()) };
if ret == 0 {
Expand All @@ -515,9 +506,6 @@ impl Encoder {
}

pub fn receive_packet(&mut self) -> Result<Option<Packet>, FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();

let mut packet = Packet::new()?;

const AVERROR_EAGAIN: i32 = AVERROR(EAGAIN);
Expand Down Expand Up @@ -631,9 +619,6 @@ impl<T: Send + Sync> MuxerEncoder<T> {
}

pub fn send_eof(&mut self) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();

self.encoder.send_eof()?;
self.handle_packets()?;

Expand Down
Loading
Loading