Skip to content

Commit

Permalink
refactor: use oneshot to sync
Browse files Browse the repository at this point in the history
  • Loading branch information
kawaemon authored and nanai10a committed Jun 19, 2024
1 parent 01beb0d commit 9b7c673
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 140 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
chrono-tz = "0.8"
clap = { version = "4", features = ["derive"] }
crossbeam = "0.8"
derivative = "2"
dotenv = "0.15"
hex = "0.4"
Expand Down
4 changes: 2 additions & 2 deletions src/bot/genkai_point/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use {
chrono::{DateTime, Duration, Utc},
clap::ValueEnum,
once_cell::sync::Lazy,
std::{cmp::Ordering, collections::HashMap, fmt::Write},
std::{cmp::Ordering, collections::HashMap, fmt::Write, future::Future},
tokio::sync::Mutex,
};

Expand Down Expand Up @@ -160,7 +160,7 @@ pub(crate) trait GenkaiPointDatabase: Send + Sync {
}

pub(crate) trait Plotter: Send + Sync + 'static {
fn plot(&self, data: Vec<(String, Vec<f64>)>) -> Result<Vec<u8>>;
fn plot(&self, data: Vec<(String, Vec<f64>)>) -> impl Future<Output = Result<Vec<u8>>> + Send;
}

#[derive(Debug)]
Expand Down
193 changes: 66 additions & 127 deletions src/bot/genkai_point/plot/charming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,25 @@ use {
component::Axis, element::name_location::NameLocation, series::Line, Chart, ImageFormat,
ImageRenderer,
},
std::{
sync::{Arc, Condvar, Mutex},
thread::{spawn, JoinHandle},
time::Instant,
},
crossbeam::channel::{Receiver, Sender},
std::thread,
tokio::sync::oneshot,
};

pub(crate) struct Charming {
renderer_handle: RendererHandle,
renderer: Renderer,
}

impl Charming {
pub(crate) fn new() -> Self {
let renderer_handle = spawn_renderer();
let renderer = Renderer::spawn();

Self { renderer_handle }
Self { renderer }
}
}

