From ce0c9a9d14f3823ddda2406bd636846fcbe27ec0 Mon Sep 17 00:00:00 2001 From: Karthikeyan Chinnakonda Date: Thu, 28 Nov 2024 14:00:43 +0530 Subject: [PATCH] Remove introspection at the startup of the connector (#5) * fix databricks * code refactors * Fix conflicts * fix the warnings * no introspection while the connector runs * added conditional logic to use models from config where available in delivering schema * clean up: removed debug lines and some commented out code * fix more warnings * use --release in the calcite dockerfile * fix the initializing logic of the otel stuff * fix the github workflow * revert databricks change * remove debug statements * fix warnings * do not intialize otel during introspection * fix warnings and README * update README * fix trace attribute * fix the rust-anaylzer.toml --------- Co-authored-by: Jonathan Weiss --- .dir-locals.el | 1 + .github/workflows/docker-image.yml | 12 +-- Cargo.lock | 54 ++---------- Cargo.toml | 4 +- README.md | 2 +- build-local.sh | 56 +++++++++--- calcite-rs-jni/calcite | 2 +- calcite-rs-jni/jni/pom.xml | 9 -- .../main/java/com/hasura/CalciteQuery.java | 2 +- crates/calcite-schema/Cargo.toml | 1 + crates/calcite-schema/src/jvm.rs | 56 ++++++------ crates/calcite-schema/src/lib.rs | 1 - crates/calcite-schema/src/metrics.rs | 70 --------------- crates/calcite-schema/src/models.rs | 2 +- crates/calcite-schema/src/schema.rs | 58 +++---------- crates/calcite-schema/src/test.rs | 86 ++++++++++--------- crates/calcite-schema/src/version5.rs | 60 ++++++------- crates/cli/Cargo.toml | 1 + crates/cli/src/lib.rs | 16 ++-- crates/cli/src/main.rs | 2 +- crates/cli/tests/initialize_tests.rs | 20 ++++- crates/connectors/ndc-calcite/src/calcite.rs | 18 ++-- .../ndc-calcite/src/connector/calcite.rs | 43 +++------- crates/connectors/ndc-calcite/src/query.rs | 39 ++++----- crates/connectors/ndc-calcite/src/sql.rs | 4 +- ndc-calcite.md | 6 +- run-connector-local.sh | 2 +- run-connector-update.sh | 5 ++ rust-analyzer.toml | 16 ++++ 29 files changed, 266 insertions(+), 382 deletions(-) create mode 100644 .dir-locals.el delete mode 100644 crates/calcite-schema/src/metrics.rs create mode 100755 run-connector-update.sh create mode 100644 rust-analyzer.toml diff --git a/.dir-locals.el b/.dir-locals.el new file mode 100644 index 0000000..da511fe --- /dev/null +++ b/.dir-locals.el @@ -0,0 +1 @@ +((nil . ((lsp-file-watch-ignored-directories . ["target" "calcite-rs-jni" "grafana" "scripts" "shared" "[/\\\\]\\.git$" "[/\\\\]\\.hg$" "[/\\\\]\\.svn$"])))) diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml index bfcff42..ac2271d 100644 --- a/.github/workflows/docker-image.yml +++ b/.github/workflows/docker-image.yml @@ -2,25 +2,21 @@ name: Docker Image CI on: push: - branches: [ "main" ] - pull_request: - branches: [ "main" ] + tags: + - 'v*' # This will match any tag that starts with 'v' jobs: - build: - runs-on: ubuntu-latest - steps: + steps: - uses: actions/checkout@v3 - uses: docker/setup-qemu-action@v3 - uses: docker/setup-buildx-action@v3 - uses: docker/build-push-action@v5 with: context: . - tags: ghcr.io/hasura/ndc-calcite:latest + tags: ghcr.io/hasura/ndc-calcite:${{ github.ref_name }} # Use the tag name for the Docker image platforms: linux/amd64,linux/arm64 cache-from: type=gha cache-to: type=gha,mode=max - diff --git a/Cargo.lock b/Cargo.lock index 656ca06..944fc9b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -200,8 +200,6 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "hyper 1.4.1", - "hyper-util", "itoa", "matchit", "memchr", @@ -210,15 +208,10 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", - "serde_json", - "serde_path_to_error", - "serde_urlencoded", "sync_wrapper 1.0.1", - "tokio", "tower", "tower-layer", "tower-service", - "tracing", ] [[package]] @@ -304,15 +297,15 @@ dependencies = [ [[package]] name = "axum-test-helper" -version = "0.4.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf248c9cf5176a9cdd6c597ab21cbb0408e464b9865bf7202c7a31e8fb18b2a3" +checksum = "298f62fa902c2515c169ab0bfb56c593229f33faa01131215d58e3d4898e3aa9" dependencies = [ - "axum 0.7.5", + "axum 0.6.20", "bytes", - "http 1.1.0", + "http 0.2.12", "http-body 0.4.6", - "hyper 1.4.1", + "hyper 0.14.30", "reqwest 0.11.27", "serde", "tokio", @@ -1028,20 +1021,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-rustls" -version = "0.24.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" -dependencies = [ - "futures-util", - "http 0.2.12", - "hyper 0.14.30", - "rustls 0.21.12", - "tokio", - "tokio-rustls 0.24.1", -] - [[package]] name = "hyper-timeout" version = "0.4.1" @@ -1455,7 +1434,7 @@ version = "0.1.4" dependencies = [ "anyhow", "async-trait", - "axum 0.7.5", + "axum 0.6.20", "axum-extra 0.9.3", "axum-test-helper", "bytes", @@ -1499,6 +1478,7 @@ dependencies = [ "anyhow", "build-data", "clap", + "dotenv", "env_logger", "include_dir", "insta", @@ -1522,6 +1502,7 @@ version = "0.1.4" dependencies = [ "anyhow", "clap", + "dotenv", "http 0.2.12", "jni", "jsonschema", @@ -2308,7 +2289,6 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.30", - "hyper-rustls", "hyper-tls 0.5.0", "ipnet", "js-sys", @@ -2328,7 +2308,6 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", - "tokio-rustls 0.24.1", "tokio-util", "tower-service", "url", @@ -2336,7 +2315,6 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots", "winreg", ] @@ -3041,16 +3019,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-rustls" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" -dependencies = [ - "rustls 0.21.12", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.25.0" @@ -3583,12 +3551,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki-roots" -version = "0.25.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" - [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 1e9342c..c45e2e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,8 +43,8 @@ ndc-test = { git = "https://github.com/hasura/ndc-spec.git", tag = "v0.1.6" } anyhow = "1" async-trait = "0.1" -axum = "0.7.5" -axum-test-helper = "0.4.0" +axum = "0.6" +axum-test-helper = "0.3.0" build-data = "0.2" bytes = "1" clap = "4" diff --git a/README.md b/README.md index 4fa1ac9..726e382 100644 --- a/README.md +++ b/README.md @@ -125,4 +125,4 @@ Different components are released under different licenses: For support options: - File issues on the respective GitHub repositories - Contact Hasura support at support@hasura.io for commercial support -- Join the community discussions on calcite.apache.org/community/ \ No newline at end of file +- Join the community discussions on calcite.apache.org/community/ diff --git a/build-local.sh b/build-local.sh index 3b0b44a..8cfde4d 100755 --- a/build-local.sh +++ b/build-local.sh @@ -1,14 +1,42 @@ -# prevents local build artifacts being added to image -cd calcite-rs-jni -mvn clean -cd calcite -./gradlew clean - - create a tag name from the last connector release -cd ../.. -docker build . -t ghcr.io/hasura/ndc-calcite:latest - -cd calcite-rs-jni/calcite -./gradlew assemble -cd .. -mvn install -DskipTests +#!/bin/bash + +# Check if version argument is provided +if [ $# -ne 1 ]; then + echo "Error: Version argument is required" + echo "Usage: $0 " + echo "Example: $0 1.0.0" + exit 1 +fi + +VERSION=$1 + +# Store the script's starting directory +INITIAL_DIR=$(pwd) + +# Function to handle errors +handle_error() { + echo "Error: $1" + cd "$INITIAL_DIR" # Return to initial directory + exit 1 +} + +# Clean Maven artifacts +cd calcite-rs-jni || handle_error "Failed to change directory to calcite-rs-jni" +mvn clean || handle_error "Maven clean failed" + +# Clean Gradle artifacts +cd calcite || handle_error "Failed to change directory to calcite" +./gradlew clean || handle_error "Gradle clean failed" + +# Build Docker image +cd ../.. || handle_error "Failed to return to root directory" +docker build . -t "ghcr.io/hasura/ndc-calcite:${VERSION}" || handle_error "Docker build failed" + +# Build and install Java artifacts +cd calcite-rs-jni/calcite || handle_error "Failed to change directory to calcite-rs-jni/calcite" +./gradlew assemble || handle_error "Gradle assemble failed" +cd .. || handle_error "Failed to change directory" +mvn install -DskipTests || handle_error "Maven install failed" + +echo "Build completed successfully!" +echo "Docker image tagged as: ghcr.io/hasura/ndc-calcite:${VERSION}" diff --git a/calcite-rs-jni/calcite b/calcite-rs-jni/calcite index d451385..ca4decf 160000 --- a/calcite-rs-jni/calcite +++ b/calcite-rs-jni/calcite @@ -1 +1 @@ -Subproject commit d4513853cb8352ef2d82fdbece4f3fadbff7434e +Subproject commit ca4decfd4aa9270c9a8ccdd0c3d90ac468999c09 diff --git a/calcite-rs-jni/jni/pom.xml b/calcite-rs-jni/jni/pom.xml index 88e5a74..a2bf5a4 100644 --- a/calcite-rs-jni/jni/pom.xml +++ b/calcite-rs-jni/jni/pom.xml @@ -194,13 +194,6 @@ - - sybase - jconnect - 1 - system - ${project.basedir}/jars/jconn4.jar - com.hasura bigquery-jdbc @@ -271,7 +264,6 @@ com.databricks databricks-jdbc 2.6.40-patch-1 - runtime org.apache.hive @@ -361,4 +353,3 @@ - diff --git a/calcite-rs-jni/jni/src/main/java/com/hasura/CalciteQuery.java b/calcite-rs-jni/jni/src/main/java/com/hasura/CalciteQuery.java index b290c6f..00905bc 100644 --- a/calcite-rs-jni/jni/src/main/java/com/hasura/CalciteQuery.java +++ b/calcite-rs-jni/jni/src/main/java/com/hasura/CalciteQuery.java @@ -459,7 +459,7 @@ public String queryModels(String query, String parentTraceId, String parentSpanI return result; } else { - span.setAttribute("Using JSON_OBJECT() method", false); + span.setAttribute("Not Using JSON_OBJECT() method", false); // Java's inbuilt DateTimeFormatter doesn't have any predefined format for RFC 3339 DateTimeFormatter rfcFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ENGLISH).withZone(ZoneId.of("UTC")); DateTimeFormatter rfcDateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ENGLISH).withZone(ZoneId.of("UTC")); diff --git a/crates/calcite-schema/Cargo.toml b/crates/calcite-schema/Cargo.toml index 5ffc023..3faecd3 100644 --- a/crates/calcite-schema/Cargo.toml +++ b/crates/calcite-schema/Cargo.toml @@ -25,6 +25,7 @@ thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } once_cell = { workspace = true} +dotenv = { workspace = true } http = { workspace = true } [dev-dependencies] diff --git a/crates/calcite-schema/src/jvm.rs b/crates/calcite-schema/src/jvm.rs index f27e898..e9d3a85 100644 --- a/crates/calcite-schema/src/jvm.rs +++ b/crates/calcite-schema/src/jvm.rs @@ -35,16 +35,17 @@ static CONFIG: OnceCell> = OnceCell::new(); /// ``` // ANCHOR: get_jvm #[tracing::instrument(skip(), level = Level::INFO)] -pub fn get_jvm() -> &'static Mutex { +pub fn get_jvm(should_initialize_otel: bool) -> &'static Mutex { { let jvm = JVM.get().expect("JVM is not set up."); let binding = jvm.lock().unwrap(); let mut env = binding.attach_current_thread().unwrap(); let _ = env.call_static_method("com/hasura/CalciteQuery", "noOpMethod", "()V", &[]); if let Err(_) = env.exception_occurred() { + dotenv::dotenv().ok(); env.exception_describe().expect("TODO: panic message"); env.exception_clear().expect("TODO: panic message"); - init_jvm(&CONFIG.get().as_ref().unwrap().lock().unwrap()); + init_jvm(&CONFIG.get().as_ref().unwrap().lock().unwrap(), should_initialize_otel); return JVM.get().expect("JVM problem."); } } @@ -71,13 +72,14 @@ pub fn get_jvm() -> &'static Mutex { /// ``` // ANCHOR: init_jvm #[tracing::instrument(skip(calcite_configuration), level = Level::INFO)] -pub fn init_jvm(calcite_configuration: &ParsedConfiguration) { +pub fn init_jvm(calcite_configuration: &ParsedConfiguration, should_initialize_otel: bool) { let configuration = match calcite_configuration { ParsedConfiguration::Version5(c) => c }; let state_inited = env::var("STATE_INITED").unwrap_or("false".to_string()); if state_inited == "false" { - let folder_path = env::var("JAR_DEPENDENCY_FOLDER").unwrap_or("/calcite-rs-jni/jni/target/dependency".into()); + let jar_dependency_folder = env::var("JAR_DEPENDENCY_FOLDER"); + let folder_path = jar_dependency_folder.unwrap_or("/calcite-rs-jni/jni/target/dependency".into()); let mut jar_paths = get_jar_files(&folder_path); let jar_name = env::var("CALCITE_JAR").unwrap_or("/calcite-rs-jni/jni/target/calcite-rs-jni-1.0-SNAPSHOT.jar".into()); @@ -94,78 +96,82 @@ pub fn init_jvm(calcite_configuration: &ParsedConfiguration) { } let log4j2_debug = env::var("LOG4J2_DEBUG").unwrap_or("false".to_string()); - let otel_exporter_otlp_traces_endpoint = env::var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT").unwrap_or_default(); - let otel_exporter_otlp_metrics_endpoint = env::var("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT").unwrap_or_default(); - let otel_metric_export_interval = env::var("OTEL_METRIC_EXPORT_INTERVAL").unwrap_or_default(); - let otel_exporter_otlp_endpoint = env::var("OTEL_EXPORTER_OTLP_ENDPOINT").unwrap_or("http://local.hasura.dev:4317".to_string()); - let otel_service_name = env::var("OTEL_SERVICE_NAME").unwrap_or("".to_string()); - let otel_logs_exporter = env::var("OTEL_LOGS_EXPORTER").unwrap_or("".to_string()); - let otel_traces_exporter = env::var("OTEL_TRACES_EXPORTER").unwrap_or("".to_string()); - let otel_metrics_exporter = env::var("OTEL_METRICS_EXPORTER").unwrap_or("".to_string()); - let otel_log_level = env::var("OTEL_LOG_LEVEL").unwrap_or("".to_string()); + let otel_exporter_otlp_traces_endpoint = env::var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"); + let otel_exporter_otlp_metrics_endpoint = env::var("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"); + let otel_metric_export_interval = env::var("OTEL_METRIC_EXPORT_INTERVAL"); + let otel_exporter_otlp_endpoint = env::var("OTEL_EXPORTER_OTLP_ENDPOINT"); + let otel_service_name = env::var("OTEL_SERVICE_NAME"); + let otel_logs_exporter = env::var("OTEL_LOGS_EXPORTER"); + let otel_traces_exporter = env::var("OTEL_TRACES_EXPORTER"); + let otel_metrics_exporter = env::var("OTEL_METRICS_EXPORTER"); + let otel_log_level = env::var("OTEL_LOG_LEVEL"); let log_level = env::var("LOG_LEVEL").unwrap_or("".to_string()); let log4j_configuration_file = env::var("LOG4J_CONFIGURATION_FILE").unwrap_or("classpath:log4j2-config.xml".to_string()); let expanded_paths: String = jar_paths.join(":"); let mut jvm_args = InitArgsBuilder::new() .version(JNIVersion::V8) .option(format!("-Dlog4j2.debug={}", log4j2_debug)) - .option("--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED") +// .option("--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED") .option("-Dotel.java.global-autoconfigure.enabled=true") .option(format!("-Dlog4j.configurationFile={}", log4j_configuration_file)); - if !otel_exporter_otlp_traces_endpoint.is_empty() { + + if should_initialize_otel { + if let Ok(otel_exporter_otlp_traces_endpoint) = otel_exporter_otlp_traces_endpoint { jvm_args = jvm_args.option( format!("-Dotel.exporter.otlp.traces.endpoint={}", otel_exporter_otlp_traces_endpoint) ); event!(Level::DEBUG, "Added {} to JVM", format!("-Dotel.exporter.otlp.traces.endpoint={}", otel_exporter_otlp_traces_endpoint)); } - if !otel_exporter_otlp_metrics_endpoint.is_empty() { + if let Ok(otel_exporter_otlp_metrics_endpoint) = otel_exporter_otlp_metrics_endpoint { jvm_args = jvm_args.option( format!("-Dotel.exporter.otlp.metrics.endpoint={}", otel_exporter_otlp_metrics_endpoint) ); event!(Level::DEBUG, "Added {} to JVM", format!("-Dotel.exporter.otlp.metrics.endpoint={}", otel_exporter_otlp_metrics_endpoint)); } - if !otel_exporter_otlp_endpoint.is_empty() { + if let Ok(endpoint) = otel_exporter_otlp_endpoint { jvm_args = jvm_args.option( - format!("-Dotel.exporter.otlp.endpoint={}", otel_exporter_otlp_endpoint) + format!("-Dotel.exporter.otlp.endpoint={}", endpoint) ); - event!(Level::DEBUG, "Added {} to JVM", format!("-Dotel.exporter.otlp.endpoint={}", otel_exporter_otlp_endpoint)); + event!(Level::DEBUG, "Added {} to JVM", format!("-Dotel.exporter.otlp.endpoint={}", endpoint)); } - if !otel_service_name.is_empty() { + if let Ok(otel_service_name) = otel_service_name { jvm_args = jvm_args.option( format!("-Dotel.service.name={}", otel_service_name) ); event!(Level::DEBUG, "Added {} to JVM", format!("-Dotel.service.name={}", otel_service_name)); } - if !otel_logs_exporter.is_empty() { + if let Ok(otel_logs_exporter) = otel_logs_exporter { jvm_args = jvm_args.option( format!("-Dotel.logs.exporter={}", otel_logs_exporter) ); event!(Level::DEBUG, "Added {} to JVM", format!("-Dotel.logs.exporter={}", otel_logs_exporter)); } - if !otel_traces_exporter.is_empty() { + if let Ok(otel_traces_exporter) = otel_traces_exporter { jvm_args = jvm_args.option( format!("-Dotel.traces.exporter={}", otel_traces_exporter) ); event!(Level::DEBUG, "Added {} to JVM", format!("-Dotel.traces.exporter={}", otel_traces_exporter)); } - if !otel_metrics_exporter.is_empty() { + if let Ok(otel_metrics_exporter) = otel_metrics_exporter { jvm_args = jvm_args.option( format!("-Dotel.metrics.exporter={}", otel_metrics_exporter) ); event!(Level::DEBUG, "Added {} to JVM", format!("-Dotel.metrics.exporter={}", otel_metrics_exporter)); } - if !otel_metric_export_interval.is_empty() { + if let Ok(otel_metric_export_interval) = otel_metric_export_interval { jvm_args = jvm_args.option( format!("-Dotel.metric.export.interval={}", otel_metric_export_interval) ); event!(Level::DEBUG, "Added {} to JVM", format!("-Dotel.metric.export.interval={}", otel_metric_export_interval)); } - if !otel_log_level.is_empty() { + if let Ok(otel_log_level) = otel_log_level { jvm_args = jvm_args.option( format!("-Dotel.log.level={}", otel_log_level) ); event!(Level::DEBUG, "Added {} to JVM", format!("-Dotel.log.level={}", otel_log_level)); } + + } if !log_level.is_empty() { jvm_args = jvm_args.option( format!("-DLOG_LEVEL={}", log_level) diff --git a/crates/calcite-schema/src/lib.rs b/crates/calcite-schema/src/lib.rs index 382834e..5d71904 100644 --- a/crates/calcite-schema/src/lib.rs +++ b/crates/calcite-schema/src/lib.rs @@ -14,6 +14,5 @@ pub mod jvm; pub mod configuration; pub mod version; -mod metrics; mod test; mod list_files; diff --git a/crates/calcite-schema/src/metrics.rs b/crates/calcite-schema/src/metrics.rs deleted file mode 100644 index 72353f7..0000000 --- a/crates/calcite-schema/src/metrics.rs +++ /dev/null @@ -1,70 +0,0 @@ -//! Metrics setup and update for our connector. - -use prometheus::{IntGauge, Registry}; - -use crate::version::VersionTag; - -/// The collection of configuration-related metrics exposed through the `/metrics` endpoint. -#[derive(Debug, Clone)] -pub struct Metrics { - configuration_version_3: IntGauge, - configuration_version_4: IntGauge, - configuration_version_5: IntGauge, -} - -impl Metrics { - /// Set up counters and gauges used to produce Prometheus metrics - pub fn initialize(metrics_registry: &mut Registry) -> Result { - let configuration_version_3 = add_int_gauge_metric( - metrics_registry, - "ndc_postgres_configuration_version_3", - "Get whether configuration version 3 is used", - )?; - - let configuration_version_4 = add_int_gauge_metric( - metrics_registry, - "ndc_postgres_configuration_version_4", - "Get whether configuration version 4 is used", - )?; - - let configuration_version_5 = add_int_gauge_metric( - metrics_registry, - "ndc_postgres_configuration_version_5", - "Get whether configuration version 5 is used", - )?; - - Ok(Self { - configuration_version_3, - configuration_version_4, - configuration_version_5, - }) - } - - /// Set the configuration version used by this connector instance. - pub fn set_configuration_version(&self, version: VersionTag) { - match version { - VersionTag::Version3 => self.configuration_version_3.set(1), - VersionTag::Version4 => self.configuration_version_4.set(1), - VersionTag::Version5 => self.configuration_version_5.set(1), - } - } -} - -/// Create a new int gauge metric and register it with the provided Prometheus Registry -fn add_int_gauge_metric( - metrics_registry: &mut Registry, - metric_name: &str, - metric_description: &str, -) -> Result { - let int_gauge = IntGauge::with_opts(prometheus::Opts::new(metric_name, metric_description))?; - register_collector(metrics_registry, int_gauge) -} - -/// Register a new collector with the registry, and returns it for later use. -fn register_collector( - metrics_registry: &mut Registry, - collector: Collector, -) -> Result { - metrics_registry.register(Box::new(collector.clone()))?; - Ok(collector) -} diff --git a/crates/calcite-schema/src/models.rs b/crates/calcite-schema/src/models.rs index 570272a..d081e3f 100644 --- a/crates/calcite-schema/src/models.rs +++ b/crates/calcite-schema/src/models.rs @@ -20,7 +20,7 @@ use crate::jvm::get_jvm; #[tracing::instrument(skip(calcite_ref), level=Level::INFO)] pub fn get_models(calcite_ref: &GlobalRef) -> HashMap { let map = { - let jvm = get_jvm().lock().unwrap(); + let jvm = get_jvm(false).lock().unwrap(); let mut env = jvm.attach_current_thread_as_daemon().unwrap(); let calcite_query = env.new_local_ref(calcite_ref).unwrap(); let args: &[JValueGen<&JObject<'_>>] = &[]; diff --git a/crates/calcite-schema/src/schema.rs b/crates/calcite-schema/src/schema.rs index e68583a..37616ae 100644 --- a/crates/calcite-schema/src/schema.rs +++ b/crates/calcite-schema/src/schema.rs @@ -1,37 +1,21 @@ //! # Get Schema //! -//! Introspects Calcite metadata and generates a new schema. Updates -//! the config file with the new schema. -//! +//! Introspects Calcite metadata and generates the NDC schema. -use std::fs::File; -use std::io::Write; -use std::path::Path; -use jni::objects::GlobalRef; use ndc_models as models; use ndc_models::SchemaResponse; -use ndc_sdk::connector::{ErrorResponse, Result}; -use tracing::{debug, event, Level}; -use ndc_calcite_values::is_running_in_container::is_running_in_container; -use ndc_calcite_values::values::{CONFIGURATION_FILENAME, DEV_CONFIG_FILE_NAME, DOCKER_CONNECTOR_RW}; +use ndc_sdk::connector::Result; +use tracing::Level; use crate::{collections, scalars}; -use crate::models::get_models; use crate::version5::ParsedConfiguration; -/// Get the schema information from the given `calcite_ref`. +/// Get the schema information from the metadata present in the configuration. /// -/// This function retrieves the data models using `calcite::get_models` function and the scalar types using `scalars::scalars` function. -/// It then calls `collections::collections` function with the data models and scalar types to get the object types and collections. -/// If any error occurs during the retrieval of object types and collections, the function returns the error immediately. /// /// The `procedures` and `functions` are empty vectors. /// /// Finally, the schema information is populated into a `SchemaResponse` struct and returned. /// -/// # Arguments -/// -/// * `calcite_ref` - A `GlobalRef` representing the Calcite reference. -/// /// # Returns /// /// Returns a `Result` containing the `SchemaResponse` on success, or a boxed `dyn Error` on failure. @@ -48,10 +32,8 @@ use crate::version5::ParsedConfiguration; /// /// use ndc_calcite_schema::schema::get_schema; /// -/// let calcite_ref = GlobalRef::new(); -/// /// // Get the schema -/// let schema = get_schema(configuration, calcite_ref)?; +/// let schema = get_schema(configuration)?; /// /// // Print the schema /// println!("Schema: {:?}", schema); @@ -60,9 +42,10 @@ use crate::version5::ParsedConfiguration; /// } /// ``` // ANCHOR: get_schema -#[tracing::instrument(skip(configuration, calcite_ref), level=Level::INFO)] -pub fn get_schema(configuration: &ParsedConfiguration, calcite_ref: GlobalRef) -> Result { - let data_models = get_models(&calcite_ref); +#[tracing::instrument(skip(configuration), level=Level::INFO)] +pub fn get_schema(configuration: &ParsedConfiguration) -> Result { + let data_models = configuration.metadata.clone().unwrap_or_default(); + let scalar_types = scalars::scalars(); let (object_types, collections) = match collections::collections(&data_models, &scalar_types) { Ok(value) => value, @@ -77,28 +60,7 @@ pub fn get_schema(configuration: &ParsedConfiguration, calcite_ref: GlobalRef) - functions, procedures, }; - let file_path = if is_running_in_container() { - Path::new(DOCKER_CONNECTOR_RW).join(CONFIGURATION_FILENAME) - } else { - Path::new(".").join(DEV_CONFIG_FILE_NAME) - }; - event!(Level::INFO, config_path = format!("Configuration file path: {}", file_path.display())); - let mut new_configuration = configuration.clone(); - new_configuration.metadata = Some(data_models.clone()); - let file_path_clone = file_path.clone(); - let file = File::create(file_path); - match file { - Ok(mut file) => { - let serialized_json = serde_json::to_string_pretty(&new_configuration).map_err(ErrorResponse::from_error)?; - file.write_all(serialized_json.as_bytes()).map_err(ErrorResponse::from_error)?; - event!(Level::INFO, "Wrote metadata to config: {}", serde_json::to_string(&schema).unwrap()); - } - Err(_) => { - debug!("Unable to create config file: {:?}", file_path_clone); - event!(Level::DEBUG, "Unable to create config file {:?}, schema: {:?}", file_path_clone, serde_json::to_string(&schema).unwrap()); - // Not updating the config file is not fatal - } - } Ok(schema) + } // ANCHOR_END: get_schema diff --git a/crates/calcite-schema/src/test.rs b/crates/calcite-schema/src/test.rs index 6d05cbb..80ac351 100644 --- a/crates/calcite-schema/src/test.rs +++ b/crates/calcite-schema/src/test.rs @@ -1,48 +1,52 @@ #[cfg(test)] +// TODO: To re-enable this test, we need to generate configuration +// values on the fly and then validate them against the schema. pub mod common { - use std::fmt::Write; - use std::path::{Path, PathBuf}; + // use std::fmt::Write; + // use std::path::{Path, PathBuf}; - /// Find the project root via the crate root provided by `cargo test`, - /// and get our single static configuration file. - /// This depends on the convention that all our crates live in `/crates/` - /// and will break in the unlikely case that we change this - pub fn get_path_from_project_root(ndc_metadata_path: impl AsRef) -> PathBuf { - let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - d.push("../../"); - d.push(ndc_metadata_path); + // /** + // Find the project root via the crate root provided by `cargo test`, + // and get our single static configuration file. + // This depends on the convention that all our crates live in `/crates/` + // and will break in the unlikely case that we change this + // */ + // fn get_path_from_project_root(ndc_metadata_path: impl AsRef) -> PathBuf { + // let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + // d.push("../../"); + // d.push(ndc_metadata_path); - d - } - /// Checks that a given value conforms to the schema generated by `schemars`. - /// - /// Panics with a human-readable error if the value does not conform, or if the - /// schema could not be compiled. - pub fn check_value_conforms_to_schema(value: &serde_json::Value) { - let schema_json = serde_json::to_value(schemars::schema_for!(T)) - .expect("the schema could not be converted to JSON"); - let schema = jsonschema::JSONSchema::options() - .with_draft(jsonschema::Draft::Draft7) - .compile(&schema_json) - .expect("the schema could not be compiled"); + // d + // } + // /// Checks that a given value conforms to the schema generated by `schemars`. + // /// + // /// Panics with a human-readable error if the value does not conform, or if the + // /// schema could not be compiled. + // fn check_value_conforms_to_schema(value: &serde_json::Value) { + // let schema_json = serde_json::to_value(schemars::schema_for!(T)) + // .expect("the schema could not be converted to JSON"); + // let schema = jsonschema::JSONSchema::options() + // .with_draft(jsonschema::Draft::Draft7) + // .compile(&schema_json) + // .expect("the schema could not be compiled"); - let result = schema.validate(value); + // let result = schema.validate(value); - match result { - Ok(()) => (), - Err(errors) => { - panic!( - "The configuration does not conform to the schema.\n{}", - errors.fold(String::new(), |mut str, error| { - let _ = write!( - str, - "{}\ninstance path: {}\nschema path: {}\n\n", - error, error.instance_path, error.schema_path - ); - str - }) - ) - } - } - } + // match result { + // Ok(()) => (), + // Err(errors) => { + // panic!( + // "The configuration does not conform to the schema.\n{}", + // errors.fold(String::new(), |mut str, error| { + // let _ = write!( + // str, + // "{}\ninstance path: {}\nschema path: {}\n\n", + // error, error.instance_path, error.schema_path + // ); + // str + // }) + // ) + // } + // } + // } } diff --git a/crates/calcite-schema/src/version5.rs b/crates/calcite-schema/src/version5.rs index ef19e45..0900945 100644 --- a/crates/calcite-schema/src/version5.rs +++ b/crates/calcite-schema/src/version5.rs @@ -1,9 +1,8 @@ //! Internal Configuration and state for our connector. -use std::collections::{HashMap}; +use std::collections::HashMap; use std::{error, fmt}; use std::path::Path; -use jni::errors::{Error, JniError}; use jni::JNIEnv; use jni::objects::{GlobalRef, JObject, JValueGen, JValueOwned}; use jni::objects::JValueGen::Object; @@ -21,7 +20,7 @@ use crate::calcite::{Model, TableMetadata}; use crate::configuration::has_configuration; use crate::environment::Environment; use crate::error::{ParseConfigurationError, WriteParsedConfigurationError}; -use crate::jvm::{get_jvm}; +use crate::jvm::{get_jvm, init_jvm}; use crate::models::get_models; #[derive(Debug)] @@ -59,10 +58,17 @@ impl CalciteRefSingleton { pub fn initialize(&self, args: &crate::configuration::ParsedConfiguration) -> Result<(), &'static str> { match args { crate::configuration::ParsedConfiguration::Version5(config) => { - let java_vm = get_jvm().lock().unwrap(); - let env = java_vm.attach_current_thread_as_daemon().map_err(| _ | "Could not attach thread to JVM") ?; - let calcite_ref = create_query_engine(&config, env).map_err(|_ | "Could not create Calcite Query Engine") ?; - self.calcite_ref.set(calcite_ref).map_err(| _ | "Calcite Query Engine already initialized") + dotenv::dotenv().ok(); + let calcite; + let calcite_ref; + init_jvm(args, false); + let java_vm = get_jvm(false).lock().unwrap(); + let mut env = java_vm.attach_current_thread_as_daemon().unwrap(); + calcite = create_query_engine(&config, &mut env); + let new_env = java_vm.attach_current_thread_as_daemon().unwrap(); + calcite_ref = new_env.new_global_ref(calcite).unwrap(); + self.calcite_ref.set(calcite_ref).map_err(| e | format!("Calcite Query Engine already initialized - {e:#?}")).unwrap(); + Ok(()) } } } @@ -110,6 +116,17 @@ pub enum Version { This, } +#[tracing::instrument(skip(configuration, env), level = Level::INFO)] +pub fn create_query_engine<'a>(configuration: &'a ParsedConfiguration, env: &'a mut JNIEnv<'a>) -> JObject<'a> { + + let class = env.find_class("com/hasura/CalciteQuery").unwrap(); + let instance = env.new_object(class, "()V", &[]).unwrap(); + let _ = create_jvm_connection(configuration, &instance, env).expect("Failed to create JVM connection"); + event!(Level::INFO, "Instantiated Calcite Query Engine"); + instance +} + + impl ParsedConfiguration { pub fn empty() -> Self { debug!("Configuration is empty."); @@ -155,39 +172,13 @@ pub fn create_jvm_connection<'a, 'b>( } } -#[tracing::instrument(skip(configuration, env), level=Level::INFO)] -pub fn create_query_engine<'a>( - configuration: &'a ParsedConfiguration, - mut env: JNIEnv<'a> -) -> Result { - let class_result = env.find_class("com/hasura/CalciteQuery"); - let class = match class_result { - Ok(class) => class, - Err(e) => return Err(e), - }; - let instance_result = env.new_object(class, "()V", &[]); - let instance = match instance_result { - Ok(instance) => instance, - Err(e) => return Err(e), - }; - - match create_jvm_connection(configuration, &instance, &mut env) { - Ok(_) => { - event!(Level::INFO, "Instantiated Calcite Query Engine"); - Ok(env.new_global_ref(instance)?) - }, - Err(_) => { - Err(Error::JniCall(JniError::Unknown)) - } - } -} - #[tracing::instrument(skip(_environment,calcite_ref_singleton))] pub async fn introspect( args: &ParsedConfiguration, _environment: impl Environment, calcite_ref_singleton: &CalciteRefSingleton ) -> anyhow::Result { + if let Err(e) = calcite_ref_singleton.initialize(&crate::configuration::ParsedConfiguration::Version5(args.clone())) { println!("Error initializing CalciteRef: {}", e); } @@ -235,7 +226,6 @@ pub async fn parse_configuration( Ok(parsed_config) } - /// Write the parsed configuration into a directory on disk. #[tracing::instrument(skip(out_dir))] pub async fn write_parsed_configuration( diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 6ebb52c..9c295bf 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -24,6 +24,7 @@ tracing = "0.1.40" log = "0.4.22" env_logger = "0.11.5" regex = { workspace = true } +dotenv = { workspace = true } [build-dependencies] build-data = { workspace = true } diff --git a/crates/cli/src/lib.rs b/crates/cli/src/lib.rs index 94b1a74..a47ea3f 100644 --- a/crates/cli/src/lib.rs +++ b/crates/cli/src/lib.rs @@ -302,23 +302,17 @@ async fn update( ) .await?; - // Introspect the database - let docker_config_path = &PathBuf::from(DOCKER_CONNECTOR_RW); - let config_path = if is_running_in_container() { - docker_config_path - } else { - &context.context_path - }; - if !has_configuration(config_path) { - initialize(true, &context).await? - } + // if !has_configuration(config_path) { + // initialize(true, &context).await? + // } // It is possible to change the file in the middle of introspection. // We want to detect this scenario and retry, or fail if we are unable to. // We do that with a few attempts. for _attempt in 1..=UPDATE_ATTEMPTS { let existing_configuration = parse_configuration(config_path).await?; - init_jvm(&existing_configuration); + dotenv::dotenv().ok(); + init_jvm(&existing_configuration, false); let output = introspect( existing_configuration.clone(), diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index a2062d5..0cbe351 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -3,7 +3,7 @@ //! This is intended to be automatically downloaded and invoked via the Hasura CLI, as a plugin. //! It is unlikely that end-users will use it directly. -use std::{env}; +use std::env; use std::path::PathBuf; use std::process::ExitCode; use clap::Parser; diff --git a/crates/cli/tests/initialize_tests.rs b/crates/cli/tests/initialize_tests.rs index 045d50b..ff56668 100644 --- a/crates/cli/tests/initialize_tests.rs +++ b/crates/cli/tests/initialize_tests.rs @@ -1,10 +1,11 @@ mod common; +use configuration::version5::{parse_configuration, CalciteRefSingleton}; use tokio::fs; use ndc_calcite_cli::*; -use ndc_calcite_configuration as configuration; -use ndc_calcite_configuration::ParsedConfiguration; +use ndc_calcite_schema as configuration; +use ndc_calcite_schema::version5::ParsedConfiguration; #[tokio::test] async fn test_initialize_directory() -> anyhow::Result<()> { @@ -15,11 +16,15 @@ async fn test_initialize_directory() -> anyhow::Result<()> { environment: configuration::environment::EmptyEnvironment, release_version: None, }; + + let calcite_singleton = CalciteRefSingleton::new(); + run( Command::Initialize { with_metadata: false, }, context, + calcite_singleton, ) .await?; @@ -31,7 +36,7 @@ async fn test_initialize_directory() -> anyhow::Result<()> { assert!(configuration_file_path.exists()); let contents = fs::read_to_string(configuration_file_path).await?; common::assert_ends_with_newline(&contents); - let _: ParsedConfiguration = configuration::parse_configuration(&dir).await?; + let _: ParsedConfiguration = parse_configuration(&dir).await?; let metadata_file_path = dir .path() @@ -51,11 +56,13 @@ async fn test_initialize_version_is_unchanged() -> anyhow::Result<()> { environment: configuration::environment::EmptyEnvironment, release_version: None, }; + let calcite_singleton = CalciteRefSingleton::new(); run( Command::Initialize { with_metadata: false, }, context, + calcite_singleton, ) .await?; @@ -88,11 +95,13 @@ async fn test_do_not_initialize_when_files_already_exist() -> anyhow::Result<()> environment: configuration::environment::EmptyEnvironment, release_version: None, }; + let calcite_singleton = CalciteRefSingleton::new(); match run( Command::Initialize { with_metadata: false, }, context, + calcite_singleton ) .await { @@ -115,11 +124,13 @@ async fn test_initialize_directory_with_metadata() -> anyhow::Result<()> { environment: configuration::environment::EmptyEnvironment, release_version: None, }; + let calcite_singleton = CalciteRefSingleton::new(); run( Command::Initialize { with_metadata: true, }, context, + calcite_singleton, ) .await?; @@ -152,11 +163,14 @@ async fn test_initialize_directory_with_metadata_and_release_version() -> anyhow environment: configuration::environment::EmptyEnvironment, release_version: Some("v1.2.3"), }; + let calcite_singleton = CalciteRefSingleton::new(); + run( Command::Initialize { with_metadata: true, }, context, + calcite_singleton, ) .await?; diff --git a/crates/connectors/ndc-calcite/src/calcite.rs b/crates/connectors/ndc-calcite/src/calcite.rs index 66fed08..508b051 100644 --- a/crates/connectors/ndc-calcite/src/calcite.rs +++ b/crates/connectors/ndc-calcite/src/calcite.rs @@ -11,8 +11,8 @@ use jni::objects::{GlobalRef, JObject, JString, JValueGen}; use jni::objects::JValueGen::Object; use ndc_models as models; use ndc_models::{FieldName, RowFieldValue}; -use ndc_sdk::connector::{ErrorResponse}; -use opentelemetry::trace::{TraceContextExt}; +use ndc_sdk::connector::ErrorResponse; +use opentelemetry::trace::TraceContextExt; use serde_json::Value; use tracing::{event, Level}; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -60,7 +60,7 @@ pub type Row = IndexMap; pub fn create_query_engine<'a>(configuration: &'a ParsedConfiguration, env: &'a mut JNIEnv<'a>) -> Result> { let class = env.find_class("com/hasura/CalciteQuery").map_err(ErrorResponse::from_error)?; let instance = env.new_object(class, "()V", &[]).map_err(ErrorResponse::from_error)?; - let _ = create_jvm_connection(configuration, &instance, env); + let _ = create_jvm_connection(configuration, &instance, env).expect("Failed to create JVM connection"); event!(Level::INFO, "Instantiated Calcite Query Engine"); Ok(instance) } @@ -136,7 +136,7 @@ pub fn connector_query( let span_id = otel_context.span().span_context().span_id(); let trace_id = otel_context.span().span_context().trace_id(); - let jvm = get_jvm().lock().unwrap(); + let jvm = get_jvm(true).lock().unwrap(); let mut java_env = jvm.attach_current_thread().map_err(ErrorResponse::from_error)?; let calcite_query = java_env.new_local_ref(calcite_reference).map_err(ErrorResponse::from_error)?; @@ -202,11 +202,11 @@ fn fix_rows(rows: Vec, query_metadata: &models::Query) -> Vec { } } for (_key, value) in &mut row { - if let RowFieldValue(val) = value { - if val == "null" { - *value = RowFieldValue(Value::Null); - } + let RowFieldValue(val) = value; + if val == "null" { + *value = RowFieldValue(Value::Null); } + } row.swap_remove("CONSTANT"); row @@ -225,4 +225,4 @@ impl fmt::Display for CalciteError { } } -impl std::error::Error for CalciteError {} \ No newline at end of file +impl std::error::Error for CalciteError {} diff --git a/crates/connectors/ndc-calcite/src/connector/calcite.rs b/crates/connectors/ndc-calcite/src/connector/calcite.rs index 85d6961..5a7e652 100644 --- a/crates/connectors/ndc-calcite/src/connector/calcite.rs +++ b/crates/connectors/ndc-calcite/src/connector/calcite.rs @@ -23,7 +23,6 @@ use tracing::Instrument; use crate::capabilities::calcite_capabilities; use ndc_calcite_schema::jvm::{get_jvm, init_jvm}; use ndc_calcite_schema::calcite::Model; -use ndc_calcite_schema::models::get_models; use ndc_calcite_schema::schema::get_schema as retrieve_schema; use ndc_calcite_schema::version5::ParsedConfiguration; use ndc_calcite_values::is_running_in_container::is_running_in_container; @@ -98,7 +97,6 @@ impl ConnectorSetup for Calcite { .map_err(|err| ErrorResponse::from_error(err))?; update_model(&mut json_object)?; - update_metadata(&mut json_object); Ok(json_object) } @@ -108,31 +106,23 @@ impl ConnectorSetup for Calcite { .model_file_path .clone() .or_else(|| env::var("MODEL_FILE").ok()) - .ok_or(ErrorResponse::new(StatusCode::from_u16(500).unwrap(), CONFIG_ERROR_MSG.to_string(), serde_json::Value::String("".to_string())))?; + .ok_or(ErrorResponse::new(StatusCode::from_u16(500).unwrap(), CONFIG_ERROR_MSG.to_string(), serde_json::Value::String(String::new())))?; let models = fs::read_to_string(model_file_path.clone()).unwrap(); if has_yaml_extension(&model_file_path.clone()) { let model_object: Model = serde_yaml::from_str(&models) - .map_err(|err| ErrorResponse::from_error(err))?; + .map_err(ErrorResponse::from_error)?; json_object.model = Some(model_object); } else { let model_object: Model = serde_json::from_str(&models) - .map_err(|err| ErrorResponse::from_error(err))?; + .map_err(ErrorResponse::from_error)?; json_object.model = Some(model_object); } Ok(()) } - fn update_metadata(json_object: &mut ParsedConfiguration) { - if json_object.metadata.is_none() { - let state = init_state(&json_object).expect("TODO: panic message"); - json_object.metadata = Some(get_models(&state.calcite_ref)); - println!("metadata: {:?}", serde_json::to_string_pretty(&json_object.metadata)); - } - } - configure_path(span, &configuration_dir); match fs::read_to_string(get_config_file_path(configuration_dir)) { Ok(file_content) => parse_json::(file_content), @@ -181,18 +171,8 @@ impl Connector for Calcite { } .instrument(info_span!("tracing Calcite")) .await; - dotenv::dotenv().ok(); - let calcite; - let calcite_ref; - { - let java_vm = get_jvm().lock().unwrap(); - let mut env = java_vm.attach_current_thread_as_daemon().unwrap(); - calcite = calcite::create_query_engine(configuration, &mut env).or(Err(ErrorResponse::from_error(crate::calcite::CalciteError { message: String::from("Failed to lock JVM") })))?; - let env = java_vm.attach_current_thread_as_daemon().unwrap(); - calcite_ref = env.new_global_ref(calcite).unwrap(); - } - let schema = retrieve_schema(configuration, calcite_ref.clone()); + let schema = retrieve_schema(configuration); match schema { Ok(schema) => Ok(JsonResponse::from(schema)), Err(_) => Err(ErrorResponse::new(StatusCode::from_u16(500).unwrap(),"Problem getting schema.".to_string(),Value::String("".to_string()))), @@ -323,9 +303,11 @@ fn convert_to_relationship_argument(p0: &models::Argument) -> models::Relationsh fn init_state( configuration: &ParsedConfiguration, ) -> Result { + dotenv::dotenv().ok(); - init_jvm(&ndc_calcite_schema::configuration::ParsedConfiguration::Version5(configuration.clone())); - match get_jvm().lock() { + init_jvm(&ndc_calcite_schema::configuration::ParsedConfiguration::Version5(configuration.clone()), true); + let jvm = get_jvm(true); + match jvm.lock() { Ok(java_vm) => { let calcite; let calcite_ref; @@ -344,7 +326,6 @@ fn init_state( } #[cfg(test)] mod tests { - use std::error::Error; use std::path::PathBuf; use axum_test_helper::TestClient; @@ -354,14 +335,18 @@ mod tests { #[tokio::test] async fn capabilities_match_ndc_spec_version() -> Result<()> { + + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + let config_dir = PathBuf::from(manifest_dir).join("test_configuration"); + let state = - ndc_sdk::default_main::init_server_state(Calcite::default(), PathBuf::new()).await?; + ndc_sdk::default_main::init_server_state(Calcite::default(), config_dir).await?; let app = ndc_sdk::default_main::create_router::(state, None); let client = TestClient::new(app); let response = client.get("/capabilities").send().await; - assert_eq!(response.status(), StatusCode::OK); + assert_eq!(response.status().as_u16(), StatusCode::OK.as_u16()); let body: ndc_models::CapabilitiesResponse = response.json().await; // ideally we would get this version from `ndc_models::VERSION` diff --git a/crates/connectors/ndc-calcite/src/query.rs b/crates/connectors/ndc-calcite/src/query.rs index 6949f2d..5eed359 100644 --- a/crates/connectors/ndc-calcite/src/query.rs +++ b/crates/connectors/ndc-calcite/src/query.rs @@ -9,9 +9,9 @@ use std::collections::BTreeMap; use http::StatusCode; use indexmap::IndexMap; use ndc_models::{ArgumentName, CollectionName, ComparisonOperatorName, ComparisonTarget, ComparisonValue, Expression, Field, FieldName, Query, Relationship, RelationshipArgument, RelationshipName, RelationshipType, RowFieldValue, VariableName}; -use ndc_sdk::connector::{ErrorResponse}; use ndc_sdk::connector::error::Result; use ndc_models as models; +use ndc_sdk::connector::ErrorResponse; use serde_json::{Number, Value}; use tracing::{event, Level, span}; @@ -265,25 +265,24 @@ fn process_object_relationship(rows: Vec, field_name: &FieldName, fk_rows: let rowset = serde_json::map::Map::new(); if let Some(value) = row.get_mut(field_name) { event!(Level::DEBUG, "value: {:?}", value); - if let RowFieldValue(_) = *value { - let (key, name) = pks[0].clone(); - event!(Level::DEBUG, "key: {:?}, name: {:?}", key, name); - let mut child_rows = Vec::new(); - for x in fk_rows { - if let Some(value) = x.get(&key) { - event!(Level::DEBUG, "value: {:?}", value); - if value.0 == pk_value { - child_rows.push(x); - } - } else { - event!(Level::DEBUG, "value: {:?}", value); + let (key, name) = pks[0].clone(); + event!(Level::DEBUG, "key: {:?}, name: {:?}", key, name); + let mut child_rows = Vec::new(); + for x in fk_rows { + if let Some(value) = x.get(&key) { + event!(Level::DEBUG, "value: {:?}", value); + if value.0 == pk_value { + child_rows.push(x); } + } else { + event!(Level::DEBUG, "value: {:?}", value); } - if child_rows.len() > 1 { - child_rows = vec![child_rows[0]]; - } - process_child_rows(&child_rows, rowset, value).expect("TODO: panic message"); } + if child_rows.len() > 1 { + child_rows = vec![child_rows[0]]; + } + process_child_rows(&child_rows, rowset, value).expect("TODO: panic message"); + } row }).collect(); @@ -300,7 +299,7 @@ fn process_array_relationship(rows: Option>, field_name: &FieldName, fk let pk_value = row.get(fks[0]).unwrap().0.clone(); if let Some(value) = row.get_mut(field_name) { event!(Level::DEBUG, "value: {:?}", value); - if let RowFieldValue(_) = *value { + let (key, name) = pks[0].clone(); event!(Level::DEBUG, "key: {:?}, name: {:?}", key, name); let mut child_rows = Vec::new(); @@ -317,7 +316,7 @@ fn process_array_relationship(rows: Option>, field_name: &FieldName, fk } event!(Level::DEBUG, "Key: {:?}, Name: {:?}, Child Rows: {:?}", key, name, child_rows); process_child_rows(&child_rows, rowset, value).expect("TODO: panic message"); - } + } row }).collect(); @@ -394,4 +393,4 @@ fn execute_query_collection( Ok(v) => Ok(Some(v)), Err(e) => Err(e) } -} \ No newline at end of file +} diff --git a/crates/connectors/ndc-calcite/src/sql.rs b/crates/connectors/ndc-calcite/src/sql.rs index fa922d6..0af1cc2 100644 --- a/crates/connectors/ndc-calcite/src/sql.rs +++ b/crates/connectors/ndc-calcite/src/sql.rs @@ -7,8 +7,8 @@ use std::error::Error; use std::fmt; use std::fmt::{Display, Formatter}; use ndc_models::{OrderByTarget, Aggregate, ArgumentName, CollectionName, ComparisonOperatorName, ComparisonTarget, ComparisonValue, ExistsInCollection, Expression, Field, FieldName, Query, Relationship, RelationshipArgument, RelationshipName, UnaryComparisonOperator, VariableName}; -use ndc_sdk::connector::{ErrorResponse}; -use serde_json::{Value}; +use ndc_sdk::connector::ErrorResponse; +use serde_json::Value; use tracing::{event, Level}; use ndc_sdk::connector::error::Result; use ndc_calcite_schema::version5::ParsedConfiguration; diff --git a/ndc-calcite.md b/ndc-calcite.md index 0c9686c..256f699 100644 --- a/ndc-calcite.md +++ b/ndc-calcite.md @@ -57,7 +57,7 @@ cargo build --bin ndc-calcite --bin ndc-calcite-cli ### Build the docker image ```shell -./build-local.sh +./build-local.sh v0.1.0 ``` ### Create a Supergraph @@ -272,8 +272,8 @@ ddn run docker-start ## Additional Notes -All projection, filtering and sorting are handled in memory. -This means that the entire file is read into memory, and then operated on as a table scan. +All projection, filtering and sorting are handled in memory. +This means that the entire file is read into memory, and then operated on as a table scan. Wide tables - with narrow projections may not perform as well as expected. Large tables may not perform well. diff --git a/run-connector-local.sh b/run-connector-local.sh index 57a08fb..fcdca12 100755 --- a/run-connector-local.sh +++ b/run-connector-local.sh @@ -1,2 +1,2 @@ cd adapters/$1 -OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 OTEL_SERVICE_NAME=app_calcite LOG_LEVEL=debug OTEL_METRICS_EXPORTER=console OTEL_TRACES_EXPORTER=console OTEL_LOG_EXPORTER=console RUST_LOG=info cargo run --package ndc-calcite --bin ndc-calcite -- serve --configuration=. \ No newline at end of file +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 OTEL_SERVICE_NAME=app_calcite LOG_LEVEL=debug OTEL_METRICS_EXPORTER=console OTEL_TRACES_EXPORTER=console OTEL_LOG_EXPORTER=console RUST_LOG=info cargo run --package ndc-calcite --bin ndc-calcite -- serve --configuration=connector-context diff --git a/run-connector-update.sh b/run-connector-update.sh new file mode 100755 index 0000000..34fbcbd --- /dev/null +++ b/run-connector-update.sh @@ -0,0 +1,5 @@ +cd adapters/$1 +set -a; source .env; set +a +if cargo run --bin ndc-calcite-cli -- --context connector-context update; then + echo "Configuration generated successfully." +fi diff --git a/rust-analyzer.toml b/rust-analyzer.toml new file mode 100644 index 0000000..76939ac --- /dev/null +++ b/rust-analyzer.toml @@ -0,0 +1,16 @@ +[rust-analyzer] +exclude = [ + "target/**", + "calcite-rs-jni/**", # This has 10,207 files - exclude if not actively working on it + "**/target/**", # Catch all target directories in any subdirectory + "grafana/**", + "scripts/**", + "shared/**", + # Add any other directories you don't need LSP support for +] + +# Optionally, you can explicitly specify which directories to watch +similar_to = [ + "crates/**", # Only watch the crates directory + "adapters/**" # And adapters directory +]