diff --git a/.github/workflows/build-publish-rh-image.yml b/.github/workflows/build-publish-rh-image.yml index 8fd6c9e0b9d98..f9c6fd139cdb8 100644 --- a/.github/workflows/build-publish-rh-image.yml +++ b/.github/workflows/build-publish-rh-image.yml @@ -64,7 +64,7 @@ jobs: platforms: linux/amd64 push: true build-args: | - features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,deno_core,license,http_trigger,zip,oauth2,kafka,php,mysql,mssql,bigquery,websocket,python,smtp,csharp,static_frontend,rust + features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,deno_core,license,http_trigger,zip,oauth2,kafka,nats,php,mysql,mssql,bigquery,websocket,python,smtp,csharp,static_frontend,rust secrets: | rh_username=${{ secrets.RH_USERNAME }} rh_password=${{ secrets.RH_PASSWORD }} @@ -81,7 +81,7 @@ jobs: platforms: linux/arm64 push: true build-args: | - features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,deno_core,license,http_trigger,zip,oauth2,kafka,php,mysql,mssql,bigquery,websocket,python,smtp,csharp,static_frontend,rust + features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,deno_core,license,http_trigger,zip,oauth2,kafka,nats,php,mysql,mssql,bigquery,websocket,python,smtp,csharp,static_frontend,rust secrets: | rh_username=${{ secrets.RH_USERNAME }} rh_password=${{ secrets.RH_PASSWORD }} diff --git a/.github/workflows/build_windows_worker_.yml b/.github/workflows/build_windows_worker_.yml index a0a9cb0f120c7..1761646d8f540 100644 --- a/.github/workflows/build_windows_worker_.yml +++ b/.github/workflows/build_windows_worker_.yml @@ -45,7 +45,7 @@ jobs: $env:OPENSSL_DIR="${Env:VCPKG_INSTALLATION_ROOT}\installed\x64-windows-static" mkdir frontend/build && cd backend New-Item -Path . -Name "windmill-api/openapi-deref.yaml" -ItemType "File" -Force - cargo build --release --features=enterprise,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,license,http_trigger,zip,oauth2,kafka,php,mysql,mssql,bigquery,websocket,python,smtp,csharp,static_frontend,rust + cargo build --release --features=enterprise,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,license,http_trigger,zip,oauth2,kafka,nats,php,mysql,mssql,bigquery,websocket,python,smtp,csharp,static_frontend,rust - name: Rename binary with corresponding architecture run: | diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml index 00f8d5cd76acf..8361c07a59bc0 100644 --- a/.github/workflows/docker-image.yml +++ b/.github/workflows/docker-image.yml @@ -149,7 +149,7 @@ jobs: platforms: linux/amd64,linux/arm64 push: true build-args: | - features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,license,http_trigger,zip,oauth2,kafka,otel,dind,php,mysql,mssql,bigquery,websocket,python,smtp,csharp,static_frontend,rust + features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,license,http_trigger,zip,oauth2,kafka,nats,otel,dind,php,mysql,mssql,bigquery,websocket,python,smtp,csharp,static_frontend,rust tags: | ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}-ee:${{ env.DEV_SHA }} ${{ steps.meta-ee-public.outputs.tags }} @@ -211,7 +211,7 @@ jobs: platforms: linux/amd64 push: true build-args: | - features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,license,http_trigger,zip,oauth2,kafka,otel,dind,php,mysql,mssql,bigquery,websocket,python,smtp,csharp,static_frontend,rust + features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,license,http_trigger,zip,oauth2,kafka,nats,otel,dind,php,mysql,mssql,bigquery,websocket,python,smtp,csharp,static_frontend,rust PYTHON_IMAGE=python:3.12.2-slim-bookworm tags: | ${{ steps.meta-ee-public-py312.outputs.tags }} diff --git a/.github/workflows/publish_windows_worker.yml b/.github/workflows/publish_windows_worker.yml index 9c8d8956e78c6..324c762b19854 100644 --- a/.github/workflows/publish_windows_worker.yml +++ b/.github/workflows/publish_windows_worker.yml @@ -47,7 +47,7 @@ jobs: $env:OPENSSL_DIR="${Env:VCPKG_INSTALLATION_ROOT}\installed\x64-windows-static" mkdir frontend/build && cd backend New-Item -Path . -Name "windmill-api/openapi-deref.yaml" -ItemType "File" -Force - cargo build --release --features=enterprise,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,license,http_trigger,zip,oauth2,kafka,php,mysql,mssql,bigquery,websocket,python,smtp,csharp,static_frontend,rust + cargo build --release --features=enterprise,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,license,http_trigger,zip,oauth2,kafka,nats,php,mysql,mssql,bigquery,websocket,python,smtp,csharp,static_frontend,rust - name: Rename binary with corresponding architecture run: | diff --git a/backend/.sqlx/query-07da723ce5c9ee2d7c236e8eabe254c783fc34b617c8a9a95a0eb0cda535dab5.json b/backend/.sqlx/query-07da723ce5c9ee2d7c236e8eabe254c783fc34b617c8a9a95a0eb0cda535dab5.json index a86748af87145..a3bc7c1ef6d63 100644 --- a/backend/.sqlx/query-07da723ce5c9ee2d7c236e8eabe254c783fc34b617c8a9a95a0eb0cda535dab5.json +++ b/backend/.sqlx/query-07da723ce5c9ee2d7c236e8eabe254c783fc34b617c8a9a95a0eb0cda535dab5.json @@ -17,7 +17,8 @@ "http", "websocket", "kafka", - "email" + "email", + "nats" ] } } diff --git a/backend/.sqlx/query-5962733746c81480abe9ab3a6ccf5664130ae7500279055c08db42d67b2822f6.json b/backend/.sqlx/query-1c0f95a069891f7214505aaa9428e11e25c1d3a30294cbe78f01ed75edf68da4.json similarity index 74% rename from backend/.sqlx/query-5962733746c81480abe9ab3a6ccf5664130ae7500279055c08db42d67b2822f6.json rename to backend/.sqlx/query-1c0f95a069891f7214505aaa9428e11e25c1d3a30294cbe78f01ed75edf68da4.json index 0d64b110f8449..f00f41d16ba62 100644 --- a/backend/.sqlx/query-5962733746c81480abe9ab3a6ccf5664130ae7500279055c08db42d67b2822f6.json +++ b/backend/.sqlx/query-1c0f95a069891f7214505aaa9428e11e25c1d3a30294cbe78f01ed75edf68da4.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE kafka_trigger SET kafka_resource_path = $1, group_id = $2, topics = $3, script_path = $4, path = $5, is_flow = $6, edited_by = $7, email = $8, edited_at = now(), server_id = NULL, last_server_ping = NULL, error = NULL\n WHERE workspace_id = $9 AND path = $10", + "query": "UPDATE kafka_trigger SET kafka_resource_path = $1, group_id = $2, topics = $3, script_path = $4, path = $5, is_flow = $6, edited_by = $7, email = $8, edited_at = now(), server_id = NULL, error = NULL\n WHERE workspace_id = $9 AND path = $10", "describe": { "columns": [], "parameters": { @@ -19,5 +19,5 @@ }, "nullable": [] }, - "hash": "5962733746c81480abe9ab3a6ccf5664130ae7500279055c08db42d67b2822f6" + "hash": "1c0f95a069891f7214505aaa9428e11e25c1d3a30294cbe78f01ed75edf68da4" } diff --git a/backend/.sqlx/query-cc8a71abdeccf0787695af32aa21fe73aba65cfdc8329c02b4d257de4d2d168a.json b/backend/.sqlx/query-1ef48cc430870ab6c062b046bdbf0a2db057141619e9818ddb904d7c242efec3.json similarity index 69% rename from backend/.sqlx/query-cc8a71abdeccf0787695af32aa21fe73aba65cfdc8329c02b4d257de4d2d168a.json rename to backend/.sqlx/query-1ef48cc430870ab6c062b046bdbf0a2db057141619e9818ddb904d7c242efec3.json index c8f2b72b831eb..7fc5a4c430f6e 100644 --- a/backend/.sqlx/query-cc8a71abdeccf0787695af32aa21fe73aba65cfdc8329c02b4d257de4d2d168a.json +++ b/backend/.sqlx/query-1ef48cc430870ab6c062b046bdbf0a2db057141619e9818ddb904d7c242efec3.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT \n EXISTS(SELECT 1 FROM websocket_trigger WHERE workspace_id = $1) as \"websocket_used!\", \n EXISTS(SELECT 1 FROM http_trigger WHERE workspace_id = $1) as \"http_routes_used!\",\n EXISTS(SELECT 1 FROM kafka_trigger WHERE workspace_id = $1) as \"kafka_used!\"", + "query": "SELECT \n EXISTS(SELECT 1 FROM websocket_trigger WHERE workspace_id = $1) as \"websocket_used!\", \n EXISTS(SELECT 1 FROM http_trigger WHERE workspace_id = $1) as \"http_routes_used!\",\n EXISTS(SELECT 1 FROM kafka_trigger WHERE workspace_id = $1) as \"kafka_used!\",\n EXISTS(SELECT 1 FROM nats_trigger WHERE workspace_id = $1) as \"nats_used!\"", "describe": { "columns": [ { @@ -17,6 +17,11 @@ "ordinal": 2, "name": "kafka_used!", "type_info": "Bool" + }, + { + "ordinal": 3, + "name": "nats_used!", + "type_info": "Bool" } ], "parameters": { @@ -25,10 +30,11 @@ ] }, "nullable": [ + null, null, null, null ] }, - "hash": "cc8a71abdeccf0787695af32aa21fe73aba65cfdc8329c02b4d257de4d2d168a" + "hash": "1ef48cc430870ab6c062b046bdbf0a2db057141619e9818ddb904d7c242efec3" } diff --git a/backend/.sqlx/query-12a86755706ce030a0a9142da78d035d0c7d361240b60005cc864e4345eb0bc7.json b/backend/.sqlx/query-25975935d59f88df117bae0fa5016cf95ad56f790f0eddb6e2fbe3d83d3accaa.json similarity index 63% rename from backend/.sqlx/query-12a86755706ce030a0a9142da78d035d0c7d361240b60005cc864e4345eb0bc7.json rename to backend/.sqlx/query-25975935d59f88df117bae0fa5016cf95ad56f790f0eddb6e2fbe3d83d3accaa.json index dc357c2169b18..4ae1f07aa89c2 100644 --- a/backend/.sqlx/query-12a86755706ce030a0a9142da78d035d0c7d361240b60005cc864e4345eb0bc7.json +++ b/backend/.sqlx/query-25975935d59f88df117bae0fa5016cf95ad56f790f0eddb6e2fbe3d83d3accaa.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE websocket_trigger SET enabled = $1, email = $2, edited_by = $3, edited_at = now(), server_id = NULL, last_server_ping = NULL, error = NULL\n WHERE path = $4 AND workspace_id = $5 RETURNING 1", + "query": "UPDATE websocket_trigger SET enabled = $1, email = $2, edited_by = $3, edited_at = now(), server_id = NULL, error = NULL\n WHERE path = $4 AND workspace_id = $5 RETURNING 1", "describe": { "columns": [ { @@ -22,5 +22,5 @@ null ] }, - "hash": "12a86755706ce030a0a9142da78d035d0c7d361240b60005cc864e4345eb0bc7" + "hash": "25975935d59f88df117bae0fa5016cf95ad56f790f0eddb6e2fbe3d83d3accaa" } diff --git a/backend/.sqlx/query-2f440ab6083764b49e309c1f8dde0c5508da110f54bc5f465f1892afdf851af0.json b/backend/.sqlx/query-2f440ab6083764b49e309c1f8dde0c5508da110f54bc5f465f1892afdf851af0.json index 7abe77803700e..986ab59f4db00 100644 --- a/backend/.sqlx/query-2f440ab6083764b49e309c1f8dde0c5508da110f54bc5f465f1892afdf851af0.json +++ b/backend/.sqlx/query-2f440ab6083764b49e309c1f8dde0c5508da110f54bc5f465f1892afdf851af0.json @@ -25,7 +25,8 @@ "http", "websocket", "kafka", - "email" + "email", + "nats" ] } } @@ -56,7 +57,8 @@ "http", "websocket", "kafka", - "email" + "email", + "nats" ] } } diff --git a/backend/.sqlx/query-3d8fdf7adf42b27808a5b223df3a660bfd5a8c0a451a0deac24632f0ccf66165.json b/backend/.sqlx/query-3d8fdf7adf42b27808a5b223df3a660bfd5a8c0a451a0deac24632f0ccf66165.json new file mode 100644 index 0000000000000..c48927fdf24d3 --- /dev/null +++ b/backend/.sqlx/query-3d8fdf7adf42b27808a5b223df3a660bfd5a8c0a451a0deac24632f0ccf66165.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE nats_trigger SET enabled = $1, email = $2, edited_by = $3, edited_at = now(), server_id = NULL, error = NULL\n WHERE path = $4 AND workspace_id = $5 RETURNING 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "?column?", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Bool", + "Varchar", + "Varchar", + "Text", + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "3d8fdf7adf42b27808a5b223df3a660bfd5a8c0a451a0deac24632f0ccf66165" +} diff --git a/backend/.sqlx/query-42030372693d8d6b8d03947bb6702024cfa226735eeeb720bbab46bd0e3cedcc.json b/backend/.sqlx/query-42030372693d8d6b8d03947bb6702024cfa226735eeeb720bbab46bd0e3cedcc.json new file mode 100644 index 0000000000000..cef1da340cdb6 --- /dev/null +++ b/backend/.sqlx/query-42030372693d8d6b8d03947bb6702024cfa226735eeeb720bbab46bd0e3cedcc.json @@ -0,0 +1,50 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT path, is_flow, workspace_id, trigger_config as \"trigger_config!: _\", owner, email FROM capture_config WHERE trigger_kind = 'nats' AND last_client_ping > NOW() - INTERVAL '10 seconds' AND trigger_config IS NOT NULL AND (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds')", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "path", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "is_flow", + "type_info": "Bool" + }, + { + "ordinal": 2, + "name": "workspace_id", + "type_info": "Varchar" + }, + { + "ordinal": 3, + "name": "trigger_config!: _", + "type_info": "Jsonb" + }, + { + "ordinal": 4, + "name": "owner", + "type_info": "Varchar" + }, + { + "ordinal": 5, + "name": "email", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false, + true, + false, + false + ] + }, + "hash": "42030372693d8d6b8d03947bb6702024cfa226735eeeb720bbab46bd0e3cedcc" +} diff --git a/backend/.sqlx/query-4f1bb3713bc52fb8cd4088947de1e9905387a20bf4b137adf0a33274f820ddc2.json b/backend/.sqlx/query-47560aa3d3af93167663e763ad873088eb4f57785df3c383bbf6e5ceab268980.json similarity index 81% rename from backend/.sqlx/query-4f1bb3713bc52fb8cd4088947de1e9905387a20bf4b137adf0a33274f820ddc2.json rename to backend/.sqlx/query-47560aa3d3af93167663e763ad873088eb4f57785df3c383bbf6e5ceab268980.json index 7ab68b14b6cc1..541634afdf1d2 100644 --- a/backend/.sqlx/query-4f1bb3713bc52fb8cd4088947de1e9905387a20bf4b137adf0a33274f820ddc2.json +++ b/backend/.sqlx/query-47560aa3d3af93167663e763ad873088eb4f57785df3c383bbf6e5ceab268980.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT path, is_flow, workspace_id, trigger_config as \"trigger_config!: _\", owner, email FROM capture_config WHERE trigger_kind = 'websocket' AND last_client_ping > NOW() - INTERVAL '10 seconds' AND trigger_config IS NOT NULL AND (server_id IS NULL OR last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds')", + "query": "SELECT path, is_flow, workspace_id, trigger_config as \"trigger_config!: _\", owner, email FROM capture_config WHERE trigger_kind = 'websocket' AND last_client_ping > NOW() - INTERVAL '10 seconds' AND trigger_config IS NOT NULL AND (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds')", "describe": { "columns": [ { @@ -46,5 +46,5 @@ false ] }, - "hash": "4f1bb3713bc52fb8cd4088947de1e9905387a20bf4b137adf0a33274f820ddc2" + "hash": "47560aa3d3af93167663e763ad873088eb4f57785df3c383bbf6e5ceab268980" } diff --git a/backend/.sqlx/query-0b94bd4c98a11ca1b7e5e34dd1ee6fcb0b7a54ed4218fa3cf23cc929d009d50f.json b/backend/.sqlx/query-561b7935d687f6b9f3d6488f8489f55e6737ffa2d3716f803d32f8b68cc1e915.json similarity index 73% rename from backend/.sqlx/query-0b94bd4c98a11ca1b7e5e34dd1ee6fcb0b7a54ed4218fa3cf23cc929d009d50f.json rename to backend/.sqlx/query-561b7935d687f6b9f3d6488f8489f55e6737ffa2d3716f803d32f8b68cc1e915.json index 75288e80ae7d1..a3e2f3d338525 100644 --- a/backend/.sqlx/query-0b94bd4c98a11ca1b7e5e34dd1ee6fcb0b7a54ed4218fa3cf23cc929d009d50f.json +++ b/backend/.sqlx/query-561b7935d687f6b9f3d6488f8489f55e6737ffa2d3716f803d32f8b68cc1e915.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE websocket_trigger SET url = $1, script_path = $2, path = $3, is_flow = $4, filters = $5, initial_messages = $6, url_runnable_args = $7, edited_by = $8, email = $9, edited_at = now(), server_id = NULL, last_server_ping = NULL, error = NULL\n WHERE workspace_id = $10 AND path = $11", + "query": "UPDATE websocket_trigger SET url = $1, script_path = $2, path = $3, is_flow = $4, filters = $5, initial_messages = $6, url_runnable_args = $7, edited_by = $8, email = $9, edited_at = now(), server_id = NULL, error = NULL\n WHERE workspace_id = $10 AND path = $11", "describe": { "columns": [], "parameters": { @@ -20,5 +20,5 @@ }, "nullable": [] }, - "hash": "0b94bd4c98a11ca1b7e5e34dd1ee6fcb0b7a54ed4218fa3cf23cc929d009d50f" + "hash": "561b7935d687f6b9f3d6488f8489f55e6737ffa2d3716f803d32f8b68cc1e915" } diff --git a/backend/.sqlx/query-71d51bbc35da7b9930e3ea3a634451217ccb9f1bc35b1ad6e10d16bc19c41447.json b/backend/.sqlx/query-71d51bbc35da7b9930e3ea3a634451217ccb9f1bc35b1ad6e10d16bc19c41447.json index 1f23430419f27..a8dcf9e0118d5 100644 --- a/backend/.sqlx/query-71d51bbc35da7b9930e3ea3a634451217ccb9f1bc35b1ad6e10d16bc19c41447.json +++ b/backend/.sqlx/query-71d51bbc35da7b9930e3ea3a634451217ccb9f1bc35b1ad6e10d16bc19c41447.json @@ -28,7 +28,8 @@ "http", "websocket", "kafka", - "email" + "email", + "nats" ] } } diff --git a/backend/.sqlx/query-746ee16a04267cd251d4cefbc44a37fcc985bf61978cc07167ed35d7dbf92d11.json b/backend/.sqlx/query-746ee16a04267cd251d4cefbc44a37fcc985bf61978cc07167ed35d7dbf92d11.json new file mode 100644 index 0000000000000..4a7e81455ea4d --- /dev/null +++ b/backend/.sqlx/query-746ee16a04267cd251d4cefbc44a37fcc985bf61978cc07167ed35d7dbf92d11.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT EXISTS(SELECT 1 FROM nats_trigger WHERE path = $1 AND workspace_id = $2)", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "exists", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "746ee16a04267cd251d4cefbc44a37fcc985bf61978cc07167ed35d7dbf92d11" +} diff --git a/backend/.sqlx/query-7b36b58761cd459b808cc4f463dfc93f01fdbc19c66d7404f95e1f8444235a8e.json b/backend/.sqlx/query-7b36b58761cd459b808cc4f463dfc93f01fdbc19c66d7404f95e1f8444235a8e.json new file mode 100644 index 0000000000000..0fe177b05db5b --- /dev/null +++ b/backend/.sqlx/query-7b36b58761cd459b808cc4f463dfc93f01fdbc19c66d7404f95e1f8444235a8e.json @@ -0,0 +1,25 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE nats_trigger SET nats_resource_path = $1, subjects = $2, stream_name = $3, consumer_name = $4, use_jetstream = $5, script_path = $6, path = $7, is_flow = $8, edited_by = $9, email = $10, edited_at = now(), server_id = NULL, error = NULL\n WHERE workspace_id = $11 AND path = $12", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Varchar", + "VarcharArray", + "Varchar", + "Varchar", + "Bool", + "Varchar", + "Varchar", + "Bool", + "Varchar", + "Varchar", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "7b36b58761cd459b808cc4f463dfc93f01fdbc19c66d7404f95e1f8444235a8e" +} diff --git a/backend/.sqlx/query-870b12a46f26c9e29889dd28a5c7832c5a674a5553674fd9762c981e0d03bb57.json b/backend/.sqlx/query-870b12a46f26c9e29889dd28a5c7832c5a674a5553674fd9762c981e0d03bb57.json new file mode 100644 index 0000000000000..f5d544c5aa24c --- /dev/null +++ b/backend/.sqlx/query-870b12a46f26c9e29889dd28a5c7832c5a674a5553674fd9762c981e0d03bb57.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM nats_trigger WHERE workspace_id = $1 AND path = $2", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "870b12a46f26c9e29889dd28a5c7832c5a674a5553674fd9762c981e0d03bb57" +} diff --git a/backend/.sqlx/query-92e60af0d3ae8c73d74ae68d70e20ae18f79cc84626097b766307cd42722baa3.json b/backend/.sqlx/query-92e60af0d3ae8c73d74ae68d70e20ae18f79cc84626097b766307cd42722baa3.json new file mode 100644 index 0000000000000..acc67696522d1 --- /dev/null +++ b/backend/.sqlx/query-92e60af0d3ae8c73d74ae68d70e20ae18f79cc84626097b766307cd42722baa3.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE websocket_trigger SET server_id = $1, last_server_ping = now(), error = 'Connecting...' WHERE enabled IS TRUE AND workspace_id = $2 AND path = $3 AND (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') RETURNING true", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "?column?", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Varchar", + "Text", + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "92e60af0d3ae8c73d74ae68d70e20ae18f79cc84626097b766307cd42722baa3" +} diff --git a/backend/.sqlx/query-99a2c935acf5d6bbeb70ea1255679115b6e9042800d40899a36b3867049c5c46.json b/backend/.sqlx/query-99a2c935acf5d6bbeb70ea1255679115b6e9042800d40899a36b3867049c5c46.json new file mode 100644 index 0000000000000..f38b293acbc45 --- /dev/null +++ b/backend/.sqlx/query-99a2c935acf5d6bbeb70ea1255679115b6e9042800d40899a36b3867049c5c46.json @@ -0,0 +1,25 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE capture_config SET server_id = $1, last_server_ping = now(), error = 'Connecting...' WHERE last_client_ping > NOW() - INTERVAL '10 seconds' AND workspace_id = $2 AND path = $3 AND is_flow = $4 AND trigger_kind = 'websocket' AND (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') RETURNING true", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "?column?", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Varchar", + "Text", + "Text", + "Bool" + ] + }, + "nullable": [ + null + ] + }, + "hash": "99a2c935acf5d6bbeb70ea1255679115b6e9042800d40899a36b3867049c5c46" +} diff --git a/backend/.sqlx/query-ad07b57ad928a4e5260833f4fe2a4bf51211f2f73d78c546c894c8e147677ac7.json b/backend/.sqlx/query-ad07b57ad928a4e5260833f4fe2a4bf51211f2f73d78c546c894c8e147677ac7.json new file mode 100644 index 0000000000000..5fc32269c20ae --- /dev/null +++ b/backend/.sqlx/query-ad07b57ad928a4e5260833f4fe2a4bf51211f2f73d78c546c894c8e147677ac7.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT COUNT(*) FROM nats_trigger WHERE script_path = $1 AND is_flow = $2 AND workspace_id = $3", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text", + "Bool", + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "ad07b57ad928a4e5260833f4fe2a4bf51211f2f73d78c546c894c8e147677ac7" +} diff --git a/backend/.sqlx/query-add48c8e7c6fa2c549ad6293cbee22889d35e919d3267c1d2a265d868fa8a7d1.json b/backend/.sqlx/query-add48c8e7c6fa2c549ad6293cbee22889d35e919d3267c1d2a265d868fa8a7d1.json new file mode 100644 index 0000000000000..0468efc99f4d7 --- /dev/null +++ b/backend/.sqlx/query-add48c8e7c6fa2c549ad6293cbee22889d35e919d3267c1d2a265d868fa8a7d1.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE nats_trigger SET enabled = FALSE, error = $1, server_id = NULL, last_server_ping = NULL WHERE workspace_id = $2 AND path = $3", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "add48c8e7c6fa2c549ad6293cbee22889d35e919d3267c1d2a265d868fa8a7d1" +} diff --git a/backend/.sqlx/query-c223f8b7fa4ef1aa06e1ba2a56d677774aa237508d5610714efd2e9b8b93c7b8.json b/backend/.sqlx/query-c223f8b7fa4ef1aa06e1ba2a56d677774aa237508d5610714efd2e9b8b93c7b8.json index 211e8fa8abd89..4d79556b49215 100644 --- a/backend/.sqlx/query-c223f8b7fa4ef1aa06e1ba2a56d677774aa237508d5610714efd2e9b8b93c7b8.json +++ b/backend/.sqlx/query-c223f8b7fa4ef1aa06e1ba2a56d677774aa237508d5610714efd2e9b8b93c7b8.json @@ -20,7 +20,8 @@ "http", "websocket", "kafka", - "email" + "email", + "nats" ] } } diff --git a/backend/.sqlx/query-c5270ee815689e42b65df507b850da43239c9a5aaea41c9aed7ed33a6219a534.json b/backend/.sqlx/query-c5270ee815689e42b65df507b850da43239c9a5aaea41c9aed7ed33a6219a534.json index 52bb869ca651b..b7ac98ff249a1 100644 --- a/backend/.sqlx/query-c5270ee815689e42b65df507b850da43239c9a5aaea41c9aed7ed33a6219a534.json +++ b/backend/.sqlx/query-c5270ee815689e42b65df507b850da43239c9a5aaea41c9aed7ed33a6219a534.json @@ -17,7 +17,8 @@ "http", "websocket", "kafka", - "email" + "email", + "nats" ] } } diff --git a/backend/.sqlx/query-cd5d62d456b74237b941bc72ea8de007185263167bf1dfa2e469d319cc4da674.json b/backend/.sqlx/query-cd5d62d456b74237b941bc72ea8de007185263167bf1dfa2e469d319cc4da674.json new file mode 100644 index 0000000000000..912b9a53de7bf --- /dev/null +++ b/backend/.sqlx/query-cd5d62d456b74237b941bc72ea8de007185263167bf1dfa2e469d319cc4da674.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE nats_trigger SET server_id = $1, last_server_ping = now(), error = 'Connecting...' WHERE enabled IS TRUE AND workspace_id = $2 AND path = $3 AND (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') RETURNING true", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "?column?", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Varchar", + "Text", + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "cd5d62d456b74237b941bc72ea8de007185263167bf1dfa2e469d319cc4da674" +} diff --git a/backend/.sqlx/query-ee9adcbf82d3f62088a38ff65e8c90ac1c18b5df7aab6a143a328a6bddc6ad32.json b/backend/.sqlx/query-d06efdc24706e0d7479bffc0b19a0c5976ee60125289e9f7b0b04090bce4a3a3.json similarity index 52% rename from backend/.sqlx/query-ee9adcbf82d3f62088a38ff65e8c90ac1c18b5df7aab6a143a328a6bddc6ad32.json rename to backend/.sqlx/query-d06efdc24706e0d7479bffc0b19a0c5976ee60125289e9f7b0b04090bce4a3a3.json index 1c1179b188463..79fe0873dd789 100644 --- a/backend/.sqlx/query-ee9adcbf82d3f62088a38ff65e8c90ac1c18b5df7aab6a143a328a6bddc6ad32.json +++ b/backend/.sqlx/query-d06efdc24706e0d7479bffc0b19a0c5976ee60125289e9f7b0b04090bce4a3a3.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE capture_config SET server_id = $1, last_server_ping = now() WHERE last_client_ping > NOW() - INTERVAL '10 seconds' AND workspace_id = $2 AND path = $3 AND is_flow = $4 AND trigger_kind = 'kafka' AND (server_id IS NULL OR last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') RETURNING true", + "query": "UPDATE capture_config SET server_id = $1, last_server_ping = now(), error = 'Connecting...' WHERE last_client_ping > NOW() - INTERVAL '10 seconds' AND workspace_id = $2 AND path = $3 AND is_flow = $4 AND trigger_kind = 'kafka' AND (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') RETURNING true", "describe": { "columns": [ { @@ -21,5 +21,5 @@ null ] }, - "hash": "ee9adcbf82d3f62088a38ff65e8c90ac1c18b5df7aab6a143a328a6bddc6ad32" + "hash": "d06efdc24706e0d7479bffc0b19a0c5976ee60125289e9f7b0b04090bce4a3a3" } diff --git a/backend/.sqlx/query-d6615719bf8db4b333ed55c9a3c160ad1962668fc3305d8e80ae91ef73614a80.json b/backend/.sqlx/query-d6615719bf8db4b333ed55c9a3c160ad1962668fc3305d8e80ae91ef73614a80.json deleted file mode 100644 index 271c395f9ac09..0000000000000 --- a/backend/.sqlx/query-d6615719bf8db4b333ed55c9a3c160ad1962668fc3305d8e80ae91ef73614a80.json +++ /dev/null @@ -1,24 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE kafka_trigger SET server_id = $1, last_server_ping = now() WHERE enabled IS TRUE AND workspace_id = $2 AND path = $3 AND (server_id IS NULL OR last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') RETURNING true", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "?column?", - "type_info": "Bool" - } - ], - "parameters": { - "Left": [ - "Varchar", - "Text", - "Text" - ] - }, - "nullable": [ - null - ] - }, - "hash": "d6615719bf8db4b333ed55c9a3c160ad1962668fc3305d8e80ae91ef73614a80" -} diff --git a/backend/.sqlx/query-02424907504848e983bfa89eec343061932dc5b4b17cf13d5cf8d833aedbe6d5.json b/backend/.sqlx/query-d78ecf85c1e1e95650c380c9488aa90d8d8c5c76f3971484e4c515ff60293d3b.json similarity index 58% rename from backend/.sqlx/query-02424907504848e983bfa89eec343061932dc5b4b17cf13d5cf8d833aedbe6d5.json rename to backend/.sqlx/query-d78ecf85c1e1e95650c380c9488aa90d8d8c5c76f3971484e4c515ff60293d3b.json index c8b5e3086f9f2..cdb8b928b3642 100644 --- a/backend/.sqlx/query-02424907504848e983bfa89eec343061932dc5b4b17cf13d5cf8d833aedbe6d5.json +++ b/backend/.sqlx/query-d78ecf85c1e1e95650c380c9488aa90d8d8c5c76f3971484e4c515ff60293d3b.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE websocket_trigger SET server_id = $1, last_server_ping = now() WHERE enabled IS TRUE AND workspace_id = $2 AND path = $3 AND (server_id IS NULL OR last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') RETURNING true", + "query": "UPDATE kafka_trigger SET server_id = $1, last_server_ping = now(), error = 'Connecting...' WHERE enabled IS TRUE AND workspace_id = $2 AND path = $3 AND (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') RETURNING true", "describe": { "columns": [ { @@ -20,5 +20,5 @@ null ] }, - "hash": "02424907504848e983bfa89eec343061932dc5b4b17cf13d5cf8d833aedbe6d5" + "hash": "d78ecf85c1e1e95650c380c9488aa90d8d8c5c76f3971484e4c515ff60293d3b" } diff --git a/backend/.sqlx/query-438b8bbf49781fc362eecbdc99cb9d2fafd03a59197477aafc915b0e34a00eb2.json b/backend/.sqlx/query-d8ea17fba0e417333e9c6cf82ad36a57830a791c8358e30e9be012b815f5e8e3.json similarity index 81% rename from backend/.sqlx/query-438b8bbf49781fc362eecbdc99cb9d2fafd03a59197477aafc915b0e34a00eb2.json rename to backend/.sqlx/query-d8ea17fba0e417333e9c6cf82ad36a57830a791c8358e30e9be012b815f5e8e3.json index a13521c31cfec..dcaee3cfe830f 100644 --- a/backend/.sqlx/query-438b8bbf49781fc362eecbdc99cb9d2fafd03a59197477aafc915b0e34a00eb2.json +++ b/backend/.sqlx/query-d8ea17fba0e417333e9c6cf82ad36a57830a791c8358e30e9be012b815f5e8e3.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT path, is_flow, workspace_id, trigger_config as \"trigger_config!: _\", owner, email FROM capture_config WHERE trigger_kind = 'kafka' AND last_client_ping > NOW() - INTERVAL '10 seconds' AND trigger_config IS NOT NULL AND (server_id IS NULL OR last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds')", + "query": "SELECT path, is_flow, workspace_id, trigger_config as \"trigger_config!: _\", owner, email FROM capture_config WHERE trigger_kind = 'kafka' AND last_client_ping > NOW() - INTERVAL '10 seconds' AND trigger_config IS NOT NULL AND (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds')", "describe": { "columns": [ { @@ -46,5 +46,5 @@ false ] }, - "hash": "438b8bbf49781fc362eecbdc99cb9d2fafd03a59197477aafc915b0e34a00eb2" + "hash": "d8ea17fba0e417333e9c6cf82ad36a57830a791c8358e30e9be012b815f5e8e3" } diff --git a/backend/.sqlx/query-dcf03ff4b922b93be37d2be5da6884ae3e8c6cf7eaa2d9c62056366eb42f2276.json b/backend/.sqlx/query-dcf03ff4b922b93be37d2be5da6884ae3e8c6cf7eaa2d9c62056366eb42f2276.json new file mode 100644 index 0000000000000..989e4612ec91f --- /dev/null +++ b/backend/.sqlx/query-dcf03ff4b922b93be37d2be5da6884ae3e8c6cf7eaa2d9c62056366eb42f2276.json @@ -0,0 +1,25 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE nats_trigger SET last_server_ping = now(), error = $1 WHERE workspace_id = $2 AND path = $3 AND server_id = $4 AND enabled IS TRUE RETURNING 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "?column?", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Text", + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "dcf03ff4b922b93be37d2be5da6884ae3e8c6cf7eaa2d9c62056366eb42f2276" +} diff --git a/backend/.sqlx/query-e23e110e1f0438d21534fc4323e0e7bc1f0dbeca2e4f44ced05bae0ca5ca1039.json b/backend/.sqlx/query-e23e110e1f0438d21534fc4323e0e7bc1f0dbeca2e4f44ced05bae0ca5ca1039.json index d1c5a4b4b919e..950d7662ad1d4 100644 --- a/backend/.sqlx/query-e23e110e1f0438d21534fc4323e0e7bc1f0dbeca2e4f44ced05bae0ca5ca1039.json +++ b/backend/.sqlx/query-e23e110e1f0438d21534fc4323e0e7bc1f0dbeca2e4f44ced05bae0ca5ca1039.json @@ -33,7 +33,8 @@ "http", "websocket", "kafka", - "email" + "email", + "nats" ] } } diff --git a/backend/.sqlx/query-e37ba13aa3174931f0bfcff26dbc141fe8a346ab6a1a3bc924794bfe3c9af306.json b/backend/.sqlx/query-e37ba13aa3174931f0bfcff26dbc141fe8a346ab6a1a3bc924794bfe3c9af306.json new file mode 100644 index 0000000000000..cec42e51c6e8b --- /dev/null +++ b/backend/.sqlx/query-e37ba13aa3174931f0bfcff26dbc141fe8a346ab6a1a3bc924794bfe3c9af306.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE capture_config SET last_server_ping = now(), error = $1 WHERE workspace_id = $2 AND path = $3 AND is_flow = $4 AND trigger_kind = 'nats' AND server_id = $5 AND last_client_ping > NOW() - INTERVAL '10 seconds' RETURNING 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "?column?", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Text", + "Bool", + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "e37ba13aa3174931f0bfcff26dbc141fe8a346ab6a1a3bc924794bfe3c9af306" +} diff --git a/backend/.sqlx/query-6eb076afcf11dba0dfe35ee108c64814f95dbe5376f0539195d24a8ca96ca3e2.json b/backend/.sqlx/query-eda1e5d1109a13feb14be254eaca630be29176bd5d1246c974642d40201782fc.json similarity index 63% rename from backend/.sqlx/query-6eb076afcf11dba0dfe35ee108c64814f95dbe5376f0539195d24a8ca96ca3e2.json rename to backend/.sqlx/query-eda1e5d1109a13feb14be254eaca630be29176bd5d1246c974642d40201782fc.json index 96825b99055be..fdc34ee1c7ca8 100644 --- a/backend/.sqlx/query-6eb076afcf11dba0dfe35ee108c64814f95dbe5376f0539195d24a8ca96ca3e2.json +++ b/backend/.sqlx/query-eda1e5d1109a13feb14be254eaca630be29176bd5d1246c974642d40201782fc.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE kafka_trigger SET enabled = $1, email = $2, edited_by = $3, edited_at = now(), server_id = NULL, last_server_ping = NULL, error = NULL\n WHERE path = $4 AND workspace_id = $5 RETURNING 1", + "query": "UPDATE kafka_trigger SET enabled = $1, email = $2, edited_by = $3, edited_at = now(), server_id = NULL, error = NULL\n WHERE path = $4 AND workspace_id = $5 RETURNING 1", "describe": { "columns": [ { @@ -22,5 +22,5 @@ null ] }, - "hash": "6eb076afcf11dba0dfe35ee108c64814f95dbe5376f0539195d24a8ca96ca3e2" + "hash": "eda1e5d1109a13feb14be254eaca630be29176bd5d1246c974642d40201782fc" } diff --git a/backend/.sqlx/query-ef299490c4674c4c76e18d84620a74407b78378d66d8a089407998074059e79b.json b/backend/.sqlx/query-ef299490c4674c4c76e18d84620a74407b78378d66d8a089407998074059e79b.json index a3301cfb9a7c9..2a89b1e01e13a 100644 --- a/backend/.sqlx/query-ef299490c4674c4c76e18d84620a74407b78378d66d8a089407998074059e79b.json +++ b/backend/.sqlx/query-ef299490c4674c4c76e18d84620a74407b78378d66d8a089407998074059e79b.json @@ -17,7 +17,8 @@ "http", "websocket", "kafka", - "email" + "email", + "nats" ] } } diff --git a/backend/.sqlx/query-e86295e181a82823ffce8234d413ab5a528b0468715238e17ffed7d75e9c0c5c.json b/backend/.sqlx/query-f16b00bad2880f896f4452e5894fe101cd5c2fe3e4c143c0b979668143d85dd2.json similarity index 63% rename from backend/.sqlx/query-e86295e181a82823ffce8234d413ab5a528b0468715238e17ffed7d75e9c0c5c.json rename to backend/.sqlx/query-f16b00bad2880f896f4452e5894fe101cd5c2fe3e4c143c0b979668143d85dd2.json index 42f6b47554523..a1ce88bd88d79 100644 --- a/backend/.sqlx/query-e86295e181a82823ffce8234d413ab5a528b0468715238e17ffed7d75e9c0c5c.json +++ b/backend/.sqlx/query-f16b00bad2880f896f4452e5894fe101cd5c2fe3e4c143c0b979668143d85dd2.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE capture_config SET server_id = $1, last_server_ping = now() WHERE last_client_ping > NOW() - INTERVAL '10 seconds' AND workspace_id = $2 AND path = $3 AND is_flow = $4 AND trigger_kind = 'websocket' AND (server_id IS NULL OR last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') RETURNING true", + "query": "UPDATE capture_config SET server_id = $1, last_server_ping = now(), error = 'Connecting...' WHERE last_client_ping > NOW() - INTERVAL '10 seconds' AND workspace_id = $2 AND path = $3 AND is_flow = $4 AND trigger_kind = 'nats' AND (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') RETURNING true", "describe": { "columns": [ { @@ -21,5 +21,5 @@ null ] }, - "hash": "e86295e181a82823ffce8234d413ab5a528b0468715238e17ffed7d75e9c0c5c" + "hash": "f16b00bad2880f896f4452e5894fe101cd5c2fe3e4c143c0b979668143d85dd2" } diff --git a/backend/.sqlx/query-ffb6b2a40b605f9c55826a0e519af80b100d2ffa58ec567223e27099d364fcc8.json b/backend/.sqlx/query-ffb6b2a40b605f9c55826a0e519af80b100d2ffa58ec567223e27099d364fcc8.json new file mode 100644 index 0000000000000..c75cb3385ad87 --- /dev/null +++ b/backend/.sqlx/query-ffb6b2a40b605f9c55826a0e519af80b100d2ffa58ec567223e27099d364fcc8.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE capture_config SET error = $1, server_id = NULL, last_server_ping = NULL WHERE workspace_id = $2 AND path = $3 AND is_flow = $4 AND trigger_kind = 'nats'", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Text", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "ffb6b2a40b605f9c55826a0e519af80b100d2ffa58ec567223e27099d364fcc8" +} diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 54b68aa39297e..24af30d3d6ce6 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -474,6 +474,42 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-nats" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76433c4de73442daedb3a59e991d94e85c14ebfc33db53dfcd347a21cd6ef4f8" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures", + "memchr", + "nkeys", + "nuid", + "once_cell", + "pin-project", + "portable-atomic", + "rand 0.8.5", + "regex", + "ring 0.17.8", + "rustls-native-certs 0.7.3", + "rustls-pemfile 2.2.0", + "rustls-webpki 0.102.8", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror 1.0.69", + "time", + "tokio", + "tokio-rustls 0.26.1", + "tokio-util", + "tokio-websockets", + "tracing", + "tryhard", + "url", +] + [[package]] name = "async-oauth2" version = "0.5.0" @@ -1429,6 +1465,9 @@ name = "bytes" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" +dependencies = [ + "serde", +] [[package]] name = "bytes-utils" @@ -3069,6 +3108,7 @@ dependencies = [ "ed25519", "serde", "sha2 0.10.8", + "signature", "subtle", "zeroize", ] @@ -5363,6 +5403,21 @@ dependencies = [ "libc", ] +[[package]] +name = "nkeys" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f49e787f4c61cbd0f9320b31cc26e58719f6aa5068e34697dd3aea361412fe3" +dependencies = [ + "data-encoding", + "ed25519", + "ed25519-dalek", + "getrandom 0.2.15", + "log", + "rand 0.8.5", + "signatory", +] + [[package]] name = "nom" version = "7.1.3" @@ -5383,6 +5438,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "nuid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "num" version = "0.4.3" @@ -7657,6 +7721,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_nanos" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" +dependencies = [ + "serde", +] + [[package]] name = "serde_path_to_error" version = "0.1.16" @@ -7866,6 +7939,18 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" +dependencies = [ + "pkcs8", + "rand_core 0.6.4", + "signature", + "zeroize", +] + [[package]] name = "signature" version = "2.2.0" @@ -9503,6 +9588,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-websockets" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f591660438b3038dd04d16c938271c79e7e06260ad2ea2885a4861bfb238605d" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "futures-sink", + "http 1.2.0", + "httparse", + "rand 0.8.5", + "ring 0.17.8", + "rustls-native-certs 0.8.1", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.1", + "tokio-util", +] + [[package]] name = "toml" version = "0.7.8" @@ -9895,6 +10001,17 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tryhard" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9f0a709784e86923586cff0d872dba54cd2d2e116b3bc57587d15737cfce9d" +dependencies = [ + "futures", + "pin-project-lite", + "tokio", +] + [[package]] name = "tungstenite" version = "0.24.0" @@ -10638,6 +10755,7 @@ version = "1.444.0" dependencies = [ "anyhow", "argon2", + "async-nats", "async-oauth2", "async-recursion", "async-stream", @@ -10671,6 +10789,7 @@ dependencies = [ "matchit", "mime_guess", "native-tls", + "nkeys", "object_store", "openidconnect", "openssl", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index e9701b9b99a66..4d31757396fcd 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -63,6 +63,7 @@ tantivy = ["dep:windmill-indexer", "windmill-api/tantivy", "windmill-indexer/ent sqlx = ["windmill-worker/sqlx"] deno_core = ["windmill-worker/deno_core", "dep:deno_core"] kafka = ["windmill-api/kafka"] +nats = ["windmill-api/nats"] otel = ["windmill-common/otel", "windmill-worker/otel"] dind = ["windmill-worker/dind"] php = ["windmill-worker/php"] @@ -285,6 +286,8 @@ openssl = "=0.10" mail-parser = "^0" matchit = "=0.7.3" rdkafka = { version = "0.36.2", features = ["cmake-build", "ssl-vendored"] } +async-nats = "0.38.0" +nkeys = "0.4.4" datafusion = "39.0.0" object_store = { version = "0.10.0", features = ["aws", "azure"] } diff --git a/backend/ee-repo-ref.txt b/backend/ee-repo-ref.txt index 252d770494ff0..6620861be80d5 100644 --- a/backend/ee-repo-ref.txt +++ b/backend/ee-repo-ref.txt @@ -1 +1 @@ -e9bb21fc9d651e4302f9c3b7b3e8f35e00679e4b \ No newline at end of file +9cea7e44f1b6401662e22088b41e8a00d7e67b23 \ No newline at end of file diff --git a/backend/migrations/20250106164709_nats_triggers.down.sql b/backend/migrations/20250106164709_nats_triggers.down.sql new file mode 100644 index 0000000000000..bea7712cf495d --- /dev/null +++ b/backend/migrations/20250106164709_nats_triggers.down.sql @@ -0,0 +1 @@ +DROP TABLE nats_trigger; \ No newline at end of file diff --git a/backend/migrations/20250106164709_nats_triggers.up.sql b/backend/migrations/20250106164709_nats_triggers.up.sql new file mode 100644 index 0000000000000..dc7688a7f2adb --- /dev/null +++ b/backend/migrations/20250106164709_nats_triggers.up.sql @@ -0,0 +1,71 @@ +ALTER TYPE trigger_kind ADD VALUE IF NOT EXISTS 'nats'; + +CREATE TABLE nats_trigger ( + path VARCHAR(255) NOT NULL, + nats_resource_path VARCHAR(255) NOT NULL, + subjects VARCHAR(255)[] NOT NULL, + stream_name VARCHAR(255) NULL, + consumer_name VARCHAR(255) NULL, + use_jetstream BOOLEAN NOT NULL, + script_path VARCHAR(255) NOT NULL, + is_flow BOOLEAN NOT NULL, + workspace_id VARCHAR(50) NOT NULL, + edited_by VARCHAR(50) NOT NULL, + email VARCHAR(255) NOT NULL, + edited_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + extra_perms JSONB NOT NULL DEFAULT '{}', + server_id VARCHAR(50) NULL, + last_server_ping TIMESTAMPTZ NULL, + error TEXT NULL, + enabled BOOLEAN NOT NULL, + PRIMARY KEY (path, workspace_id), + FOREIGN KEY (workspace_id) REFERENCES workspace(id) ON DELETE CASCADE +); + +GRANT ALL ON nats_trigger TO windmill_user; +GRANT ALL ON nats_trigger TO windmill_admin; + +ALTER TABLE nats_trigger ENABLE ROW LEVEL SECURITY; + +CREATE POLICY admin_policy ON nats_trigger FOR ALL TO windmill_admin USING (true); + +CREATE POLICY see_folder_extra_perms_user_select ON nats_trigger FOR SELECT TO windmill_user +USING (SPLIT_PART(nats_trigger.path, '/', 1) = 'f' AND SPLIT_PART(nats_trigger.path, '/', 2) = any(regexp_split_to_array(current_setting('session.folders_read'), ',')::text[])); +CREATE POLICY see_folder_extra_perms_user_insert ON nats_trigger FOR INSERT TO windmill_user +WITH CHECK (SPLIT_PART(nats_trigger.path, '/', 1) = 'f' AND SPLIT_PART(nats_trigger.path, '/', 2) = any(regexp_split_to_array(current_setting('session.folders_write'), ',')::text[])); +CREATE POLICY see_folder_extra_perms_user_update ON nats_trigger FOR UPDATE TO windmill_user +USING (SPLIT_PART(nats_trigger.path, '/', 1) = 'f' AND SPLIT_PART(nats_trigger.path, '/', 2) = any(regexp_split_to_array(current_setting('session.folders_write'), ',')::text[])); +CREATE POLICY see_folder_extra_perms_user_delete ON nats_trigger FOR DELETE TO windmill_user +USING (SPLIT_PART(nats_trigger.path, '/', 1) = 'f' AND SPLIT_PART(nats_trigger.path, '/', 2) = any(regexp_split_to_array(current_setting('session.folders_write'), ',')::text[])); + +CREATE POLICY see_own ON nats_trigger FOR ALL TO windmill_user +USING (SPLIT_PART(nats_trigger.path, '/', 1) = 'u' AND SPLIT_PART(nats_trigger.path, '/', 2) = current_setting('session.user')); +CREATE POLICY see_member ON nats_trigger FOR ALL TO windmill_user +USING (SPLIT_PART(nats_trigger.path, '/', 1) = 'g' AND SPLIT_PART(nats_trigger.path, '/', 2) = any(regexp_split_to_array(current_setting('session.groups'), ',')::text[])); + +CREATE POLICY see_extra_perms_user_select ON nats_trigger FOR SELECT TO windmill_user +USING (extra_perms ? CONCAT('u/', current_setting('session.user'))); +CREATE POLICY see_extra_perms_user_insert ON nats_trigger FOR INSERT TO windmill_user +WITH CHECK ((extra_perms ->> CONCAT('u/', current_setting('session.user')))::boolean); +CREATE POLICY see_extra_perms_user_update ON nats_trigger FOR UPDATE TO windmill_user +USING ((extra_perms ->> CONCAT('u/', current_setting('session.user')))::boolean); +CREATE POLICY see_extra_perms_user_delete ON nats_trigger FOR DELETE TO windmill_user +USING ((extra_perms ->> CONCAT('u/', current_setting('session.user')))::boolean); + +CREATE POLICY see_extra_perms_groups_select ON nats_trigger FOR SELECT TO windmill_user +USING (extra_perms ?| regexp_split_to_array(current_setting('session.pgroups'), ',')::text[]); +CREATE POLICY see_extra_perms_groups_insert ON nats_trigger FOR INSERT TO windmill_user +WITH CHECK (exists( + SELECT key, value FROM jsonb_each_text(extra_perms) + WHERE SPLIT_PART(key, '/', 1) = 'g' AND key = ANY(regexp_split_to_array(current_setting('session.pgroups'), ',')::text[]) + AND value::boolean)); +CREATE POLICY see_extra_perms_groups_update ON nats_trigger FOR UPDATE TO windmill_user +USING (exists( + SELECT key, value FROM jsonb_each_text(extra_perms) + WHERE SPLIT_PART(key, '/', 1) = 'g' AND key = ANY(regexp_split_to_array(current_setting('session.pgroups'), ',')::text[]) + AND value::boolean)); +CREATE POLICY see_extra_perms_groups_delete ON nats_trigger FOR DELETE TO windmill_user +USING (exists( + SELECT key, value FROM jsonb_each_text(extra_perms) + WHERE SPLIT_PART(key, '/', 1) = 'g' AND key = ANY(regexp_split_to_array(current_setting('session.pgroups'), ',')::text[]) + AND value::boolean)); \ No newline at end of file diff --git a/backend/windmill-api/Cargo.toml b/backend/windmill-api/Cargo.toml index 95778875d8783..3bf133c0203d7 100644 --- a/backend/windmill-api/Cargo.toml +++ b/backend/windmill-api/Cargo.toml @@ -20,6 +20,7 @@ prometheus = ["windmill-common/prometheus", "windmill-queue/prometheus", "dep:pr openidconnect = ["dep:openidconnect"] tantivy = ["dep:windmill-indexer"] kafka = ["dep:rdkafka"] +nats = ["dep:async-nats", "dep:nkeys"] websocket = ["dep:tokio-tungstenite"] smtp = ["dep:mail-parser", "dep:openssl", "windmill-common/smtp"] license = ["dep:rsa"] @@ -103,6 +104,8 @@ jsonwebtoken = { workspace = true } matchit = { workspace = true, optional = true } tokio-tungstenite = { workspace = true, optional = true} rdkafka = { workspace = true, optional = true } +async-nats = { workspace = true, optional = true } +nkeys = { workspace = true, optional = true } const_format.workspace = true pin-project.workspace = true diff --git a/backend/windmill-api/openapi.yaml b/backend/windmill-api/openapi.yaml index 4443894e5e30e..9ee4881cdb996 100644 --- a/backend/windmill-api/openapi.yaml +++ b/backend/windmill-api/openapi.yaml @@ -2379,10 +2379,13 @@ paths: type: boolean kafka_used: type: boolean + nats_used: + type: boolean required: - http_routes_used - websocket_used - kafka_used + - nats_used /w/{workspace}/users/list: get: @@ -8018,6 +8021,169 @@ paths: schema: type: string + /w/{workspace}/nats_triggers/create: + post: + summary: create nats trigger + operationId: createNatsTrigger + tags: + - nats_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + requestBody: + description: new nats trigger + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/NewNatsTrigger" + responses: + "201": + description: nats trigger created + content: + text/plain: + schema: + type: string + + /w/{workspace}/nats_triggers/update/{path}: + post: + summary: update nats trigger + operationId: updateNatsTrigger + tags: + - nats_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - $ref: "#/components/parameters/Path" + requestBody: + description: updated trigger + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/EditNatsTrigger" + responses: + "200": + description: nats trigger updated + content: + text/plain: + schema: + type: string + + /w/{workspace}/nats_triggers/delete/{path}: + delete: + summary: delete nats trigger + operationId: deleteNatsTrigger + tags: + - nats_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - $ref: "#/components/parameters/Path" + responses: + "200": + description: nats trigger deleted + content: + text/plain: + schema: + type: string + + /w/{workspace}/nats_triggers/get/{path}: + get: + summary: get nats trigger + operationId: getNatsTrigger + tags: + - nats_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - $ref: "#/components/parameters/Path" + responses: + "200": + description: nats trigger deleted + content: + application/json: + schema: + $ref: "#/components/schemas/NatsTrigger" + + + /w/{workspace}/nats_triggers/list: + get: + summary: list nats triggers + operationId: listNatsTriggers + tags: + - nats_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + required: true + - $ref: "#/components/parameters/Page" + - $ref: "#/components/parameters/PerPage" + - name: path + description: filter by path + in: query + schema: + type: string + - name: is_flow + in: query + schema: + type: boolean + - name: path_start + in: query + schema: + type: string + responses: + "200": + description: nats trigger list + content: + application/json: + schema: + type: array + items: + $ref: "#/components/schemas/NatsTrigger" + + + /w/{workspace}/nats_triggers/exists/{path}: + get: + summary: does nats trigger exists + operationId: existsNatsTrigger + tags: + - nats_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - $ref: "#/components/parameters/Path" + responses: + "200": + description: nats trigger exists + content: + application/json: + schema: + type: boolean + + /w/{workspace}/nats_triggers/setenabled/{path}: + post: + summary: set enabled nats trigger + operationId: setNatsTriggerEnabled + tags: + - nats_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - $ref: "#/components/parameters/Path" + requestBody: + description: updated nats trigger enable + required: true + content: + application/json: + schema: + type: object + properties: + enabled: + type: boolean + required: + - enabled + responses: + "200": + description: nats trigger enabled set + content: + text/plain: + schema: + type: string + /groups/list: get: @@ -8888,6 +9054,7 @@ paths: http_trigger, websocket_trigger, kafka_trigger, + nats_trigger, ] responses: "200": @@ -8927,6 +9094,7 @@ paths: http_trigger, websocket_trigger, kafka_trigger, + nats_trigger, ] requestBody: description: acl to add @@ -8977,6 +9145,7 @@ paths: http_trigger, websocket_trigger, kafka_trigger, + nats_trigger, ] requestBody: description: acl to add @@ -12392,6 +12561,8 @@ components: type: number kafka_count: type: number + nats_count: + type: number WebsocketTrigger: type: object @@ -12665,6 +12836,126 @@ components: - topics - is_flow + NatsTrigger: + type: object + properties: + path: + type: string + edited_by: + type: string + edited_at: + type: string + format: date-time + script_path: + type: string + nats_resource_path: + type: string + use_jetstream: + type: boolean + stream_name: + type: string + consumer_name: + type: string + subjects: + type: array + items: + type: string + is_flow: + type: boolean + extra_perms: + type: object + additionalProperties: + type: boolean + email: + type: string + workspace_id: + type: string + server_id: + type: string + last_server_ping: + type: string + format: date-time + error: + type: string + enabled: + type: boolean + + required: + - path + - edited_by + - edited_at + - script_path + - nats_resource_path + - use_jetstream + - subjects + - extra_perms + - is_flow + - email + - workspace_id + - enabled + + NewNatsTrigger: + type: object + properties: + path: + type: string + script_path: + type: string + is_flow: + type: boolean + nats_resource_path: + type: string + use_jetstream: + type: boolean + stream_name: + type: string + consumer_name: + type: string + subjects: + type: array + items: + type: string + enabled: + type: boolean + + required: + - path + - script_path + - is_flow + - nats_resource_path + - use_jetstream + - subjects + + EditNatsTrigger: + type: object + properties: + nats_resource_path: + type: string + use_jetstream: + type: boolean + stream_name: + type: string + consumer_name: + type: string + subjects: + type: array + items: + type: string + path: + type: string + script_path: + type: string + is_flow: + type: boolean + + required: + - path + - script_path + - nats_resource_path + - use_jetstream + - subjects + - is_flow + Group: type: object properties: @@ -13634,7 +13925,7 @@ components: CaptureTriggerKind: type: string - enum: [webhook, http, websocket, kafka, email] + enum: [webhook, http, websocket, kafka, email, nats] Capture: type: object diff --git a/backend/windmill-api/src/capture.rs b/backend/windmill-api/src/capture.rs index 742c2b407041d..2778c8ec4e1a3 100644 --- a/backend/windmill-api/src/capture.rs +++ b/backend/windmill-api/src/capture.rs @@ -36,6 +36,8 @@ use windmill_queue::{PushArgs, PushArgsOwned}; use crate::http_triggers::{build_http_trigger_extra, HttpMethod}; #[cfg(all(feature = "enterprise", feature = "kafka"))] use crate::kafka_triggers_ee::KafkaResourceSecurity; +#[cfg(all(feature = "enterprise", feature = "nats"))] +use crate::nats_triggers_ee::NatsResourceAuth; use crate::{ args::WebhookArgs, db::{ApiAuthed, DB}, @@ -84,6 +86,7 @@ pub enum TriggerKind { Websocket, Kafka, Email, + Nats, } impl fmt::Display for TriggerKind { @@ -94,6 +97,7 @@ impl fmt::Display for TriggerKind { TriggerKind::Websocket => "websocket", TriggerKind::Kafka => "kafka", TriggerKind::Email => "email", + TriggerKind::Nats => "nats", }; write!(f, "{}", s) } @@ -123,6 +127,27 @@ pub struct KafkaTriggerConfig { pub group_id: String, } +#[cfg(all(feature = "enterprise", feature = "nats"))] +#[derive(Serialize, Deserialize)] +#[serde(untagged)] +pub enum NatsTriggerConfigConnection { + Resource { nats_resource_path: String }, + Static { servers: Vec, auth: NatsResourceAuth, require_tls: bool }, +} + +#[cfg(all(feature = "enterprise", feature = "nats"))] +#[derive(Serialize, Deserialize)] +pub struct NatsTriggerConfig { + #[serde(flatten)] + pub connection: NatsTriggerConfigConnection, + pub subjects: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub stream_name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub consumer_name: Option, + pub use_jetstream: bool, +} + #[derive(Serialize, Deserialize, Debug)] pub struct WebsocketTriggerConfig { pub url: String, @@ -138,6 +163,8 @@ enum TriggerConfig { Websocket(WebsocketTriggerConfig), #[cfg(all(feature = "enterprise", feature = "kafka"))] Kafka(KafkaTriggerConfig), + #[cfg(all(feature = "enterprise", feature = "nats"))] + Nats(NatsTriggerConfig), } #[derive(Serialize, Deserialize)] diff --git a/backend/windmill-api/src/granular_acls.rs b/backend/windmill-api/src/granular_acls.rs index da2d6655e7300..1f0f5bebf0ddf 100644 --- a/backend/windmill-api/src/granular_acls.rs +++ b/backend/windmill-api/src/granular_acls.rs @@ -23,7 +23,7 @@ use windmill_common::{ utils::{not_found_if_none, StripPath}, }; -const KINDS: [&str; 12] = [ +const KINDS: [&str; 13] = [ "script", "group_", "resource", @@ -36,6 +36,7 @@ const KINDS: [&str; 12] = [ "http_trigger", "websocket_trigger", "kafka_trigger", + "nats_trigger", ]; pub fn workspaced_service() -> Router { diff --git a/backend/windmill-api/src/lib.rs b/backend/windmill-api/src/lib.rs index 969c796c4c785..08b6df7c10c95 100644 --- a/backend/windmill-api/src/lib.rs +++ b/backend/windmill-api/src/lib.rs @@ -82,6 +82,8 @@ pub mod job_metrics; pub mod jobs; #[cfg(all(feature = "enterprise", feature = "kafka"))] mod kafka_triggers_ee; +#[cfg(all(feature = "enterprise", feature = "nats"))] +mod nats_triggers_ee; #[cfg(feature = "oauth2")] pub mod oauth2_ee; mod oidc_ee; @@ -93,6 +95,7 @@ mod scim_ee; mod scripts; mod service_logs; mod settings; +mod slack_approvals; #[cfg(feature = "smtp")] mod smtp_server_ee; mod static_assets; @@ -109,7 +112,6 @@ mod websocket_triggers; mod workers; mod workspaces; mod workspaces_ee; -mod slack_approvals; mod workspaces_export; mod workspaces_extra; @@ -274,6 +276,18 @@ pub async fn run_server( } }; + let nats_triggers_service = { + #[cfg(all(feature = "enterprise", feature = "nats"))] + { + nats_triggers_ee::workspaced_service() + } + + #[cfg(not(all(feature = "enterprise", feature = "nats")))] + { + Router::new() + } + }; + if !*CLOUD_HOSTED { #[cfg(feature = "websocket")] { @@ -286,6 +300,12 @@ pub async fn run_server( let kafka_killpill_rx = rx.resubscribe(); kafka_triggers_ee::start_kafka_consumers(db.clone(), kafka_killpill_rx).await; } + + #[cfg(all(feature = "enterprise", feature = "nats"))] + { + let nats_killpill_rx = rx.resubscribe(); + nats_triggers_ee::start_nats_consumers(db.clone(), nats_killpill_rx).await; + } } // build our application with a route @@ -354,7 +374,8 @@ pub async fn run_server( #[cfg(not(feature = "websocket"))] Router::new() }) - .nest("/kafka_triggers", kafka_triggers_service), + .nest("/kafka_triggers", kafka_triggers_service) + .nest("/nats_triggers", nats_triggers_service), ) .nest("/workspaces", workspaces::global_service()) .nest( @@ -414,7 +435,10 @@ pub async fn run_server( jobs::workspace_unauthed_service().layer(cors.clone()), ) .route("/slack", post(slack_approvals::slack_app_callback_handler)) - .route("/w/:workspace_id/jobs/slack_approval/:job_id", get(slack_approvals::request_slack_approval)) + .route( + "/w/:workspace_id/jobs/slack_approval/:job_id", + get(slack_approvals::request_slack_approval), + ) .nest( "/w/:workspace_id/resources_u", resources::public_service().layer(cors.clone()), diff --git a/backend/windmill-api/src/nats_triggers_ee.rs b/backend/windmill-api/src/nats_triggers_ee.rs new file mode 100644 index 0000000000000..fedf7981ff014 --- /dev/null +++ b/backend/windmill-api/src/nats_triggers_ee.rs @@ -0,0 +1,17 @@ +use crate::db::DB; +use axum::Router; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct NatsResourceAuth {} + +pub fn workspaced_service() -> Router { + Router::new() +} + +pub async fn start_nats_consumers( + _db: DB, + mut _killpill_rx: tokio::sync::broadcast::Receiver<()>, +) -> () { + // implementation is not open source +} diff --git a/backend/windmill-api/src/triggers.rs b/backend/windmill-api/src/triggers.rs index bc734fc6ea0f6..0f3edbc87c3c5 100644 --- a/backend/windmill-api/src/triggers.rs +++ b/backend/windmill-api/src/triggers.rs @@ -19,6 +19,7 @@ pub struct TriggersCount { email_count: i64, websocket_count: i64, kafka_count: i64, + nats_count: i64, } pub(crate) async fn get_triggers_count_internal( db: &DB, @@ -75,6 +76,16 @@ pub(crate) async fn get_triggers_count_internal( .await? .unwrap_or(0); + let nats_count = sqlx::query_scalar!( + "SELECT COUNT(*) FROM nats_trigger WHERE script_path = $1 AND is_flow = $2 AND workspace_id = $3", + path, + is_flow, + w_id + ) + .fetch_one(db) + .await? + .unwrap_or(0); + let webhook_count = (if is_flow { sqlx::query_scalar!( "SELECT COUNT(*) FROM token WHERE label LIKE 'webhook-%' AND workspace_id = $1 AND scopes @> ARRAY['run:flow/' || $2]::text[]", @@ -117,6 +128,7 @@ pub(crate) async fn get_triggers_count_internal( email_count, websocket_count, kafka_count, + nats_count, })) } diff --git a/backend/windmill-api/src/websocket_triggers.rs b/backend/windmill-api/src/websocket_triggers.rs index 6fbdff49caa6a..b04e6250a597a 100644 --- a/backend/windmill-api/src/websocket_triggers.rs +++ b/backend/windmill-api/src/websocket_triggers.rs @@ -252,7 +252,7 @@ async fn update_websocket_trigger( // important to update server_id, last_server_ping and error to NULL to stop current websocket listener sqlx::query!( - "UPDATE websocket_trigger SET url = $1, script_path = $2, path = $3, is_flow = $4, filters = $5, initial_messages = $6, url_runnable_args = $7, edited_by = $8, email = $9, edited_at = now(), server_id = NULL, last_server_ping = NULL, error = NULL + "UPDATE websocket_trigger SET url = $1, script_path = $2, path = $3, is_flow = $4, filters = $5, initial_messages = $6, url_runnable_args = $7, edited_by = $8, email = $9, edited_at = now(), server_id = NULL, error = NULL WHERE workspace_id = $10 AND path = $11", ct.url, ct.script_path, @@ -300,7 +300,7 @@ pub async fn set_enabled( // important to set server_id, last_server_ping and error to NULL to stop current websocket listener let one_o = sqlx::query_scalar!( - "UPDATE websocket_trigger SET enabled = $1, email = $2, edited_by = $3, edited_at = now(), server_id = NULL, last_server_ping = NULL, error = NULL + "UPDATE websocket_trigger SET enabled = $1, email = $2, edited_by = $3, edited_at = now(), server_id = NULL, error = NULL WHERE path = $4 AND workspace_id = $5 RETURNING 1", payload.enabled, &authed.email, @@ -384,7 +384,7 @@ async fn listen_to_unlistened_websockets( match sqlx::query_as::<_, WebsocketTrigger>( r#"SELECT * FROM websocket_trigger - WHERE enabled IS TRUE AND (server_id IS NULL OR last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds')"# + WHERE enabled IS TRUE AND (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds')"# ) .fetch_all(db) .await @@ -402,7 +402,7 @@ async fn listen_to_unlistened_websockets( match sqlx::query_as!( CaptureConfigForWebsocket, - r#"SELECT path, is_flow, workspace_id, trigger_config as "trigger_config!: _", owner, email FROM capture_config WHERE trigger_kind = 'websocket' AND last_client_ping > NOW() - INTERVAL '10 seconds' AND trigger_config IS NOT NULL AND (server_id IS NULL OR last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds')"# + r#"SELECT path, is_flow, workspace_id, trigger_config as "trigger_config!: _", owner, email FROM capture_config WHERE trigger_kind = 'websocket' AND last_client_ping > NOW() - INTERVAL '10 seconds' AND trigger_config IS NOT NULL AND (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds')"# ) .fetch_all(db) .await @@ -643,7 +643,7 @@ impl WebsocketTrigger { killpill_rx: tokio::sync::broadcast::Receiver<()>, ) -> () { match sqlx::query_scalar!( - "UPDATE websocket_trigger SET server_id = $1, last_server_ping = now() WHERE enabled IS TRUE AND workspace_id = $2 AND path = $3 AND (server_id IS NULL OR last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') RETURNING true", + "UPDATE websocket_trigger SET server_id = $1, last_server_ping = now(), error = 'Connecting...' WHERE enabled IS TRUE AND workspace_id = $2 AND path = $3 AND (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') RETURNING true", *INSTANCE_NAME, self.workspace_id, self.path, @@ -845,7 +845,7 @@ impl CaptureConfigForWebsocket { killpill_rx: tokio::sync::broadcast::Receiver<()>, ) -> () { match sqlx::query_scalar!( - "UPDATE capture_config SET server_id = $1, last_server_ping = now() WHERE last_client_ping > NOW() - INTERVAL '10 seconds' AND workspace_id = $2 AND path = $3 AND is_flow = $4 AND trigger_kind = 'websocket' AND (server_id IS NULL OR last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') RETURNING true", + "UPDATE capture_config SET server_id = $1, last_server_ping = now(), error = 'Connecting...' WHERE last_client_ping > NOW() - INTERVAL '10 seconds' AND workspace_id = $2 AND path = $3 AND is_flow = $4 AND trigger_kind = 'websocket' AND (last_server_ping IS NULL OR last_server_ping < now() - interval '15 seconds') RETURNING true", *INSTANCE_NAME, self.workspace_id, self.path, @@ -1078,117 +1078,116 @@ async fn listen_to_websocket( let (writer, mut reader) = ws_stream.split(); let mut last_ping = tokio::time::Instant::now(); - tokio::select! { - biased; - _ = killpill_rx.recv() => { - return; - } - _ = async { - match &ws { - WebsocketEnum::Trigger(ws_trigger) => { - if let Err(err) = ws_trigger.send_initial_messages(writer, &db).await { + + // send initial messages + match &ws { + WebsocketEnum::Trigger(ws_trigger) => { + tokio::select! { + biased; + _ = killpill_rx.recv() => { + return; + }, + _ = loop_ping(&db, &ws, Some("Sending initial messages...")) => { + return; + }, + result = ws_trigger.send_initial_messages(writer, &db) => { + if let Err(err) = result { ws_trigger.disable_with_error(&db, format!("Error sending initial messages: {:?}", err)).await; + return } else { tracing::debug!("Initial messages sent successfully to websocket {}", url); - // if initial messages sent successfully, wait forever - futures::future::pending::<()>().await; } - }, - WebsocketEnum::Capture(_) => { - futures::future::pending::<()>().await; } } - } => { - // was disabled => exit - return; }, - _ = async { - loop { - tokio::select! { - biased; - msg = reader.next() => { - if let Some(msg) = msg { - if last_ping.elapsed() > tokio::time::Duration::from_secs(5) { - if let None = ws.update_ping(&db, None).await { - return; - } - last_ping = tokio::time::Instant::now(); - } + _ => {} + } + + loop { + tokio::select! { + biased; + _ = killpill_rx.recv() => { + return; + } + msg = reader.next() => { + if let Some(msg) = msg { + if last_ping.elapsed() > tokio::time::Duration::from_secs(5) { + if let None = ws.update_ping(&db, None).await { + return; + } + last_ping = tokio::time::Instant::now(); + } + match msg { + Ok(msg) => { match msg { - Ok(msg) => { - match msg { - tokio_tungstenite::tungstenite::Message::Text(text) => { - tracing::debug!("Received text message from websocket {}: {}", url, text); - let mut should_handle = true; - for filter in &filters { - match filter { - Filter::JsonFilter(JsonFilter { key, value }) => { - let mut deserializer = serde_json::Deserializer::from_str(text.as_str()); - should_handle = match is_value_superset(&mut deserializer, key, &value) { - Ok(filter_match) => { - filter_match - }, - Err(err) => { - tracing::warn!("Error deserializing filter for websocket {}: {:?}", url, err); - false - } - }; - } - } - if !should_handle { - break; - } - } - if should_handle { - - let args = HashMap::from([("msg".to_string(), to_raw_value(&text))]); - let extra = Some(HashMap::from([( - "wm_trigger".to_string(), - to_raw_value(&serde_json::json!({"kind": "websocket", "websocket": { "url": url }})), - )])); - - let args = PushArgsOwned { args, extra }; - match &ws { - WebsocketEnum::Trigger(ws_trigger) => { - ws_trigger.handle(&db, args).await; + tokio_tungstenite::tungstenite::Message::Text(text) => { + tracing::debug!("Received text message from websocket {}: {}", url, text); + let mut should_handle = true; + for filter in &filters { + match filter { + Filter::JsonFilter(JsonFilter { key, value }) => { + let mut deserializer = serde_json::Deserializer::from_str(text.as_str()); + should_handle = match is_value_superset(&mut deserializer, key, &value) { + Ok(filter_match) => { + filter_match }, - WebsocketEnum::Capture(capture) => { - capture.handle(&db, args).await; + Err(err) => { + tracing::warn!("Error deserializing filter for websocket {}: {:?}", url, err); + false } - } + }; + } + } + if !should_handle { + break; + } + } + if should_handle { + + let args = HashMap::from([("msg".to_string(), to_raw_value(&text))]); + let extra = Some(HashMap::from([( + "wm_trigger".to_string(), + to_raw_value(&serde_json::json!({"kind": "websocket", "websocket": { "url": url }})), + )])); + + let args = PushArgsOwned { args, extra }; + match &ws { + WebsocketEnum::Trigger(ws_trigger) => { + ws_trigger.handle(&db, args).await; + }, + WebsocketEnum::Capture(capture) => { + capture.handle(&db, args).await; } - }, - a @ _ => { - tracing::debug!("Received non text-message from websocket {}: {:?}", url, a); } } }, - Err(err) => { - tracing::error!("Error reading from websocket {}: {:?}", url, err); + a @ _ => { + tracing::debug!("Received non text-message from websocket {}: {:?}", url, a); } } - } else { - tracing::error!("Websocket {} closed", url); - if let None = ws.update_ping(&db, Some("Websocket closed")).await { - return; - } - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - break; + }, + Err(err) => { + tracing::error!("Error reading from websocket {}: {:?}", url, err); } - }, - _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)) => { - tracing::debug!("Sending ping to websocket {}", url); - if let None = ws.update_ping(&db, None).await { - return; - } - last_ping = tokio::time::Instant::now(); - }, + } + } else { + tracing::error!("Websocket {} closed", url); + if let None = ws.update_ping(&db, Some("Websocket closed")).await { + return; + } + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + break; } - } - } => { - return; + }, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)) => { + tracing::debug!("Sending ping to websocket {}", url); + if let None = ws.update_ping(&db, None).await { + return; + } + last_ping = tokio::time::Instant::now(); + }, } - }; + } } Err(err) => { tracing::error!("Error connecting to websocket {}: {:?}", url, err); diff --git a/backend/windmill-api/src/workspaces.rs b/backend/windmill-api/src/workspaces.rs index ea07df6171c66..5c8ed2fd87f2b 100644 --- a/backend/windmill-api/src/workspaces.rs +++ b/backend/windmill-api/src/workspaces.rs @@ -1303,6 +1303,7 @@ struct UsedTriggers { pub websocket_used: bool, pub http_routes_used: bool, pub kafka_used: bool, + pub nats_used: bool, } async fn get_used_triggers( @@ -1316,7 +1317,8 @@ async fn get_used_triggers( r#"SELECT EXISTS(SELECT 1 FROM websocket_trigger WHERE workspace_id = $1) as "websocket_used!", EXISTS(SELECT 1 FROM http_trigger WHERE workspace_id = $1) as "http_routes_used!", - EXISTS(SELECT 1 FROM kafka_trigger WHERE workspace_id = $1) as "kafka_used!""#, + EXISTS(SELECT 1 FROM kafka_trigger WHERE workspace_id = $1) as "kafka_used!", + EXISTS(SELECT 1 FROM nats_trigger WHERE workspace_id = $1) as "nats_used!""#, w_id, ) .fetch_one(&mut *tx) diff --git a/frontend/src/lib/components/ArgInput.svelte b/frontend/src/lib/components/ArgInput.svelte index 1084fc16c67c9..632b115863e6a 100644 --- a/frontend/src/lib/components/ArgInput.svelte +++ b/frontend/src/lib/components/ArgInput.svelte @@ -510,7 +510,7 @@ class="rounded-full p-1 bg-surface-secondary duration-200 hover:bg-surface-hover ml-2" aria-label="Clear" on:click={() => { - value.splice(i, 1) + value = value.filter((_, index) => index !== i) redraw += 1 }} > diff --git a/frontend/src/lib/components/Path.svelte b/frontend/src/lib/components/Path.svelte index 0ff3ff56bc09c..f042f17b761a8 100644 --- a/frontend/src/lib/components/Path.svelte +++ b/frontend/src/lib/components/Path.svelte @@ -15,7 +15,8 @@ HttpTriggerService, VariableService, WebsocketTriggerService, - KafkaTriggerService + KafkaTriggerService, + NatsTriggerService } from '$lib/gen' import { superadmin, userStore, workspaceStore } from '$lib/stores' import { createEventDispatcher, getContext } from 'svelte' @@ -39,6 +40,7 @@ | 'http_trigger' | 'websocket_trigger' | 'kafka_trigger' + | 'nats_trigger' let meta: Meta | undefined = undefined export let fullNamePlaceholder: string | undefined = undefined export let namePlaceholder = '' @@ -232,6 +234,11 @@ workspace: $workspaceStore!, path: path }) + } else if (kind == 'nats_trigger') { + return await NatsTriggerService.existsNatsTrigger({ + workspace: $workspaceStore!, + path: path + }) } else { return false } diff --git a/frontend/src/lib/components/SchemaForm.svelte b/frontend/src/lib/components/SchemaForm.svelte index 3245c6aac0ea0..93443292a5a19 100644 --- a/frontend/src/lib/components/SchemaForm.svelte +++ b/frontend/src/lib/components/SchemaForm.svelte @@ -85,8 +85,6 @@ let itemPicker: ItemPicker | undefined = undefined let variableEditor: VariableEditor | undefined = undefined - $: isValid = allTrue(inputCheck ?? {}) - let resourceTypes: string[] | undefined = undefined async function loadResourceTypes() { @@ -95,7 +93,7 @@ loadResourceTypes() - $: schema && reorder() + $: schema && (reorder(), (hidden = {})) function hasExtraKeys() { return Object.keys(args ?? {}).some((x) => !keys.includes(x)) @@ -134,6 +132,28 @@ } $: fields = items ?? keys.map((x) => ({ id: x, value: x })) + + let hidden: Record = {} + + function handleHiddenFields(schema: Schema | any, args: Record) { + for (const x of fields) { + if (schema?.properties[x.value]?.showExpr) { + if (computeShow(x.value, schema.properties[x.value].showExpr, args)) { + hidden[x.value] = false + } else if (!hidden[x.value]) { + hidden[x.value] = true + // remove arg (important: will not trigger a re-render) + delete args[x.value] + // make sure it's made valid + inputCheck[x.value] = true + } + } + } + } + + $: handleHiddenFields(schema, args) + + $: isValid = allTrue(inputCheck ?? {}) {#if showReset} @@ -164,7 +184,7 @@ }} > {#if typeof args == 'object' && schema?.properties[argName]} - {#if computeShow(argName, schema?.properties[argName].showExpr, args)} + {#if !hidden[argName]} {#if lightweightMode} ('Tabs') - $: isSelected = exact ? $selected == value : $selected?.startsWith(value) + function getIsSelectedFn(exact: boolean, otherValues: string[]) { + if (otherValues.length > 0) { + return (selected: string) => selected == value || otherValues.includes(selected) + } else if (exact) { + return (selected: string) => selected == value + } else { + return (selected: string) => selected?.startsWith(value) + } + } + + $: isSelectedFn = getIsSelectedFn(exact, otherValues) + + $: isSelected = isSelectedFn($selected) const fontSizeClasses = { xs: 'text-xs', diff --git a/frontend/src/lib/components/details/DetailPageDetailPanel.svelte b/frontend/src/lib/components/details/DetailPageDetailPanel.svelte index f9e4f927b9f1e..fb3e4d0ed0d99 100644 --- a/frontend/src/lib/components/details/DetailPageDetailPanel.svelte +++ b/frontend/src/lib/components/details/DetailPageDetailPanel.svelte @@ -12,7 +12,9 @@ | 'cli' | 'routes' | 'websockets' - | 'scheduledPoll' = 'webhooks' + | 'scheduledPoll' + | 'kafka' + | 'nats' = 'webhooks' export let flow_json: any | undefined = undefined export let simplfiedPoll: boolean = false @@ -52,6 +54,7 @@ + diff --git a/frontend/src/lib/components/details/DetailPageLayout.svelte b/frontend/src/lib/components/details/DetailPageLayout.svelte index d951f757411c4..21eb3c0d876f7 100644 --- a/frontend/src/lib/components/details/DetailPageLayout.svelte +++ b/frontend/src/lib/components/details/DetailPageLayout.svelte @@ -20,7 +20,15 @@ const primaryScheduleStore = writable(undefined) const selectedTriggerStore = writable< - 'webhooks' | 'emails' | 'schedules' | 'cli' | 'routes' | 'websockets' | 'scheduledPoll' + | 'webhooks' + | 'emails' + | 'schedules' + | 'cli' + | 'routes' + | 'websockets' + | 'scheduledPoll' + | 'kafka' + | 'nats' >('webhooks') const simplifiedPoll = writable(false) @@ -55,6 +63,7 @@ + @@ -100,6 +109,7 @@ + diff --git a/frontend/src/lib/components/details/DetailPageTriggerPanel.svelte b/frontend/src/lib/components/details/DetailPageTriggerPanel.svelte index e0a3d0b7f02d8..d25bb3cf89bf6 100644 --- a/frontend/src/lib/components/details/DetailPageTriggerPanel.svelte +++ b/frontend/src/lib/components/details/DetailPageTriggerPanel.svelte @@ -1,9 +1,20 @@ @@ -45,10 +65,10 @@ Websockets - + - - Kafka + + Event streams @@ -77,8 +97,18 @@ {:else if triggerSelected === 'websockets'} - {:else if triggerSelected === 'kafka'} - + {:else if triggerSelected === 'kafka' || triggerSelected === 'nats'} +
+ + + + +
+ {#if eventStreamType === 'kafka'} + + {:else if eventStreamType === 'nats'} + + {/if} {:else if triggerSelected === 'cli'} {/if} diff --git a/frontend/src/lib/components/details/EmailTriggerConfigSection.svelte b/frontend/src/lib/components/details/EmailTriggerConfigSection.svelte index 7c01cf4959973..cbc05de415626 100644 --- a/frontend/src/lib/components/details/EmailTriggerConfigSection.svelte +++ b/frontend/src/lib/components/details/EmailTriggerConfigSection.svelte @@ -33,7 +33,6 @@ pad: false }) .toLowerCase() - console.log(encodedPrefix) return `${encodedPrefix}@${emailDomain}` } diff --git a/frontend/src/lib/components/graph/renderers/triggers/TriggersBadge.svelte b/frontend/src/lib/components/graph/renderers/triggers/TriggersBadge.svelte index cb930bfdf81c2..a38d3659fe8b7 100644 --- a/frontend/src/lib/components/graph/renderers/triggers/TriggersBadge.svelte +++ b/frontend/src/lib/components/graph/renderers/triggers/TriggersBadge.svelte @@ -1,16 +1,17 @@ {#each triggersToDisplay as type} {@const { icon, countKey } = triggerTypeConfig[type]} - {#if (!showOnlyWithCount || ($triggersCount?.[countKey] ?? 0) > 0) && !(type === 'kafka' && !$enterpriseLicense)} + {#if (!showOnlyWithCount || ((countKey && $triggersCount?.[countKey]) || 0) > 0) && !(type === 'kafka' && !$enterpriseLicense) && !(type === 'nats' && !$enterpriseLicense)} - {type.charAt(0).toUpperCase() + type.slice(1)} + {camelCaseToWords(type)} { - $selectedTrigger = type + $selectedTrigger = type === 'eventStreams' ? 'kafka' : type dispatch('select') }} - selected={selected && $selectedTrigger === type} + selected={selected && + ($selectedTrigger === type || + (type === 'eventStreams' && + ($selectedTrigger === 'kafka' || $selectedTrigger === 'nats')))} > - + {#if countKey} + + {/if} diff --git a/frontend/src/lib/components/icons/NatsIcon.svelte b/frontend/src/lib/components/icons/NatsIcon.svelte new file mode 100644 index 0000000000000..18c916d93bf85 --- /dev/null +++ b/frontend/src/lib/components/icons/NatsIcon.svelte @@ -0,0 +1,20 @@ + + + + + diff --git a/frontend/src/lib/components/icons/index.ts b/frontend/src/lib/components/icons/index.ts index 8a6d5f71c2af8..cc2dce50502bd 100644 --- a/frontend/src/lib/components/icons/index.ts +++ b/frontend/src/lib/components/icons/index.ts @@ -94,6 +94,8 @@ import KeycloakIcon from './KeycloakIcon.svelte' import ZitadelIcon from './ZitadelIcon.svelte' import SpotifyIcon from './SpotifyIcon.svelte' import XeroIcon from './XeroIcon.svelte' +import KafkaIcon from './KafkaIcon.svelte' +import NatsIcon from './NatsIcon.svelte' export const APP_TO_ICON_COMPONENT = { postgresql: PostgresIcon, mysql: Mysql, @@ -192,7 +194,9 @@ export const APP_TO_ICON_COMPONENT = { keycloak: KeycloakIcon, zitadel: ZitadelIcon, spotify: SpotifyIcon, - xero: XeroIcon + xero: XeroIcon, + kafka: KafkaIcon, + nats: NatsIcon } as const export { @@ -285,5 +289,7 @@ export { JumpCloudIcon, KeycloakIcon, ZitadelIcon, - XeroIcon + XeroIcon, + KafkaIcon, + NatsIcon } diff --git a/frontend/src/lib/components/sidebar/SidebarContent.svelte b/frontend/src/lib/components/sidebar/SidebarContent.svelte index 1ce863ff32c25..dd275dc0fc542 100644 --- a/frontend/src/lib/components/sidebar/SidebarContent.svelte +++ b/frontend/src/lib/components/sidebar/SidebarContent.svelte @@ -7,9 +7,7 @@ workspaceStore, isCriticalAlertsUIOpen, enterpriseLicense, - devopsRole - } from '$lib/stores' import { SIDEBAR_SHOW_SCHEDULES } from '$lib/consts' import { @@ -53,6 +51,7 @@ import { page } from '$app/stores' import SideBarNotification from './SideBarNotification.svelte' import KafkaIcon from '../icons/KafkaIcon.svelte' + import NatsIcon from '../icons/NatsIcon.svelte' export let numUnacknowledgedCriticalAlerts = 0 @@ -108,6 +107,13 @@ icon: KafkaIcon, disabled: $userStore?.operator || !$enterpriseLicense, kind: 'kafka' + }, + { + label: 'NATS' + ($enterpriseLicense ? '' : ' (EE)'), + href: '/nats_triggers', + icon: NatsIcon, + disabled: $userStore?.operator || !$enterpriseLicense, + kind: 'nats' } ] diff --git a/frontend/src/lib/components/triggers.ts b/frontend/src/lib/components/triggers.ts index e585c7d989621..57dbf51258d1e 100644 --- a/frontend/src/lib/components/triggers.ts +++ b/frontend/src/lib/components/triggers.ts @@ -48,7 +48,7 @@ export type TriggerKind = | 'websockets' | 'scheduledPoll' | 'kafka' - + | 'nats' export function captureTriggerKindToTriggerKind(kind: CaptureTriggerKind): TriggerKind { switch (kind) { case 'webhook': @@ -61,6 +61,8 @@ export function captureTriggerKindToTriggerKind(kind: CaptureTriggerKind): Trigg return 'websockets' case 'kafka': return 'kafka' + case 'nats': + return 'nats' default: throw new Error(`Unknown CaptureTriggerKind: ${kind}`) } diff --git a/frontend/src/lib/components/triggers/CaptureButton.svelte b/frontend/src/lib/components/triggers/CaptureButton.svelte index 153c2a99923ab..7b1616cc929e7 100644 --- a/frontend/src/lib/components/triggers/CaptureButton.svelte +++ b/frontend/src/lib/components/triggers/CaptureButton.svelte @@ -8,6 +8,7 @@ import { createEventDispatcher } from 'svelte' import { captureTriggerKindToTriggerKind } from '../triggers' import CaptureIcon from './CaptureIcon.svelte' + import NatsIcon from '../icons/NatsIcon.svelte' let isOpen = false @@ -76,6 +77,16 @@

Kafka

+ diff --git a/frontend/src/lib/components/triggers/CaptureWrapper.svelte b/frontend/src/lib/components/triggers/CaptureWrapper.svelte index 03625869a9f31..4be92a821fd23 100644 --- a/frontend/src/lib/components/triggers/CaptureWrapper.svelte +++ b/frontend/src/lib/components/triggers/CaptureWrapper.svelte @@ -13,6 +13,7 @@ import type { ConnectionInfo } from '../common/alert/ConnectionIndicator.svelte' import type { CaptureInfo } from './CaptureSection.svelte' import CaptureTable from './CaptureTable.svelte' + import NatsTriggersConfigSection from './NatsTriggersConfigSection.svelte' export let isFlow: boolean export let path: string @@ -131,15 +132,21 @@ let config: CaptureConfig | undefined $: config = captureConfigs[captureType] - let cloudDisabled = (captureType === 'websocket' || captureType === 'kafka') && isCloudHosted() + let cloudDisabled = + (captureType === 'websocket' || captureType === 'kafka' || captureType === 'nats') && + isCloudHosted() function updateConnectionInfo(config: CaptureConfig | undefined, captureActive: boolean) { - if ((captureType === 'websocket' || captureType === 'kafka') && config && captureActive) { + if ( + (captureType === 'websocket' || captureType === 'kafka' || captureType === 'nats') && + config && + captureActive + ) { const serverEnabled = getServerEnabled(config) const connected = serverEnabled && !config.error const message = connected - ? `${capitalize(captureType)} is connected` - : `${capitalize(captureType)} is not connected${config.error ? ': ' + config.error : ''}` + ? `Connected` + : `Not connected${config.error ? ': ' + config.error : ''}` connectionInfo = { connected, message @@ -251,5 +258,21 @@ handleCapture() }} /> + {:else if captureType === 'nats'} + { + handleCapture() + }} + /> {/if} diff --git a/frontend/src/lib/components/triggers/NatsTriggerEditor.svelte b/frontend/src/lib/components/triggers/NatsTriggerEditor.svelte new file mode 100644 index 0000000000000..e4b74437d99da --- /dev/null +++ b/frontend/src/lib/components/triggers/NatsTriggerEditor.svelte @@ -0,0 +1,27 @@ + + +{#if open} + +{/if} diff --git a/frontend/src/lib/components/triggers/NatsTriggerEditorInner.svelte b/frontend/src/lib/components/triggers/NatsTriggerEditorInner.svelte new file mode 100644 index 0000000000000..b2a80f7b19888 --- /dev/null +++ b/frontend/src/lib/components/triggers/NatsTriggerEditorInner.svelte @@ -0,0 +1,253 @@ + + + + + + {#if !drawerLoading} + {#if edit} +
+ { + await NatsTriggerService.setNatsTriggerEnabled({ + path: initialPath, + workspace: $workspaceStore ?? '', + requestBody: { enabled: e.detail } + }) + sendUserToast(`${e.detail ? 'enabled' : 'disabled'} NATS trigger ${initialPath}`) + }} + /> +
+ {/if} + {#if can_write} + + {/if} + {/if} +
+ {#if drawerLoading} + + {:else} + + {#if edit} + Changes can take up to 30 seconds to take effect. + {:else} + NATS consumers can take up to 30 seconds to start. + {/if} + +
+
+ +
+ + + +
+

+ Pick a script or flow to be triggered +

+
+ +
+
+
+ {/if} +
+
diff --git a/frontend/src/lib/components/triggers/NatsTriggersConfigSection.svelte b/frontend/src/lib/components/triggers/NatsTriggersConfigSection.svelte new file mode 100644 index 0000000000000..3c735c13be0c8 --- /dev/null +++ b/frontend/src/lib/components/triggers/NatsTriggersConfigSection.svelte @@ -0,0 +1,287 @@ + + +
+ {#if showCapture && captureInfo} + + {/if} +
+
+
+ + + {#if !staticInputDisabled} + { + if (ev.detail === 'static') { + delete args.nats_resource_path + args.require_tls = false + args.servers = [''] + args.auth = { + label: 'NO_AUTH' + } + } else { + delete args.servers + delete args.auth + delete args.require_tls + } + }} + > + + + + {/if} + + + {#if selected === 'resource'} + + {:else} + + {/if} + +
+ +
+ + + + {globalError} +
+
+
+
diff --git a/frontend/src/lib/components/triggers/NatsTriggersPanel.svelte b/frontend/src/lib/components/triggers/NatsTriggersPanel.svelte new file mode 100644 index 0000000000000..dc940f621dffb --- /dev/null +++ b/frontend/src/lib/components/triggers/NatsTriggersPanel.svelte @@ -0,0 +1,140 @@ + + + { + loadTriggers() + }} + bind:this={natsTriggerEditor} +/> + +{#if !$enterpriseLicense} + + Nats triggers are an enterprise only feature. + +{:else if isCloudHosted()} + + Nats triggers are disabled in the multi-tenant cloud. + +{:else} +
+ + NATS triggers execute scripts and flows in response to messages published to NATS subjects. + + { + saveTrigger(path, e.detail.config) + }} + on:applyArgs + on:addPreprocessor + cloudDisabled={false} + triggerType="nats" + {isFlow} + {data} + {path} + {isEditor} + {canHavePreprocessor} + {hasPreprocessor} + {newItem} + /> + + {#if !newItem} + {#if natsTriggers} +
+ {#if natsTriggers.length == 0} +
No nats triggers
+ {:else} +
+ {#each natsTriggers as natsTrigger (natsTrigger.path)} +
+
{natsTrigger.path}
+
+ {natsTrigger.nats_resource_path} +
+
+ +
+
+ {/each} +
+ {/if} +
+ {:else} + + {/if} + {/if} +
+{/if} diff --git a/frontend/src/lib/components/triggers/TriggersEditor.svelte b/frontend/src/lib/components/triggers/TriggersEditor.svelte index f82b1d0e4ac70..01cc6cb795293 100644 --- a/frontend/src/lib/components/triggers/TriggersEditor.svelte +++ b/frontend/src/lib/components/triggers/TriggersEditor.svelte @@ -13,6 +13,10 @@ import WebsocketTriggersPanel from './WebsocketTriggersPanel.svelte' import ScheduledPollPanel from './ScheduledPollPanel.svelte' import KafkaTriggersPanel from './KafkaTriggersPanel.svelte' + import NatsTriggersPanel from './NatsTriggersPanel.svelte' + import ToggleButtonGroup from '../common/toggleButton-v2/ToggleButtonGroup.svelte' + import ToggleButton from '../common/toggleButton-v2/ToggleButton.svelte' + import { KafkaIcon, NatsIcon } from '../icons' export let noEditor: boolean export let newItem = false @@ -24,6 +28,14 @@ export let canHavePreprocessor: boolean = false export let hasPreprocessor: boolean = false + let eventStreamType: 'kafka' | 'nats' = 'kafka' + + $: { + if ($selectedTrigger === 'kafka' || $selectedTrigger === 'nats') { + eventStreamType = $selectedTrigger + } + } + const { selectedTrigger, simplifiedPoll } = getContext('TriggerContext') const dispatch = createEventDispatcher() @@ -40,7 +52,9 @@ Schedules HTTP Websockets - Kafka + Event streams Email {#if isFlow} - {:else if $selectedTrigger === 'kafka'} -
- + {:else if $selectedTrigger === 'kafka' || $selectedTrigger === 'nats'} +
+ + + + + {#if eventStreamType === 'kafka'} + + {:else if eventStreamType === 'nats'} + + {/if}
{:else if $selectedTrigger === 'schedules'}
diff --git a/frontend/src/lib/components/triggers/TriggersEditorSection.svelte b/frontend/src/lib/components/triggers/TriggersEditorSection.svelte index 6f7820c40b224..9ef6bd6258afb 100644 --- a/frontend/src/lib/components/triggers/TriggersEditorSection.svelte +++ b/frontend/src/lib/components/triggers/TriggersEditorSection.svelte @@ -28,7 +28,8 @@ websocket: 'New websocket trigger', webhook: 'Webhook', kafka: 'New kafka trigger', - email: 'Email trigger' + email: 'Email trigger', + nats: 'NATS trigger' } const { captureOn } = getContext('TriggerContext') diff --git a/frontend/src/lib/components/triggers/TriggersWrapper.svelte b/frontend/src/lib/components/triggers/TriggersWrapper.svelte index 1883099ee6e32..f2df7a349f10f 100644 --- a/frontend/src/lib/components/triggers/TriggersWrapper.svelte +++ b/frontend/src/lib/components/triggers/TriggersWrapper.svelte @@ -7,6 +7,7 @@ import WebhooksConfigSection from './WebhooksConfigSection.svelte' import EmailTriggerConfigSection from '../details/EmailTriggerConfigSection.svelte' import KafkaTriggersConfigSection from './KafkaTriggersConfigSection.svelte' + import NatsTriggersConfigSection from './NatsTriggersConfigSection.svelte' export let triggerType: CaptureTriggerKind = 'webhook' export let cloudDisabled: boolean = false @@ -58,5 +59,7 @@ /> {:else if triggerType === 'kafka'} + {:else if triggerType === 'nats'} + {/if}
diff --git a/frontend/src/lib/script_helpers.ts b/frontend/src/lib/script_helpers.ts index 8aca36f83b0a8..c54f8540a50d1 100644 --- a/frontend/src/lib/script_helpers.ts +++ b/frontend/src/lib/script_helpers.ts @@ -564,7 +564,7 @@ export async function main(approver?: string) { export const BUN_PREPROCESSOR_MODULE_CODE = ` export async function preprocessor( wm_trigger: { - kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka', + kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats', http?: { route: string // The route path, e.g. "/users/:id" path: string // The actual path called, e.g. "/users/123" @@ -580,6 +580,14 @@ export async function preprocessor( brokers: string[] topic: string group_id: string + }, + nats?: { + servers: string[] + subject: string + headers?: Record + status?: number + description?: string + length: number } }, /* your other args */ @@ -593,7 +601,7 @@ export async function preprocessor( const DENO_PREPROCESSOR_MODULE_CODE = ` export async function preprocessor( wm_trigger: { - kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka', + kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats', http?: { route: string // The route path, e.g. "/users/:id" path: string // The actual path called, e.g. "/users/123" @@ -609,6 +617,14 @@ export async function preprocessor( brokers: string[] topic: string group_id: string + }, + nats?: { + servers: string[] + subject: string + headers?: Record + status?: number + description?: string + length: number } }, /* your other args */ @@ -664,11 +680,20 @@ class Kafka(TypedDict): brokers: list[str] group_id: str +class Nats(TypedDict): + servers: list[str] + subject: str + headers: dict[str, list[str]] | None + status: int | None + description: str | None + length: int + class WmTrigger(TypedDict): - kind: Literal["http", "email", "webhook", "websocket", "kafka"] + kind: Literal["http", "email", "webhook", "websocket", "kafka", "nats"] http: Http | None websocket: Websocket | None kafka: Kafka | None + nats: Nats | None def preprocessor( wm_trigger: WmTrigger, diff --git a/frontend/src/routes/(root)/(logged)/+layout.svelte b/frontend/src/routes/(root)/(logged)/+layout.svelte index ce28d8e805f6d..2414122092be2 100644 --- a/frontend/src/routes/(root)/(logged)/+layout.svelte +++ b/frontend/src/routes/(root)/(logged)/+layout.svelte @@ -29,9 +29,7 @@ defaultScripts, hubBaseUrlStore, usedTriggerKinds, - devopsRole - } from '$lib/stores' import CenteredModal from '$lib/components/CenteredModal.svelte' import { afterNavigate, beforeNavigate } from '$app/navigation' @@ -193,11 +191,10 @@ async function loadUsedTriggerKinds() { let usedKinds: string[] = [] - const { http_routes_used, websocket_used, kafka_used } = await WorkspaceService.getUsedTriggers( - { + const { http_routes_used, websocket_used, kafka_used, nats_used } = + await WorkspaceService.getUsedTriggers({ workspace: $workspaceStore ?? '' - } - ) + }) if (http_routes_used) { usedKinds.push('http') } @@ -207,6 +204,9 @@ if (kafka_used) { usedKinds.push('kafka') } + if (nats_used) { + usedKinds.push('nats') + } $usedTriggerKinds = usedKinds } diff --git a/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte b/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte index cfc4c6b96f7ba..fc7f4908f5811 100644 --- a/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte +++ b/frontend/src/routes/(root)/(logged)/flows/get/[...path]/+page.svelte @@ -62,6 +62,7 @@ import TriggersBadge from '$lib/components/graph/renderers/triggers/TriggersBadge.svelte' import WebsocketTriggersPanel from '$lib/components/triggers/WebsocketTriggersPanel.svelte' import KafkaTriggersPanel from '$lib/components/triggers/KafkaTriggersPanel.svelte' + import NatsTriggersPanel from '$lib/components/triggers/NatsTriggersPanel.svelte' let flow: Flow | undefined let can_write = false @@ -547,6 +548,12 @@
+ +
+ +
+
+
No kafka triggers
{:else if items?.length}
- {#each items.slice(0, nbDisplayed) as { path, edited_by, edited_at, script_path, is_flow, kafka_resource_path, topics, extra_perms, canWrite, marked, error, last_server_ping, enabled } (path)} + {#each items.slice(0, nbDisplayed) as { path, edited_by, edited_at, script_path, is_flow, kafka_resource_path, topics, extra_perms, canWrite, marked, server_id, error, last_server_ping, enabled } (path)} {@const href = `${is_flow ? '/flows/get' : '/scripts/get'}/${script_path}`} {@const ping = last_server_ping ? new Date(last_server_ping) : undefined} + {@const pinging = ping && ping.getTime() > new Date().getTime() - 15 * 1000}
- {#if (enabled && (!ping || ping.getTime() < new Date().getTime() - 15 * 1000 || error)) || (!enabled && error)} + {#if (enabled && (!pinging || error)) || (!enabled && error) || (enabled && !server_id)}
{#if enabled} - Consumer is not connected{error ? ': ' + error : ''} + {#if !server_id} + Consumer is starting... + {:else} + Consumer is not connected{error ? ': ' + error : ''} + {/if} {:else} Consumer was disabled because of an error: {error} {/if} diff --git a/frontend/src/routes/(root)/(logged)/nats_triggers/+page.js b/frontend/src/routes/(root)/(logged)/nats_triggers/+page.js new file mode 100644 index 0000000000000..f2ce975f1fab3 --- /dev/null +++ b/frontend/src/routes/(root)/(logged)/nats_triggers/+page.js @@ -0,0 +1,5 @@ +export function load() { + return { + stuff: { title: 'NATS triggers' } + } +} diff --git a/frontend/src/routes/(root)/(logged)/nats_triggers/+page.svelte b/frontend/src/routes/(root)/(logged)/nats_triggers/+page.svelte new file mode 100644 index 0000000000000..806c772484571 --- /dev/null +++ b/frontend/src/routes/(root)/(logged)/nats_triggers/+page.svelte @@ -0,0 +1,428 @@ + + + + + (x.summary ?? '') + ' ' + x.path + ' (' + x.script_path + ')'} +/> + + + + + + + {#if isCloudHosted()} + + NATS triggers are disabled in the multi-tenant cloud. + +
+ {/if} +
+
+ +
+
Filter by path of
+ + + + +
+ + +
+ {#if $userStore?.is_super_admin && $userStore.username.includes('@')} + + {:else if $userStore?.is_admin || $userStore?.is_super_admin} + + {/if} +
+
+ {#if loading} + {#each new Array(6) as _} + + {/each} + {:else if !triggers?.length} +
No nats triggers
+ {:else if items?.length} +
+ {#each items.slice(0, nbDisplayed) as { path, edited_by, edited_at, script_path, is_flow, nats_resource_path, subjects, extra_perms, canWrite, marked, server_id, error, last_server_ping, enabled } (path)} + {@const href = `${is_flow ? '/flows/get' : '/scripts/get'}/${script_path}`} + {@const ping = last_server_ping ? new Date(last_server_ping) : undefined} + {@const pinging = ping && ping.getTime() > new Date().getTime() - 15 * 1000} + +
+
+ + + natsTriggerEditor?.openEdit(path, is_flow)} + class="min-w-0 grow hover:underline decoration-gray-400" + > +
+ {#if marked} + + {@html marked} + + {:else} + {nats_resource_path} - {subjects.join(', ')} + {/if} +
+
+ {path} +
+
+ runnable: {script_path} +
+
+ + + +
+ {#if (enabled && (!pinging || error)) || (!enabled && error) || (enabled && !server_id)} + + + + + +
+ {#if enabled} + {#if !server_id} + Consumer is starting... + {:else} + Consumer is not connected{error ? ': ' + error : ''} + {/if} + {:else} + Consumer was disabled because of an error: {error} + {/if} +
+
+ {:else if enabled} + + + + +
Consumer is connected
+
+ {/if} +
+ + { + setTriggerEnabled(path, e.detail) + }} + /> + +
+ + { + goto(href) + } + }, + { + displayName: 'Delete', + type: 'delete', + icon: Trash, + disabled: !canWrite, + action: async () => { + await NatsTriggerService.deleteNatsTrigger({ + workspace: $workspaceStore ?? '', + path + }) + loadTriggers() + } + }, + { + displayName: canWrite ? 'Edit' : 'View', + icon: canWrite ? Pen : Eye, + action: () => { + natsTriggerEditor?.openEdit(path, is_flow) + } + }, + { + displayName: 'Audit logs', + icon: Eye, + href: `${base}/audit_logs?resource=${path}` + }, + { + displayName: canWrite ? 'Share' : 'See Permissions', + icon: Share, + action: () => { + shareModal.openDrawer(path, 'nats_trigger') + } + } + ]} + /> +
+
+
+
edited by {edited_by}
the {displayDate(edited_at)}
+
+ {/each} +
+ {:else} + + {/if} +
+ {#if items && items?.length > 15 && nbDisplayed < items.length} + {nbDisplayed} items out of {items.length} + + {/if} + + + { + loadTriggers() + }} +/> diff --git a/frontend/src/routes/(root)/(logged)/scripts/get/[...hash]/+page.svelte b/frontend/src/routes/(root)/(logged)/scripts/get/[...hash]/+page.svelte index 43552e56a0fd0..113dda4b99cc5 100644 --- a/frontend/src/routes/(root)/(logged)/scripts/get/[...hash]/+page.svelte +++ b/frontend/src/routes/(root)/(logged)/scripts/get/[...hash]/+page.svelte @@ -85,6 +85,7 @@ import TriggersBadge from '$lib/components/graph/renderers/triggers/TriggersBadge.svelte' import WebsocketTriggersPanel from '$lib/components/triggers/WebsocketTriggersPanel.svelte' import KafkaTriggersPanel from '$lib/components/triggers/KafkaTriggersPanel.svelte' + import NatsTriggersPanel from '$lib/components/triggers/NatsTriggersPanel.svelte' let script: Script | undefined let topHash: string | undefined @@ -727,6 +728,11 @@
+ +
+ +
+
No websocket triggers
{:else if items?.length}
- {#each items.slice(0, nbDisplayed) as { path, edited_by, edited_at, script_path, url, is_flow, extra_perms, canWrite, marked, error, last_server_ping, enabled } (path)} + {#each items.slice(0, nbDisplayed) as { path, edited_by, edited_at, script_path, url, is_flow, extra_perms, canWrite, marked, error, last_server_ping, server_id, enabled } (path)} {@const href = `${is_flow ? '/flows/get' : '/scripts/get'}/${script_path}`} {@const ping = last_server_ping ? new Date(last_server_ping) : undefined} + {@const pinging = ping && ping.getTime() > new Date().getTime() - 15 * 1000}
- {#if (enabled && (!ping || ping.getTime() < new Date().getTime() - 15 * 1000 || error)) || (!enabled && error)} + {#if (enabled && (!pinging || error)) || (!enabled && error) || (enabled && !server_id)}
{#if enabled} - Websocket is not connected{error ? ': ' + error : ''} + {#if !server_id} + Websocket is starting... + {:else} + Websocket is not connected{error ? ': ' + error : ''} + {/if} {:else} Websocket was disabled because of an error: {error} {/if} @@ -322,7 +328,9 @@ -
Websocket is connected
+
+ Websocket is connected{!server_id ? ' (shutting down...)' : ''} +
{/if}