impl Plotter for Charming {
fn plot(&self, data: Vec<(String, Vec<f64>)>) -> Result<Vec<u8>> {
async fn plot(&self, data: Vec<(String, Vec<f64>)>) -> Result<Vec<u8>> {
let chart = data
.iter()
.fold(Chart::new(), |chart, (label, data)| {
Expand All @@ -43,145 +41,86 @@ impl Plotter for Charming {
.name("累計VC時間(時)"),
);

self.renderer_handle.call(chart)
self.renderer.render(chart).await
}
}

struct RendererHandle {
port: Arc<Mutex<Job>>,
cond: Arc<Condvar>,

_handle: JoinHandle<()>,
struct Request {
data: Chart,
bell: oneshot::Sender<Response>,
}

impl RendererHandle {
fn call(&self, chart: Chart) -> Result<Vec<u8>> {
let mut guard = self
.port
.lock()
.map_err(|_| anyhow!("failed to lock job queue"))?;

let chart = Unconsidered::wrap(chart);
let session = Session::new();

let Job::Idle = std::mem::replace(&mut *guard, Job::Queued(chart, session)) else {
unreachable!()
};

self.cond.notify_all();

let mut guard = self
.cond
.wait_while(
guard,
|job| !matches!(job, Job::Finished(_, s) if session == *s),
)
.map_err(|_| anyhow!("failed to lock job queue"))?;

let Job::Finished(result, _) = std::mem::replace(&mut *guard, Job::Idle) else {
dbg!(&*guard);
unreachable!()
};

result.unwrap()
}
struct Response {
image: Result<Vec<u8>>,
}

fn spawn_renderer() -> RendererHandle {
let port = Arc::new(Mutex::new(Job::Idle));
let cond = Arc::new(Condvar::new());

let _handle = {
let port = port.clone();
let cond = cond.clone();

spawn(|| renderer_main(port, cond))
};

RendererHandle {
port,
cond,
_handle,
}
struct Renderer {
tx: Sender<Request>,
_thread_handle: thread::JoinHandle<()>,
}

fn renderer_main(port: Arc<Mutex<Job>>, cond: Arc<Condvar>) {
let mut renderer = ImageRenderer::new(1280, 720);

while let Ok(Ok(mut job)) = port
.lock()
.map(|guard| cond.wait_while(guard, |job| !matches!(job, Job::Queued(..))))
{
let Job::Queued(arg0, session) = std::mem::replace(&mut *job, Job::Running) else {
unreachable!()
};
impl Renderer {
fn render_thread(rx: Receiver<Request>) {
let mut renderer = ImageRenderer::new(1280, 720);

let arg0 = arg0.unwrap();
let ret = renderer
.render_format(ImageFormat::Png, &arg0)
.map_err(|_| anyhow!("no detail provided"));
for req in rx {
let image = renderer
.render_format(ImageFormat::Png, &req.data)
.map_err(|e| anyhow!("charming error: {e:#?}"));

let ret = Unconsidered::wrap(ret);
let Job::Running = std::mem::replace(&mut *job, Job::Finished(ret, session)) else {
unreachable!()
};

cond.notify_all();
req.bell.send(Response { image }).ok();
}
}
}

#[derive(Debug, PartialEq, Eq)]
enum Job {
Idle,
Queued(Unconsidered<Chart>, Session),
Running,
Finished(Unconsidered<Result<Vec<u8>>>, Session),
}
fn spawn() -> Self {
let (tx, rx) = crossbeam::channel::unbounded::<Request>();

struct Unconsidered<T>(T);
let handle = std::thread::spawn(|| Self::render_thread(rx));

impl<T> Unconsidered<T> {
fn wrap(val: T) -> Self {
Self(val)
Self {
tx,
_thread_handle: handle,
}
}

fn unwrap(self) -> T {
self.0
}
}
async fn render(&self, data: Chart) -> Result<Vec<u8>> {
let (tx, rx) = oneshot::channel();

impl<T> std::fmt::Debug for Unconsidered<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{unconsidered}}")
}
}
self.tx.send(Request { data, bell: tx }).unwrap();

impl<T> PartialEq for Unconsidered<T> {
fn eq(&self, _: &Self) -> bool {
true
rx.await.unwrap().image
}
}

impl<T> Eq for Unconsidered<T> {}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Session(Instant);

impl Session {
fn new() -> Self {
Self(Instant::now())
#[tokio::test]
async fn test() {
let charming = std::sync::Arc::new(Charming::new());

let mut handles = vec![];

#[allow(unused_variables)]
for i in 0..10 {
let charming = charming.clone();

handles.push(tokio::spawn(async move {
let result = charming
.plot(vec![
("kawaemon".into(), vec![1.0, 4.0, 6.0, 7.0]),
("kawak".into(), vec![2.0, 5.0, 11.0, 14.0]),
])
.await
.unwrap();

// should we assert_eq with actual png?
assert_ne!(result.len(), 0);

// uncomment this to see image artifacts
// tokio::fs::write(format!("./out{i}.png"), result)
// .await
// .unwrap();
}));
}
}

#[test]
fn test() {
let result = Charming::new()
.plot(vec![
("kawaemon".into(), vec![1.0, 4.0, 6.0, 7.0]),
("kawak".into(), vec![2.0, 5.0, 11.0, 14.0]),
])
.unwrap();

// should we assert_eq with actual png?
assert_ne!(result.len(), 0);
for h in handles {
h.await.unwrap();
}
}
16 changes: 9 additions & 7 deletions src/bot/genkai_point/plot/matplotlib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl Matplotlib {
}

impl Plotter for Matplotlib {
fn plot(&self, data: Vec<(String, Vec<f64>)>) -> Result<Vec<u8>> {
async fn plot(&self, data: Vec<(String, Vec<f64>)>) -> Result<Vec<u8>> {
let result: Result<PythonContext, _> = std::panic::catch_unwind(|| {
python! {
import io
Expand All @@ -47,12 +47,14 @@ impl Plotter for Matplotlib {
}
}

#[test]
fn test_plot_to_image() {
let result = Matplotlib.plot(vec![
("kawaemon".into(), vec![1.0, 4.0, 6.0, 7.0]),
("kawak".into(), vec![2.0, 5.0, 11.0, 14.0]),
]);
#[tokio::test]
async fn test_plot_to_image() {
let result = Matplotlib {}
.plot(vec![
("kawaemon".into(), vec![1.0, 4.0, 6.0, 7.0]),
("kawak".into(), vec![2.0, 5.0, 11.0, 14.0]),
])
.await;

// should we assert_eq with actual png?
assert_ne!(result.unwrap().len(), 0);
Expand Down
1 change: 1 addition & 0 deletions src/bot/genkai_point/plot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub(super) async fn plot<P: Plotter + Send>(
// use tokio::task::spawn_blocking to solve this problem.
let image = plotter
.plot(prottable_data)
.await
.context("failed to plot graph")?;

Ok(Some(image))
Expand Down
7 changes: 4 additions & 3 deletions src/bot/genkai_point/plot/plotters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl Plotters {
}

impl Plotter for Plotters {
fn plot(&self, data: Vec<(String, Vec<f64>)>) -> Result<Vec<u8>> {
async fn plot(&self, data: Vec<(String, Vec<f64>)>) -> Result<Vec<u8>> {
const SIZE: (usize, usize) = (1280, 720);

let mut buffer = vec![0; SIZE.0 * SIZE.1 * 3];
Expand Down Expand Up @@ -108,13 +108,14 @@ impl Plotter for Plotters {
}
}

#[test]
fn test() {
#[tokio::test]
async fn test() {
let result = Plotters::new()
.plot(vec![
("kawaemon".into(), vec![1.0, 4.0, 6.0, 7.0]),
("kawak".into(), vec![2.0, 5.0, 11.0, 14.0]),
])
.await
.unwrap();

// should we assert_eq with actual png?
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn async_main() -> Result<()> {
#[cfg(feature = "plot_plotters")]
let plotter = plot::plotters::Plotters::new();
#[cfg(feature = "plot_matplotlib")]
let plotter = plot::plotters::Matplotlib::new();
let plotter = plot::matplotlib::Matplotlib::new();
#[cfg(feature = "plot_charming")]
let plotter = plot::charming::Charming::new();

Expand Down

0 comments on commit 9b7c673

Please sign in to comment.