Skip to content

Commit

Permalink
Add support for UDFs in visualize output
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Nov 2, 2024
1 parent 347c2f7 commit 6461c6e
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 12 deletions.
4 changes: 2 additions & 2 deletions crates/arroyo-worker/src/arrow/async_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl ArrowOperator for AsyncUdfOperator {
"input_exprs",
self.input_exprs
.iter()
.map(|e| format!("{}", e))
.map(|e| format!("{:?}", e))
.collect::<Vec<_>>()
.join(", ")
.into(),
Expand All @@ -134,7 +134,7 @@ impl ArrowOperator for AsyncUdfOperator {
"final_exprs",
self.final_exprs
.iter()
.map(|e| format!("{}", e))
.map(|e| format!("{:?}", e))
.collect::<Vec<_>>()
.join(", ")
.into(),
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ impl WorkerGrpc for WorkerServer {
let logical = LogicalProgram::try_from(req.program.expect("Program is None"))
.expect("Failed to create LogicalProgram");

if let Ok(v) = to_d2(&logical) {
if let Ok(v) = to_d2(&logical).await {
debug!("Starting execution for graph\n{}", v);
}

Expand Down
17 changes: 10 additions & 7 deletions crates/arroyo-worker/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::engine::construct_operator;
use anyhow::bail;
use arrow_schema::Schema;
use arroyo_datastream::logical::LogicalProgram;
use arroyo_df::physical::new_registry;
Expand All @@ -14,15 +13,19 @@ fn format_arrow_schema_fields(schema: &Schema) -> Vec<(String, String)> {
.collect()
}

pub fn to_d2(logical: &LogicalProgram) -> anyhow::Result<String> {
let registry = Arc::new(new_registry());
pub async fn to_d2(logical: &LogicalProgram) -> anyhow::Result<String> {
let mut registry = new_registry();

if !logical.program_config.udf_dylibs.is_empty()
|| !logical.program_config.python_udfs.is_empty()
{
bail!("UDFs are not yet supported in the pipeline visualizer");
for (name, udf) in &logical.program_config.udf_dylibs {
registry.load_dylib(name, udf).await?;
}

for udf in logical.program_config.python_udfs.values() {
registry.add_python_udf(udf).await?;
}

let registry = Arc::new(registry);

let mut d2 = String::new();

for idx in logical.graph.node_indices() {
Expand Down
6 changes: 4 additions & 2 deletions crates/arroyo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,9 +482,11 @@ async fn visualize(query: Input, open: bool) {
.await
.expect("Failed while planning query");

let d2 = utils::to_d2(&compiled.program).await.unwrap();

if open {
let tmp = temp_dir().join("plan.d2");
tokio::fs::write(&tmp, utils::to_d2(&compiled.program).unwrap())
tokio::fs::write(&tmp, d2)
.await
.expect("Failed to write plan");
let output = tmp.with_extension("svg");
Expand All @@ -506,6 +508,6 @@ async fn visualize(query: Input, open: bool) {

let _ = open::that(format!("file://{}", output.to_string_lossy()));
} else {
println!("{}", utils::to_d2(&compiled.program).unwrap());
println!("{}", d2);
}
}

0 comments on commit 6461c6e

Please sign in to comment.