From 61da412c009104541fc833dc3d0a952fb75092c0 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 4 Aug 2023 15:12:19 +0200 Subject: [PATCH] Health checks --- samples/GrpcProjector/GrpcProjector.csproj | 8 +- samples/GrpcProjector/Program.cs | 16 +- samples/GrpcProjector/Protos/server.proto | 7 +- samples/GrpcProjector/appsettings.json | 2 +- .../google/api/annotations.proto | 31 ++ samples/GrpcProjector/google/api/http.proto | 376 ++++++++++++++++++ .../App/ConnectorApplication.cs | 35 +- .../App/ConnectorBuilder.cs | 51 +-- .../App/ConnectorRegistration.cs | 25 -- .../App/StartupJob.cs | 14 +- .../Config/Configuration.cs | 2 +- .../Config/ConnectorConfig.cs | 12 +- .../Config/Retries.cs | 6 +- .../Diag/ExporterMappings.cs | 6 +- src/Eventuous.Connector.Base/Diag/Logging.cs | 14 +- .../EsdbProjectorStartup.cs | 5 +- .../Config/ElasticConfig.cs | 10 +- .../Config/EsdbConfig.cs | 8 +- .../ConnectorStartup.cs | 16 +- .../Conversions/RawDataElasticSerializer.cs | 25 +- .../ElasticJsonProjector.cs | 21 +- ...Eventuous.Connector.EsdbGenericGrpc.csproj | 13 +- .../ProjectorStartup.cs | 3 +- .../Config/MongoConfig.cs | 6 +- .../MongoJsonProjector.cs | 8 +- .../MongoRegistrationExtensions.cs | 7 +- .../Config/SqlConfig.cs | 2 +- .../GetConnection.cs | 4 +- .../ProjectorStartup.cs | 3 +- .../SqlCheckpointStore.cs | 66 ++- .../SqlServerProjector.cs | 1 + .../Config/GrpcProjectorSettings.cs | 8 +- .../Extensions/TaskExtensions.cs | 2 +- .../GrpcContextKeys.cs | 2 +- .../GrpcFilter.cs | 24 +- .../GrpcProjectingProducer.cs | 6 +- .../GrpcResponseHandler.cs | 1 + .../GrpcTransform.cs | 1 - .../Projector.cs | 114 +++--- .../SubscriptionBuilderExtensions.cs | 8 +- src/Eventuous.Connector/Program.cs | 3 +- src/Eventuous.Connector/StartupBuilder.cs | 19 +- src/Eventuous.Connector/StartupEnvironment.cs | 9 +- 43 files changed, 730 insertions(+), 270 deletions(-) create mode 100644 samples/GrpcProjector/google/api/annotations.proto create mode 100644 samples/GrpcProjector/google/api/http.proto delete mode 100644 src/Eventuous.Connector.Base/App/ConnectorRegistration.cs diff --git a/samples/GrpcProjector/GrpcProjector.csproj b/samples/GrpcProjector/GrpcProjector.csproj index 163c185..9686cc7 100644 --- a/samples/GrpcProjector/GrpcProjector.csproj +++ b/samples/GrpcProjector/GrpcProjector.csproj @@ -13,16 +13,14 @@ - - + + + - - - diff --git a/samples/GrpcProjector/Program.cs b/samples/GrpcProjector/Program.cs index 0322bb5..a9350d4 100644 --- a/samples/GrpcProjector/Program.cs +++ b/samples/GrpcProjector/Program.cs @@ -1,5 +1,6 @@ using GrpcProjector.Services; using Microsoft.AspNetCore.Server.Kestrel.Core; +using Microsoft.OpenApi.Models; using Serilog; using Serilog.Events; @@ -19,10 +20,23 @@ builder.Host.UseSerilog(); builder.Services.AddGrpc(); +builder.Services.AddGrpcSwagger(); +builder.Services.AddSwaggerGen(c => +{ + c.SwaggerDoc("v1", + new OpenApiInfo { Title = "gRPC transcoding", Version = "v1" }); +}); -if (builder.Environment.IsDevelopment()) { builder.WebHost.ConfigureKestrel(options => options.ListenLocalhost(9091, o => o.Protocols = HttpProtocols.Http2)); } +if (builder.Environment.IsDevelopment()) { + builder.WebHost.ConfigureKestrel(options => options.ListenLocalhost(9091, o => o.Protocols = HttpProtocols.Http1AndHttp2)); +} var app = builder.Build(); +app.UseSwagger(); +app.UseSwaggerUI(c => +{ + c.SwaggerEndpoint("/swagger/v1/swagger.json", "My API V1"); +}); app.MapGrpcService(); app.MapGet("/", () => "Communication with gRPC endpoints must be made through a gRPC client"); diff --git a/samples/GrpcProjector/Protos/server.proto b/samples/GrpcProjector/Protos/server.proto index c33fd30..debf0b0 100644 --- a/samples/GrpcProjector/Protos/server.proto +++ b/samples/GrpcProjector/Protos/server.proto @@ -3,10 +3,15 @@ syntax = "proto3"; package grpc_projection; option csharp_namespace = "GrpcProjector"; import "google/protobuf/struct.proto"; +import "google/api/annotations.proto"; service Projection { rpc Project (ProjectRequest) returns (ProjectResponse); - rpc GetCheckpoint (GetCheckpointRequest) returns (GetCheckpointResponse); + rpc GetCheckpoint (GetCheckpointRequest) returns (GetCheckpointResponse) { + option (google.api.http) = { + get: "/v1/checkpoint/{checkpointId}" + }; + }; rpc StoreCheckpoint (StoreCheckpointRequest) returns (StoreCheckpointResponse); } diff --git a/samples/GrpcProjector/appsettings.json b/samples/GrpcProjector/appsettings.json index 405b4c2..dc29256 100644 --- a/samples/GrpcProjector/appsettings.json +++ b/samples/GrpcProjector/appsettings.json @@ -9,7 +9,7 @@ "AllowedHosts": "*", "Kestrel": { "EndpointDefaults": { - "Protocols": "Http2" + "Protocols": "Http1AndHttp2" } } } diff --git a/samples/GrpcProjector/google/api/annotations.proto b/samples/GrpcProjector/google/api/annotations.proto new file mode 100644 index 0000000..18dcf20 --- /dev/null +++ b/samples/GrpcProjector/google/api/annotations.proto @@ -0,0 +1,31 @@ +// Copyright (c) 2015, Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.api; + +import "google/api/http.proto"; +import "google/protobuf/descriptor.proto"; + +option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations"; +option java_multiple_files = true; +option java_outer_classname = "AnnotationsProto"; +option java_package = "com.google.api"; +option objc_class_prefix = "GAPI"; + +extend google.protobuf.MethodOptions { + // See `HttpRule`. + HttpRule http = 72295728; +} \ No newline at end of file diff --git a/samples/GrpcProjector/google/api/http.proto b/samples/GrpcProjector/google/api/http.proto new file mode 100644 index 0000000..a43cff7 --- /dev/null +++ b/samples/GrpcProjector/google/api/http.proto @@ -0,0 +1,376 @@ +// Copyright 2019 Google LLC. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +syntax = "proto3"; + +package google.api; + +option cc_enable_arenas = true; +option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations"; +option java_multiple_files = true; +option java_outer_classname = "HttpProto"; +option java_package = "com.google.api"; +option objc_class_prefix = "GAPI"; + +// Defines the HTTP configuration for an API service. It contains a list of +// [HttpRule][google.api.HttpRule], each specifying the mapping of an RPC method +// to one or more HTTP REST API methods. +message Http { + // A list of HTTP configuration rules that apply to individual API methods. + // + // **NOTE:** All service configuration rules follow "last one wins" order. + repeated HttpRule rules = 1; + + // When set to true, URL path parameters will be fully URI-decoded except in + // cases of single segment matches in reserved expansion, where "%2F" will be + // left encoded. + // + // The default behavior is to not decode RFC 6570 reserved characters in multi + // segment matches. + bool fully_decode_reserved_expansion = 2; +} + +// # gRPC Transcoding +// +// gRPC Transcoding is a feature for mapping between a gRPC method and one or +// more HTTP REST endpoints. It allows developers to build a single API service +// that supports both gRPC APIs and REST APIs. Many systems, including [Google +// APIs](https://github.com/googleapis/googleapis), +// [Cloud Endpoints](https://cloud.google.com/endpoints), [gRPC +// Gateway](https://github.com/grpc-ecosystem/grpc-gateway), +// and [Envoy](https://github.com/envoyproxy/envoy) proxy support this feature +// and use it for large scale production services. +// +// `HttpRule` defines the schema of the gRPC/REST mapping. The mapping specifies +// how different portions of the gRPC request message are mapped to the URL +// path, URL query parameters, and HTTP request body. It also controls how the +// gRPC response message is mapped to the HTTP response body. `HttpRule` is +// typically specified as an `google.api.http` annotation on the gRPC method. +// +// Each mapping specifies a URL path template and an HTTP method. The path +// template may refer to one or more fields in the gRPC request message, as long +// as each field is a non-repeated field with a primitive (non-message) type. +// The path template controls how fields of the request message are mapped to +// the URL path. +// +// Example: +// +// service Messaging { +// rpc GetMessage(GetMessageRequest) returns (Message) { +// option (google.api.http) = { +// get: "/v1/{name=messages/*}" +// }; +// } +// } +// message GetMessageRequest { +// string name = 1; // Mapped to URL path. +// } +// message Message { +// string text = 1; // The resource content. +// } +// +// This enables an HTTP REST to gRPC mapping as below: +// +// HTTP | gRPC +// -----|----- +// `GET /v1/messages/123456` | `GetMessage(name: "messages/123456")` +// +// Any fields in the request message which are not bound by the path template +// automatically become HTTP query parameters if there is no HTTP request body. +// For example: +// +// service Messaging { +// rpc GetMessage(GetMessageRequest) returns (Message) { +// option (google.api.http) = { +// get:"/v1/messages/{message_id}" +// }; +// } +// } +// message GetMessageRequest { +// message SubMessage { +// string subfield = 1; +// } +// string message_id = 1; // Mapped to URL path. +// int64 revision = 2; // Mapped to URL query parameter `revision`. +// SubMessage sub = 3; // Mapped to URL query parameter `sub.subfield`. +// } +// +// This enables a HTTP JSON to RPC mapping as below: +// +// HTTP | gRPC +// -----|----- +// `GET /v1/messages/123456?revision=2&sub.subfield=foo` | +// `GetMessage(message_id: "123456" revision: 2 sub: SubMessage(subfield: +// "foo"))` +// +// Note that fields which are mapped to URL query parameters must have a +// primitive type or a repeated primitive type or a non-repeated message type. +// In the case of a repeated type, the parameter can be repeated in the URL +// as `...?param=A¶m=B`. In the case of a message type, each field of the +// message is mapped to a separate parameter, such as +// `...?foo.a=A&foo.b=B&foo.c=C`. +// +// For HTTP methods that allow a request body, the `body` field +// specifies the mapping. Consider a REST update method on the +// message resource collection: +// +// service Messaging { +// rpc UpdateMessage(UpdateMessageRequest) returns (Message) { +// option (google.api.http) = { +// patch: "/v1/messages/{message_id}" +// body: "message" +// }; +// } +// } +// message UpdateMessageRequest { +// string message_id = 1; // mapped to the URL +// Message message = 2; // mapped to the body +// } +// +// The following HTTP JSON to RPC mapping is enabled, where the +// representation of the JSON in the request body is determined by +// protos JSON encoding: +// +// HTTP | gRPC +// -----|----- +// `PATCH /v1/messages/123456 { "text": "Hi!" }` | `UpdateMessage(message_id: +// "123456" message { text: "Hi!" })` +// +// The special name `*` can be used in the body mapping to define that +// every field not bound by the path template should be mapped to the +// request body. This enables the following alternative definition of +// the update method: +// +// service Messaging { +// rpc UpdateMessage(Message) returns (Message) { +// option (google.api.http) = { +// patch: "/v1/messages/{message_id}" +// body: "*" +// }; +// } +// } +// message Message { +// string message_id = 1; +// string text = 2; +// } +// +// +// The following HTTP JSON to RPC mapping is enabled: +// +// HTTP | gRPC +// -----|----- +// `PATCH /v1/messages/123456 { "text": "Hi!" }` | `UpdateMessage(message_id: +// "123456" text: "Hi!")` +// +// Note that when using `*` in the body mapping, it is not possible to +// have HTTP parameters, as all fields not bound by the path end in +// the body. This makes this option more rarely used in practice when +// defining REST APIs. The common usage of `*` is in custom methods +// which don't use the URL at all for transferring data. +// +// It is possible to define multiple HTTP methods for one RPC by using +// the `additional_bindings` option. Example: +// +// service Messaging { +// rpc GetMessage(GetMessageRequest) returns (Message) { +// option (google.api.http) = { +// get: "/v1/messages/{message_id}" +// additional_bindings { +// get: "/v1/users/{user_id}/messages/{message_id}" +// } +// }; +// } +// } +// message GetMessageRequest { +// string message_id = 1; +// string user_id = 2; +// } +// +// This enables the following two alternative HTTP JSON to RPC mappings: +// +// HTTP | gRPC +// -----|----- +// `GET /v1/messages/123456` | `GetMessage(message_id: "123456")` +// `GET /v1/users/me/messages/123456` | `GetMessage(user_id: "me" message_id: +// "123456")` +// +// ## Rules for HTTP mapping +// +// 1. Leaf request fields (recursive expansion nested messages in the request +// message) are classified into three categories: +// - Fields referred by the path template. They are passed via the URL path. +// - Fields referred by the [HttpRule.body][google.api.HttpRule.body]. They are passed via the HTTP +// request body. +// - All other fields are passed via the URL query parameters, and the +// parameter name is the field path in the request message. A repeated +// field can be represented as multiple query parameters under the same +// name. +// 2. If [HttpRule.body][google.api.HttpRule.body] is "*", there is no URL query parameter, all fields +// are passed via URL path and HTTP request body. +// 3. If [HttpRule.body][google.api.HttpRule.body] is omitted, there is no HTTP request body, all +// fields are passed via URL path and URL query parameters. +// +// ### Path template syntax +// +// Template = "/" Segments [ Verb ] ; +// Segments = Segment { "/" Segment } ; +// Segment = "*" | "**" | LITERAL | Variable ; +// Variable = "{" FieldPath [ "=" Segments ] "}" ; +// FieldPath = IDENT { "." IDENT } ; +// Verb = ":" LITERAL ; +// +// The syntax `*` matches a single URL path segment. The syntax `**` matches +// zero or more URL path segments, which must be the last part of the URL path +// except the `Verb`. +// +// The syntax `Variable` matches part of the URL path as specified by its +// template. A variable template must not contain other variables. If a variable +// matches a single path segment, its template may be omitted, e.g. `{var}` +// is equivalent to `{var=*}`. +// +// The syntax `LITERAL` matches literal text in the URL path. If the `LITERAL` +// contains any reserved character, such characters should be percent-encoded +// before the matching. +// +// If a variable contains exactly one path segment, such as `"{var}"` or +// `"{var=*}"`, when such a variable is expanded into a URL path on the client +// side, all characters except `[-_.~0-9a-zA-Z]` are percent-encoded. The +// server side does the reverse decoding. Such variables show up in the +// [Discovery +// Document](https://developers.google.com/discovery/v1/reference/apis) as +// `{var}`. +// +// If a variable contains multiple path segments, such as `"{var=foo/*}"` +// or `"{var=**}"`, when such a variable is expanded into a URL path on the +// client side, all characters except `[-_.~/0-9a-zA-Z]` are percent-encoded. +// The server side does the reverse decoding, except "%2F" and "%2f" are left +// unchanged. Such variables show up in the +// [Discovery +// Document](https://developers.google.com/discovery/v1/reference/apis) as +// `{+var}`. +// +// ## Using gRPC API Service Configuration +// +// gRPC API Service Configuration (service config) is a configuration language +// for configuring a gRPC service to become a user-facing product. The +// service config is simply the YAML representation of the `google.api.Service` +// proto message. +// +// As an alternative to annotating your proto file, you can configure gRPC +// transcoding in your service config YAML files. You do this by specifying a +// `HttpRule` that maps the gRPC method to a REST endpoint, achieving the same +// effect as the proto annotation. This can be particularly useful if you +// have a proto that is reused in multiple services. Note that any transcoding +// specified in the service config will override any matching transcoding +// configuration in the proto. +// +// Example: +// +// http: +// rules: +// # Selects a gRPC method and applies HttpRule to it. +// - selector: example.v1.Messaging.GetMessage +// get: /v1/messages/{message_id}/{sub.subfield} +// +// ## Special notes +// +// When gRPC Transcoding is used to map a gRPC to JSON REST endpoints, the +// proto to JSON conversion must follow the [proto3 +// specification](https://developers.google.com/protocol-buffers/docs/proto3#json). +// +// While the single segment variable follows the semantics of +// [RFC 6570](https://tools.ietf.org/html/rfc6570) Section 3.2.2 Simple String +// Expansion, the multi segment variable **does not** follow RFC 6570 Section +// 3.2.3 Reserved Expansion. The reason is that the Reserved Expansion +// does not expand special characters like `?` and `#`, which would lead +// to invalid URLs. As the result, gRPC Transcoding uses a custom encoding +// for multi segment variables. +// +// The path variables **must not** refer to any repeated or mapped field, +// because client libraries are not capable of handling such variable expansion. +// +// The path variables **must not** capture the leading "/" character. The reason +// is that the most common use case "{var}" does not capture the leading "/" +// character. For consistency, all path variables must share the same behavior. +// +// Repeated message fields must not be mapped to URL query parameters, because +// no client library can support such complicated mapping. +// +// If an API needs to use a JSON array for request or response body, it can map +// the request or response body to a repeated field. However, some gRPC +// Transcoding implementations may not support this feature. +message HttpRule { + // Selects a method to which this rule applies. + // + // Refer to [selector][google.api.DocumentationRule.selector] for syntax details. + string selector = 1; + + // Determines the URL pattern is matched by this rules. This pattern can be + // used with any of the {get|put|post|delete|patch} methods. A custom method + // can be defined using the 'custom' field. + oneof pattern { + // Maps to HTTP GET. Used for listing and getting information about + // resources. + string get = 2; + + // Maps to HTTP PUT. Used for replacing a resource. + string put = 3; + + // Maps to HTTP POST. Used for creating a resource or performing an action. + string post = 4; + + // Maps to HTTP DELETE. Used for deleting a resource. + string delete = 5; + + // Maps to HTTP PATCH. Used for updating a resource. + string patch = 6; + + // The custom pattern is used for specifying an HTTP method that is not + // included in the `pattern` field, such as HEAD, or "*" to leave the + // HTTP method unspecified for this rule. The wild-card rule is useful + // for services that provide content to Web (HTML) clients. + CustomHttpPattern custom = 8; + } + + // The name of the request field whose value is mapped to the HTTP request + // body, or `*` for mapping all request fields not captured by the path + // pattern to the HTTP body, or omitted for not having any HTTP request body. + // + // NOTE: the referred field must be present at the top-level of the request + // message type. + string body = 7; + + // Optional. The name of the response field whose value is mapped to the HTTP + // response body. When omitted, the entire response message will be used + // as the HTTP response body. + // + // NOTE: The referred field must be present at the top-level of the response + // message type. + string response_body = 12; + + // Additional HTTP bindings for the selector. Nested bindings must + // not contain an `additional_bindings` field themselves (that is, + // the nesting may only be one level deep). + repeated HttpRule additional_bindings = 11; +} + +// A custom pattern is used for defining custom HTTP verb. +message CustomHttpPattern { + // The name of this custom HTTP verb. + string kind = 1; + + // The path matched by this custom verb. + string path = 2; +} \ No newline at end of file diff --git a/src/Eventuous.Connector.Base/App/ConnectorApplication.cs b/src/Eventuous.Connector.Base/App/ConnectorApplication.cs index 1a09783..78450c1 100644 --- a/src/Eventuous.Connector.Base/App/ConnectorApplication.cs +++ b/src/Eventuous.Connector.Base/App/ConnectorApplication.cs @@ -27,7 +27,11 @@ public class ConnectorApplicationBuilder config); public delegate ConnectorBuilder ConfigureConnector(ConnectorBuilder builder, ConnectorConfig config) + TProducer, TProduceOptions>( + ConnectorBuilder builder, + ConnectorConfig config, + IHealthChecksBuilder healthChecks + ) where TSubscription : EventSubscription where TSubscriptionOptions : SubscriptionOptions where TProducer : class, IEventProducer @@ -43,14 +47,17 @@ internal ConnectorApplicationBuilder(string configFile) { Config = Builder.Configuration.GetConnectorConfig(); Builder.Services.AddSingleton(Config.Source); Builder.Services.AddSingleton(Config.Target); + HealthChecks = Builder.Services.AddHealthChecks(); if (!Config.Connector.Diagnostics.Enabled) { Environment.SetEnvironmentVariable("EVENTUOUS_DISABLE_DIAGS", "1"); } + + var logLevel = Logging.ParseLogLevel(Config.Connector.LogLevel); + ConfigureSerilog(logLevel); } - [PublicAPI] - public WebApplicationBuilder Builder { get; } + public IHealthChecksBuilder HealthChecks { get; } + WebApplicationBuilder Builder { get; } - [PublicAPI] public ConnectorConfig Config { get; } [PublicAPI] @@ -64,10 +71,8 @@ public void ConfigureSerilog( _configureLogger = configureLogger; } - [PublicAPI] public void RegisterDependencies(ResolveDependencies configure) => configure(Builder.Services, Config); - [PublicAPI] public void RegisterConnector( ConfigureConnector configure ) @@ -75,15 +80,15 @@ ConfigureConnector where TProduceOptions : class { - var builder = configure(new ConnectorBuilder(), Config); - builder.Register(Builder.Services); + var builder = configure(new ConnectorBuilder(), Config, HealthChecks); + builder.Register(Builder.Services, HealthChecks); } const string ConnectorIdTag = "connectorId"; void EnrichActivity(Activity activity, string arg1, object arg2) => activity.AddTag(ConnectorIdTag, Config.Connector.ConnectorId); - bool _otelAdded; + bool _oTelAdded; [PublicAPI] public void AddOpenTelemetry( @@ -93,16 +98,16 @@ public void AddOpenTelemetry( ExporterMappings? tracingExporters = null, ExporterMappings? metricsExporters = null ) { - _otelAdded = true; + _oTelAdded = true; if (!Config.Connector.Diagnostics.Enabled) { return; } EventuousDiagnostics.AddDefaultTag(ConnectorIdTag, Config.Connector.ConnectorId); - var otelBuilder = Builder.Services.AddOpenTelemetry(); + var oTelBuilder = Builder.Services.AddOpenTelemetry(); if (Config.Connector.Diagnostics.Tracing is { Enabled: true }) { - otelBuilder.WithTracing( + oTelBuilder.WithTracing( cfg => { cfg.AddEventuousTracing(); @@ -117,7 +122,7 @@ public void AddOpenTelemetry( } if (Config.Connector.Diagnostics.Metrics is { Enabled: true }) { - otelBuilder.WithMetrics( + oTelBuilder.WithMetrics( cfg => { cfg.AddEventuous().AddEventuousSubscriptions(); configureMetrics?.Invoke(cfg); @@ -133,7 +138,7 @@ public ConnectorApp Build() { Builder.Services.AddHealthChecks().AddSubscriptionsHealthCheck("Subscriptions", HealthStatus.Unhealthy, new[] { Config.Connector.ConnectorId }); - if (!_otelAdded) { AddOpenTelemetry(); } + if (!_oTelAdded) { AddOpenTelemetry(); } var app = Builder.Build(); @@ -156,6 +161,8 @@ string configFile internal ConnectorApp(WebApplication host) => Host = host; public async Task Run() { + Host.MapHealthChecks("/health"); + try { await Host.RunConnector(); diff --git a/src/Eventuous.Connector.Base/App/ConnectorBuilder.cs b/src/Eventuous.Connector.Base/App/ConnectorBuilder.cs index e2a7e31..ad59dca 100644 --- a/src/Eventuous.Connector.Base/App/ConnectorBuilder.cs +++ b/src/Eventuous.Connector.Base/App/ConnectorBuilder.cs @@ -10,6 +10,7 @@ using Eventuous.Subscriptions.Polly; using Eventuous.Subscriptions.Registrations; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Diagnostics.HealthChecks; // ReSharper disable CheckNamespace @@ -19,12 +20,14 @@ public class ConnectorBuilder { [PublicAPI] [SuppressMessage("Performance", "CA1822:Mark members as static")] public ConnectorBuilder SubscribeWith(string subscriptionId) - where TSubscription : EventSubscription where TSubscriptionOptions : SubscriptionOptions + where TSubscription : EventSubscription + where TSubscriptionOptions : SubscriptionOptions => new(subscriptionId); } public class ConnectorBuilder : ConnectorBuilder - where TSub : EventSubscription where TSubOptions : SubscriptionOptions { + where TSub : EventSubscription + where TSubOptions : SubscriptionOptions { internal string SubscriptionId { get; } internal ConnectorBuilder(string subscriptionId) => SubscriptionId = subscriptionId; @@ -32,59 +35,54 @@ public class ConnectorBuilder : ConnectorBuilder [PublicAPI] public ConnectorBuilder ConfigureSubscriptionOptions(Action configureOptions) { _configureOptions = configureOptions; + return this; } [PublicAPI] public ConnectorBuilder ConfigureSubscription(Action> configure) { _configure = configure; + return this; } [PublicAPI] public ConnectorBuilder ProduceWith(ResolveRetryPolicy? retryPolicy = null, bool awaitProduce = true) - where TProducer : class, IEventProducer where TProduceOptions : class + where TProducer : class, IEventProducer + where TProduceOptions : class => new(this, retryPolicy, awaitProduce); internal void ConfigureOptions(TSubOptions options) => _configureOptions?.Invoke(options); internal void Configure(SubscriptionBuilder builder) => _configure?.Invoke(builder); - Action? _configureOptions; + Action? _configureOptions; Action>? _configure; } -public class ConnectorBuilder +public class ConnectorBuilder( + ConnectorBuilder inner, + ResolveRetryPolicy? resolveRetryPolicy, + bool awaitProduce +) where TSub : EventSubscription where TSubOptions : SubscriptionOptions where TProducer : class, IEventProducer where TProduceOptions : class { - readonly ConnectorBuilder _inner; - readonly ResolveRetryPolicy? _resolveRetryPolicy; Func>? _getTransformer; - readonly bool _awaitProduce; Type? _transformerType; - public ConnectorBuilder( - ConnectorBuilder inner, - ResolveRetryPolicy? resolveRetryPolicy, - bool awaitProduce - ) { - _inner = inner; - _resolveRetryPolicy = resolveRetryPolicy; - _awaitProduce = awaitProduce; - } - [PublicAPI] public ConnectorBuilder TransformWith(Func? getTransformer) where T : class, IGatewayTransform { - _getTransformer = getTransformer; + _getTransformer = getTransformer; _transformerType = typeof(T); + return this; } - public void Register(IServiceCollection services) { + public void Register(IServiceCollection services, IHealthChecksBuilder healthChecks) { services.AddSingleton( Ensure.NotNull(_transformerType, "Transformer"), Ensure.NotNull(_getTransformer, "GetTransformer") @@ -93,20 +91,23 @@ public void Register(IServiceCollection services) { services.TryAddSingleton(); services.AddSubscription( - _inner.SubscriptionId, + inner.SubscriptionId, builder => { - builder.Configure(_inner.ConfigureOptions); - _inner.Configure(builder); + builder.Configure(inner.ConfigureOptions); + inner.Configure(builder); builder.AddEventHandler(GetHandler); } ); + healthChecks.AddSubscriptionsHealthCheck(inner.SubscriptionId, HealthStatus.Degraded, new[] { inner.SubscriptionId }); + return; IEventHandler GetHandler(IServiceProvider sp) { var transform = sp.GetRequiredService(_transformerType!) as IGatewayTransform; var producer = sp.GetRequiredService(); - var handler = GatewayHandlerFactory.Create(producer, transform!.RouteAndTransform, _awaitProduce); - return _resolveRetryPolicy == null ? handler : new PollyEventHandler(handler, _resolveRetryPolicy(sp)); + var handler = GatewayHandlerFactory.Create(producer, transform!.RouteAndTransform, awaitProduce); + + return resolveRetryPolicy == null ? handler : new PollyEventHandler(handler, resolveRetryPolicy(sp)); } } } diff --git a/src/Eventuous.Connector.Base/App/ConnectorRegistration.cs b/src/Eventuous.Connector.Base/App/ConnectorRegistration.cs deleted file mode 100644 index d27ee30..0000000 --- a/src/Eventuous.Connector.Base/App/ConnectorRegistration.cs +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (C) 2021-2022 Ubiquitous AS. All rights reserved -// Licensed under the Apache License, Version 2.0. - -// ReSharper disable CheckNamespace - -using Eventuous.Producers; -using Eventuous.Subscriptions; - -namespace Microsoft.Extensions.DependencyInjection; - -public static class ConnectorRegistration { - public static IServiceCollection - AddConnector( - this IServiceCollection services, - Func> configure - ) - where TSubscription : EventSubscription - where TSubscriptionOptions : SubscriptionOptions - where TProducer : class, IEventProducer - where TProduceOptions : class { - var builder = configure(new ConnectorBuilder()); - builder.Register(services); - return services; - } -} diff --git a/src/Eventuous.Connector.Base/App/StartupJob.cs b/src/Eventuous.Connector.Base/App/StartupJob.cs index 133d2cb..1ccad66 100644 --- a/src/Eventuous.Connector.Base/App/StartupJob.cs +++ b/src/Eventuous.Connector.Base/App/StartupJob.cs @@ -9,18 +9,8 @@ public interface IStartupJob { Task Run(); } -public class StartupJob : IStartupJob { - readonly T1 _t1; - readonly T2 _t2; - readonly Func _func; - - public StartupJob(T1 t1, T2 t2, Func func) { - _t1 = t1; - _t2 = t2; - _func = func; - } - - public Task Run() => _func(_t1, _t2); +public class StartupJob(T1 t1, T2 t2, Func func) : IStartupJob { + public Task Run() => func(t1, t2); } public static class StartupJobRegistration { diff --git a/src/Eventuous.Connector.Base/Config/Configuration.cs b/src/Eventuous.Connector.Base/Config/Configuration.cs index acc5e8a..e25cf63 100644 --- a/src/Eventuous.Connector.Base/Config/Configuration.cs +++ b/src/Eventuous.Connector.Base/Config/Configuration.cs @@ -12,5 +12,5 @@ public static IConfigurationBuilder AddConfiguration(this WebApplicationBuilder public static ConnectorConfig GetConnectorConfig(this IConfiguration configuration) where TSource : class where TTarget : class where TFilter : class - => configuration.Get>(); + => configuration.Get>() ?? throw new ApplicationException("Configuration not found"); } diff --git a/src/Eventuous.Connector.Base/Config/ConnectorConfig.cs b/src/Eventuous.Connector.Base/Config/ConnectorConfig.cs index aef81cc..f4472ce 100644 --- a/src/Eventuous.Connector.Base/Config/ConnectorConfig.cs +++ b/src/Eventuous.Connector.Base/Config/ConnectorConfig.cs @@ -5,7 +5,10 @@ namespace Eventuous.Connector.Base.Config; -public record ConnectorConfig : ConnectorConfig where TSource : class where TTarget : class where TFilter : class { +public record ConnectorConfig : ConnectorConfig + where TSource : class + where TTarget : class + where TFilter : class { public TFilter? Filter { get; init; } public TSource Source { get; init; } = null!; public TTarget Target { get; init; } = null!; @@ -19,19 +22,20 @@ public record ConnectorSettings { public string ConnectorId { get; init; } = "default"; public string ConnectorAssembly { get; init; } = ""; public string ServiceName { get; init; } = "eventuous-connector"; + public string LogLevel { get; init; } = "info"; public DiagnosticsConfig Diagnostics { get; init; } = new(); } public record DiagnosticsConfig { public bool Enabled { get; init; } = true; - public TracingConfig? Tracing { get; init; } - public MetricsConfig? Metrics { get; init; } + public TracingConfig? Tracing { get; [UsedImplicitly] init; } + public MetricsConfig? Metrics { get; [UsedImplicitly] init; } public double TraceSamplerProbability { get; init; } = 0; } public record MetricsConfig { public bool Enabled { get; init; } = true; - public string[]? Exporters { get; init; } + public string[]? Exporters { get; [UsedImplicitly] init; } } public record TracingConfig { diff --git a/src/Eventuous.Connector.Base/Config/Retries.cs b/src/Eventuous.Connector.Base/Config/Retries.cs index 8156265..92a69d0 100644 --- a/src/Eventuous.Connector.Base/Config/Retries.cs +++ b/src/Eventuous.Connector.Base/Config/Retries.cs @@ -16,14 +16,14 @@ public static IAsyncPolicy RetryForever(IServiceProvider sp, ConnectorConfig var loggerFactory = sp.GetRequiredService(); var log = loggerFactory.CreateLogger(config.Connector.ConnectorId); - void LogRetry(Exception exception, int retry, TimeSpan delay, Context context) - => log.LogWarning("Retrying after {RetryAttempt} attempt(s), waiting for {Delay}", retry, delay); - return Policy .Handle() .WaitAndRetryForeverAsync( (retryAttempt, _) => TimeSpan.FromMilliseconds(retryAttempt * 100), LogRetry ); + + void LogRetry(Exception exception, int retry, TimeSpan delay, Context context) + => log.LogWarning("Retrying after {RetryAttempt} attempt(s), waiting for {Delay}", retry, delay); } } diff --git a/src/Eventuous.Connector.Base/Diag/ExporterMappings.cs b/src/Eventuous.Connector.Base/Diag/ExporterMappings.cs index 2efca9b..47837ff 100644 --- a/src/Eventuous.Connector.Base/Diag/ExporterMappings.cs +++ b/src/Eventuous.Connector.Base/Diag/ExporterMappings.cs @@ -16,17 +16,17 @@ public ExporterMappings Add(string name, Action configure) { public void RegisterExporters(T provider, string[]? exporters) { var name = typeof(T).Name; if (exporters == null) { - Log.Warning("No exporters for {name} available", name); + Log.Warning("No exporters for {Name} available", name); return; } foreach (var exporter in exporters) { if (_mappings.TryGetValue(exporter, out var addExporter)) { - Log.Information("Adding exporter {exporter} for {name}", exporter, name); + Log.Information("Adding exporter {Exporter} for {Name}", exporter, name); addExporter(provider); } else { - Log.Information("No exporters specified for {exporter}", exporter); + Log.Information("No exporters specified for {Exporter}", exporter); } } } diff --git a/src/Eventuous.Connector.Base/Diag/Logging.cs b/src/Eventuous.Connector.Base/Diag/Logging.cs index 0865182..936a6c4 100644 --- a/src/Eventuous.Connector.Base/Diag/Logging.cs +++ b/src/Eventuous.Connector.Base/Diag/Logging.cs @@ -21,11 +21,10 @@ public static Logger GetLogger( var sc = sinkConfiguration ?? DefaultSink; var logLevel = minimumLogLevel - ?? (environment.IsDevelopment() ? LogEventLevel.Debug : LogEventLevel.Information); + ?? (environment.IsDevelopment() ? LogEventLevel.Debug : LogEventLevel.Information); var logConfig = new LoggerConfiguration() .MinimumLevel.Is(logLevel) - // .MinimumLevel.Override("Eventuous", LogEventLevel.Verbose) .MinimumLevel.Override("Microsoft", LogEventLevel.Information) .MinimumLevel.Override("Grpc", LogEventLevel.Fatal) .MinimumLevel.Override("Microsoft.AspNetCore.Mvc.Infrastructure", LogEventLevel.Warning) @@ -53,4 +52,15 @@ public static void ConfigureSerilog( Log.Logger = GetLogger(builder.Environment, minimumLogLevel, sinkConfiguration, configure); builder.Host.UseSerilog(); } + + public static LogEventLevel? ParseLogLevel(string logLevel) + => logLevel.ToUpper() switch { + "TRACE" => LogEventLevel.Verbose, + "DEBUG" => LogEventLevel.Debug, + "INFO" => LogEventLevel.Information, + "WARN" => LogEventLevel.Warning, + "ERROR" => LogEventLevel.Error, + "FATAL" => LogEventLevel.Fatal, + _ => null + }; } diff --git a/src/Eventuous.Connector.EsdbBase/EsdbProjectorStartup.cs b/src/Eventuous.Connector.EsdbBase/EsdbProjectorStartup.cs index 79febbe..f3796e2 100644 --- a/src/Eventuous.Connector.EsdbBase/EsdbProjectorStartup.cs +++ b/src/Eventuous.Connector.EsdbBase/EsdbProjectorStartup.cs @@ -55,7 +55,8 @@ ExporterMappings metricsExporters ConnectorBuilder ConfigureProjectConnector( ConnectorBuilder builder, - ConnectorConfig config + ConnectorConfig config, + IHealthChecksBuilder healthChecks ) { var serializer = new RawDataDeserializer(); var concurrencyLimit = config.Source.ConcurrencyLimit; @@ -67,7 +68,7 @@ ConnectorConfig config b => { ConfigureSubscription(b); b.WithPartitioningByStream(concurrencyLimit); - b.AddGrpcProjector(NotNull(config.Filter)); + b.AddGrpcProjector(NotNull(config.Filter), healthChecks); } ) .ProduceWith(sp => GetRetryPolicy(sp, config)) diff --git a/src/Eventuous.Connector.EsdbElastic/Config/ElasticConfig.cs b/src/Eventuous.Connector.EsdbElastic/Config/ElasticConfig.cs index 0189be2..9f60bad 100644 --- a/src/Eventuous.Connector.EsdbElastic/Config/ElasticConfig.cs +++ b/src/Eventuous.Connector.EsdbElastic/Config/ElasticConfig.cs @@ -6,9 +6,9 @@ namespace Eventuous.Connector.EsdbElastic.Config; public record ElasticConfig { - public string? ConnectionString { get; init; } - public string? CloudId { get; init; } - public string? ApiKey { get; init; } - public IndexConfig? DataStream { get; init; } - public string ConnectorMode { get; init; } = "produce"; + public string? ConnectionString { get; [UsedImplicitly] init; } + public string? CloudId { get; [UsedImplicitly] init; } + public string? ApiKey { get; [UsedImplicitly] init; } + public IndexConfig? DataStream { get; [UsedImplicitly] init; } + public string ConnectorMode { get; [UsedImplicitly] init; } = "produce"; } diff --git a/src/Eventuous.Connector.EsdbElastic/Config/EsdbConfig.cs b/src/Eventuous.Connector.EsdbElastic/Config/EsdbConfig.cs index 8eb6612..f064cf2 100644 --- a/src/Eventuous.Connector.EsdbElastic/Config/EsdbConfig.cs +++ b/src/Eventuous.Connector.EsdbElastic/Config/EsdbConfig.cs @@ -1,7 +1,9 @@ -// ReSharper disable AutoPropertyCanBeMadeGetOnly.Global +// Copyright (C) 2021-2022 Ubiquitous AS. All rights reserved +// Licensed under the Apache License, Version 2.0. + namespace Eventuous.Connector.EsdbElastic.Config; public record EsdbConfig { - public string ConnectionString { get; init; } = null!; - public int ConcurrencyLimit { get; init; } = 1; + public string ConnectionString { get; [UsedImplicitly] init; } = null!; + public int ConcurrencyLimit { get; [UsedImplicitly] init; } = 1; } diff --git a/src/Eventuous.Connector.EsdbElastic/ConnectorStartup.cs b/src/Eventuous.Connector.EsdbElastic/ConnectorStartup.cs index 8cbf6ca..506600d 100644 --- a/src/Eventuous.Connector.EsdbElastic/ConnectorStartup.cs +++ b/src/Eventuous.Connector.EsdbElastic/ConnectorStartup.cs @@ -39,18 +39,18 @@ ExporterMappings metricsExporters var builder = ConnectorApp.Create(configFile); if (builder.Config.Target.ConnectorMode == "produce") { - builder .RegisterDependencies(RegisterProduce); - builder.RegisterConnector(ConfigureProduceConnector); + builder.RegisterDependencies(RegisterProduce); + builder.RegisterConnector(ConfigureProduceConnector); } else { builder.RegisterDependencies(RegisterProject); - builder.RegisterConnector(ConfigureProjectConnector); + builder.RegisterConnector(ConfigureProjectConnector); } builder.AddOpenTelemetry( (cfg, enrich) => cfg - .AddGrpcClientInstrumentation(options => options.Enrich = enrich) + .AddGrpcClientInstrumentation(options => options.Enrich = enrich) .AddElasticsearchClientInstrumentation(options => options.Enrich = enrich), sampler: new AlwaysOnSampler(), tracingExporters: tracingExporters, @@ -86,7 +86,8 @@ static void RegisterDependencies( static ConnectorBuilder ConfigureProduceConnector( ConnectorBuilder cfg, - ConnectorConfig config + ConnectorConfig config, + IHealthChecksBuilder healthChecks ) { var indexName = NotEmptyString(config.Target.DataStream?.IndexName); var getTransform = (IServiceProvider _) => new DefaultElasticTransform(indexName); @@ -99,7 +100,8 @@ ConnectorConfig config static ConnectorBuilder ConfigureProjectConnector( ConnectorBuilder cfg, - ConnectorConfig config + ConnectorConfig config, + IHealthChecksBuilder healthChecks ) { var indexName = NotEmptyString(config.Target.DataStream?.IndexName); @@ -109,7 +111,7 @@ ConnectorConfig config sp.GetRequiredService>>() ); - var builder = AddSubscription(cfg, config, b => b.AddGrpcProjector(NotNull(config.Filter))); + var builder = AddSubscription(cfg, config, b => b.AddGrpcProjector(NotNull(config.Filter), healthChecks)); return builder .ProduceWith() diff --git a/src/Eventuous.Connector.EsdbElastic/Conversions/RawDataElasticSerializer.cs b/src/Eventuous.Connector.EsdbElastic/Conversions/RawDataElasticSerializer.cs index f089825..ceb84c7 100644 --- a/src/Eventuous.Connector.EsdbElastic/Conversions/RawDataElasticSerializer.cs +++ b/src/Eventuous.Connector.EsdbElastic/Conversions/RawDataElasticSerializer.cs @@ -5,27 +5,25 @@ namespace Eventuous.Connector.EsdbElastic.Conversions; -public class RawDataElasticSerializer : IElasticsearchSerializer { - readonly IElasticsearchSerializer _builtIn; +public class RawDataElasticSerializer(IElasticsearchSerializer builtIn) : IElasticsearchSerializer { + public object Deserialize(Type type, Stream stream) => builtIn.Deserialize(type, stream); - public RawDataElasticSerializer(IElasticsearchSerializer builtIn) => _builtIn = builtIn; - public object Deserialize(Type type, Stream stream) => _builtIn.Deserialize(type, stream); + public T Deserialize(Stream stream) => builtIn.Deserialize(stream); - public T Deserialize(Stream stream) => _builtIn.Deserialize(stream); - - public Task DeserializeAsync( - Type type, Stream stream, CancellationToken cancellationToken = default - ) - => _builtIn.DeserializeAsync(type, stream, cancellationToken); + public Task DeserializeAsync(Type type, Stream stream, CancellationToken cancellationToken = default) + => builtIn.DeserializeAsync(type, stream, cancellationToken); public Task DeserializeAsync(Stream stream, CancellationToken cancellationToken = default) - => _builtIn.DeserializeAsync(stream, cancellationToken); + => builtIn.DeserializeAsync(stream, cancellationToken); public void Serialize( - T data, Stream stream, SerializationFormatting formatting = SerializationFormatting.None + T data, + Stream stream, + SerializationFormatting formatting = SerializationFormatting.None ) { if (data is not string dataString) { - _builtIn.Serialize(data, stream, formatting); + builtIn.Serialize(data, stream, formatting); + return; } @@ -41,6 +39,7 @@ public Task SerializeAsync( CancellationToken cancellationToken = default ) { Serialize(data, stream, formatting); + return Task.CompletedTask; } } diff --git a/src/Eventuous.Connector.EsdbElastic/ElasticJsonProjector.cs b/src/Eventuous.Connector.EsdbElastic/ElasticJsonProjector.cs index 93b8914..5bf35ea 100644 --- a/src/Eventuous.Connector.EsdbElastic/ElasticJsonProjector.cs +++ b/src/Eventuous.Connector.EsdbElastic/ElasticJsonProjector.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. using Eventuous.Connector.Base.Grpc; +using Eventuous.Connector.Filters.Grpc; using Eventuous.Producers.Diagnostics; using Microsoft.Extensions.Logging; using Nest; @@ -22,6 +23,8 @@ public ElasticJsonProjector(IElasticClient elasticClient, ILogger((message, token) => Execute(message, token, IndexOne)); On((message, token) => Execute(message, token, UpdateOne)); + return; + async Task Execute( ProjectedMessage msg, CancellationToken cancellationToken, @@ -41,12 +44,9 @@ Func> execute ProduceOperation = "project" }; - async Task IndexOne( - Index operation, - IndexName indexName, - CancellationToken cancellationToken - ) { - _log.LogTrace("Indexing document with id {id} to {index}", operation.Id, indexName); + async Task IndexOne(Index operation, IndexName indexName, CancellationToken cancellationToken) { + _log.LogTrace("Indexing document with id {Id} to {Index}", operation.Id, indexName); + var response = await _elasticClient.IndexAsync( new IndexRequest(operation.Document.ToString(), indexName, operation.Id), cancellationToken @@ -55,12 +55,9 @@ CancellationToken cancellationToken return new ElasticCallResponse(response.IsValid, response.DebugInformation, response.OriginalException); } - async Task UpdateOne( - Update operation, - IndexName indexName, - CancellationToken cancellationToken - ) { - _log.LogTrace("Updating document with id {id} to {index}", operation.Id, indexName); + async Task UpdateOne(Update operation, IndexName indexName, CancellationToken cancellationToken) { + _log.LogTrace("Updating document with id {Id} to {Index}", operation.Id, indexName); + var response = await _elasticClient.UpdateAsync( new UpdateRequest(indexName, operation.Id) { Doc = operation.Document.ToString() diff --git a/src/Eventuous.Connector.EsdbGenericGrpc/Eventuous.Connector.EsdbGenericGrpc.csproj b/src/Eventuous.Connector.EsdbGenericGrpc/Eventuous.Connector.EsdbGenericGrpc.csproj index 52e90aa..e421f93 100644 --- a/src/Eventuous.Connector.EsdbGenericGrpc/Eventuous.Connector.EsdbGenericGrpc.csproj +++ b/src/Eventuous.Connector.EsdbGenericGrpc/Eventuous.Connector.EsdbGenericGrpc.csproj @@ -13,12 +13,19 @@ - - - Tools\Ensure.cs + + + Client + Public + True + True + obj/Debug/net7.0/ + MSBuild:Compile + + diff --git a/src/Eventuous.Connector.EsdbGenericGrpc/ProjectorStartup.cs b/src/Eventuous.Connector.EsdbGenericGrpc/ProjectorStartup.cs index 49d6c01..058b401 100644 --- a/src/Eventuous.Connector.EsdbGenericGrpc/ProjectorStartup.cs +++ b/src/Eventuous.Connector.EsdbGenericGrpc/ProjectorStartup.cs @@ -44,7 +44,8 @@ static void RegisterProject(IServiceCollection services, ConnectorConfig ConfigureProjectConnector( ConnectorBuilder cfg, - ConnectorConfig config + ConnectorConfig config, + IHealthChecksBuilder healthChecks ) { var serializer = new PassThroughSerializer(); var concurrencyLimit = config.Source.ConcurrencyLimit; diff --git a/src/Eventuous.Connector.EsdbMongo/Config/MongoConfig.cs b/src/Eventuous.Connector.EsdbMongo/Config/MongoConfig.cs index 040559b..32c225a 100644 --- a/src/Eventuous.Connector.EsdbMongo/Config/MongoConfig.cs +++ b/src/Eventuous.Connector.EsdbMongo/Config/MongoConfig.cs @@ -4,8 +4,8 @@ namespace Eventuous.Connector.EsdbMongo.Config; public record MongoConfig { - public string? ConnectionString { get; init; } - public string? Database { get; init; } - public string? Collection { get; init; } + public string? ConnectionString { get; [UsedImplicitly] init; } + public string? Database { get; [UsedImplicitly] init; } + public string? Collection { get; [UsedImplicitly] init; } public string ConnectorMode { get; init; } = "project"; } diff --git a/src/Eventuous.Connector.EsdbMongo/MongoJsonProjector.cs b/src/Eventuous.Connector.EsdbMongo/MongoJsonProjector.cs index 2d8867a..66164ee 100644 --- a/src/Eventuous.Connector.EsdbMongo/MongoJsonProjector.cs +++ b/src/Eventuous.Connector.EsdbMongo/MongoJsonProjector.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. using Eventuous.Connector.Base.Grpc; +using Eventuous.Connector.Filters.Grpc; using Eventuous.Producers.Diagnostics; using Microsoft.Extensions.Logging; using MongoDB.Bson; @@ -13,7 +14,8 @@ public class MongoJsonProjector : GrpcProjectingProducer _log; - public MongoJsonProjector(IMongoDatabase database, ILogger log) : base(TracingOptions) { + public MongoJsonProjector(IMongoDatabase database, ILogger log) + : base(TracingOptions) { _database = database; _log = log; @@ -38,7 +40,7 @@ Task InsertOne(string collection, InsertOne insertOne, CancellationToken cancell .GetCollection(collection) .InsertOneAsync(BsonDocument.Parse(insertOne.Document.ToString()), cancellationToken: cancellationToken); } - + Task InsertMany(string collection, InsertMany insertMany, CancellationToken cancellationToken) { _log.LogTrace("Inserting {@Documents}", insertMany); @@ -61,7 +63,7 @@ Task UpdateOne(string collection, UpdateOne updateOne, CancellationToken cancell Task UpdateMany(string collection, UpdateMany updateMany, CancellationToken cancellationToken) { _log.LogTrace("Updating with {@Update}", updateMany); - + return _database.GetCollection(collection) .UpdateManyAsync( new JsonFilterDefinition(updateMany.Filter.ToString()), diff --git a/src/Eventuous.Connector.EsdbMongo/MongoRegistrationExtensions.cs b/src/Eventuous.Connector.EsdbMongo/MongoRegistrationExtensions.cs index dd8fecf..837620a 100644 --- a/src/Eventuous.Connector.EsdbMongo/MongoRegistrationExtensions.cs +++ b/src/Eventuous.Connector.EsdbMongo/MongoRegistrationExtensions.cs @@ -8,16 +8,13 @@ namespace Eventuous.Connector.EsdbMongo; public static class MongoRegistrationExtensions { - public static IServiceCollection AddMongo( - this IServiceCollection services, - string connectionString, - string database - ) + public static IServiceCollection AddMongo(this IServiceCollection services, string connectionString, string database) => services.AddSingleton(ConfigureMongo(connectionString, database)); public static IMongoDatabase ConfigureMongo(string connectionString, string database) { var settings = MongoClientSettings.FromConnectionString(connectionString); settings.ClusterConfigurator = cb => cb.Subscribe(new DiagnosticsActivityEventSubscriber()); + return new MongoClient(settings).GetDatabase(database); } } diff --git a/src/Eventuous.Connector.EsdbSqlServer/Config/SqlConfig.cs b/src/Eventuous.Connector.EsdbSqlServer/Config/SqlConfig.cs index 3fe8f57..e569f1b 100644 --- a/src/Eventuous.Connector.EsdbSqlServer/Config/SqlConfig.cs +++ b/src/Eventuous.Connector.EsdbSqlServer/Config/SqlConfig.cs @@ -4,5 +4,5 @@ namespace Eventuous.Connector.EsdbSqlServer.Config; public record SqlConfig { - public string ConnectionString { get; init; } = null!; + public string ConnectionString { get; [UsedImplicitly] init; } = null!; } diff --git a/src/Eventuous.Connector.EsdbSqlServer/GetConnection.cs b/src/Eventuous.Connector.EsdbSqlServer/GetConnection.cs index fc8d55d..b05a834 100644 --- a/src/Eventuous.Connector.EsdbSqlServer/GetConnection.cs +++ b/src/Eventuous.Connector.EsdbSqlServer/GetConnection.cs @@ -10,12 +10,12 @@ namespace Eventuous.Connector.EsdbSqlServer; public static class ConnectionFactory { public static GetConnection GetConnectionFactory(string connectionString) { + return GetConnection; + async Task GetConnection(CancellationToken cancellationToken) { var connection = new SqlConnection(connectionString); await connection.OpenAsync(cancellationToken); return connection; } - - return GetConnection; } } diff --git a/src/Eventuous.Connector.EsdbSqlServer/ProjectorStartup.cs b/src/Eventuous.Connector.EsdbSqlServer/ProjectorStartup.cs index b4ab1b5..b306ac5 100644 --- a/src/Eventuous.Connector.EsdbSqlServer/ProjectorStartup.cs +++ b/src/Eventuous.Connector.EsdbSqlServer/ProjectorStartup.cs @@ -28,7 +28,8 @@ protected override IGatewayTransform GetTransform(IServ serviceProvider.GetRequiredService>>() ); - protected override IAsyncPolicy GetRetryPolicy(IServiceProvider serviceProvider, ConnectorConfig config) => RetryPolicies.RetryForever(serviceProvider, config); + protected override IAsyncPolicy GetRetryPolicy(IServiceProvider serviceProvider, ConnectorConfig config) + => RetryPolicies.RetryForever(serviceProvider, config); protected override void ConfigureSubscription(SubscriptionBuilder builder) => builder.UseCheckpointStore(); diff --git a/src/Eventuous.Connector.EsdbSqlServer/SqlCheckpointStore.cs b/src/Eventuous.Connector.EsdbSqlServer/SqlCheckpointStore.cs index 0567632..4a2574c 100644 --- a/src/Eventuous.Connector.EsdbSqlServer/SqlCheckpointStore.cs +++ b/src/Eventuous.Connector.EsdbSqlServer/SqlCheckpointStore.cs @@ -8,43 +8,24 @@ namespace Eventuous.Connector.EsdbSqlServer; -public class SqlCheckpointStore : ICheckpointStore { +public class SqlCheckpointStore(GetConnection getConnection, ILogger log) : ICheckpointStore { const string TableName = "Checkpoints"; - const string CreateTableSql = @$"CREATE TABLE {TableName} (ID VARCHAR(100) NOT NULL PRIMARY KEY, Position BIGINT)"; - const string InsertCheckpointSql = @$"INSERT INTO {TableName} (ID, Position) VALUES (@CheckpointId, @Position)"; - const string UpdateCheckpointSql = @$"UPDATE {TableName} SET Position = @Position WHERE ID = @CheckpointId"; - readonly GetConnection _getConnection; - readonly ILogger _log; - - public SqlCheckpointStore(GetConnection getConnection, ILogger log) { - _getConnection = getConnection; - _log = log; - } + const string CreateTableSql = $"CREATE TABLE {TableName} (ID VARCHAR(100) NOT NULL PRIMARY KEY, Position BIGINT)"; + const string InsertCheckpointSql = $"INSERT INTO {TableName} (ID, Position) VALUES (@CheckpointId, @Position)"; + const string UpdateCheckpointSql = $"UPDATE {TableName} SET Position = @Position WHERE ID = @CheckpointId"; public async ValueTask GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) { await EnsureTableExists(cancellationToken); - async Task GetCheckpoint(DbCommand cmd, CancellationToken ct) { - var cp = await cmd.ExecuteReaderAsync(ct); - - if (!await cp.ReadAsync(ct)) { - await AddCheckpoint(); - return new Checkpoint(checkpointId, null); - } - - var value = cp.GetInt64(0); - return new Checkpoint(checkpointId, value == -1 ? null : (ulong?)value); - } - - var checkpoint = await _getConnection.ExecuteQuery( + var checkpoint = await getConnection.ExecuteQuery( $"SELECT Position FROM {TableName} WHERE ID = @CheckpointId", cmd => cmd.AddParameter("@CheckpointId", checkpointId), GetCheckpoint, cancellationToken ); - _log.LogInformation( + log.LogInformation( "Loaded checkpoint {CheckpointId} with value {Position}", checkpointId, checkpoint.Position @@ -52,8 +33,22 @@ async Task GetCheckpoint(DbCommand cmd, CancellationToken ct) { return checkpoint; + async Task GetCheckpoint(DbCommand cmd, CancellationToken ct) { + var cp = await cmd.ExecuteReaderAsync(ct); + + if (!await cp.ReadAsync(ct)) { + await AddCheckpoint(); + + return new Checkpoint(checkpointId, null); + } + + var value = cp.GetInt64(0); + + return new Checkpoint(checkpointId, value == -1 ? null : (ulong?)value); + } + Task AddCheckpoint() - => _getConnection.ExecuteNonQuery( + => getConnection.ExecuteNonQuery( InsertCheckpointSql, cmd => { cmd.AddParameter("@CheckpointId", checkpointId); @@ -68,7 +63,7 @@ public async ValueTask StoreCheckpoint( bool force, CancellationToken cancellationToken ) { - await _getConnection.ExecuteNonQuery( + await getConnection.ExecuteNonQuery( UpdateCheckpointSql, cmd => { cmd.AddParameter("@CheckpointId", checkpoint.Id); @@ -76,8 +71,8 @@ await _getConnection.ExecuteNonQuery( }, cancellationToken ); - - _log.LogDebug("Stored checkpoint {CheckpointId} with value {Position}", checkpoint.Id, checkpoint.Position); + + log.LogDebug("Stored checkpoint {CheckpointId} with value {Position}", checkpoint.Id, checkpoint.Position); return checkpoint; } @@ -85,13 +80,13 @@ await _getConnection.ExecuteNonQuery( async Task EnsureTableExists(CancellationToken cancellationToken) { if (await Exists()) return; - _log.LogInformation("Creating the checkpoints table"); + log.LogInformation("Creating the checkpoints table"); await CreateTable(); return; async Task Exists() { - await using var connection = await _getConnection(cancellationToken); + await using var connection = await getConnection(cancellationToken); await using var command = connection.CreateCommand(); command.CommandType = CommandType.Text; command.CommandText = "IF EXISTS(SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME=@table) SELECT 1 ELSE SELECT 0"; @@ -101,7 +96,7 @@ async Task Exists() { return (exists == 1); } - Task CreateTable() => _getConnection.ExecuteNonQuery(CreateTableSql, _ => { }, cancellationToken); + Task CreateTable() => getConnection.ExecuteNonQuery(CreateTableSql, _ => { }, cancellationToken); } } @@ -115,9 +110,9 @@ public static void AddParameter(this IDbCommand command, string name, object? va public static async Task ExecuteNonQuery( this GetConnection getConnection, - string query, - Action configureCommand, - CancellationToken cancellationToken + string query, + Action configureCommand, + CancellationToken cancellationToken ) { await using var connection = await getConnection(cancellationToken); await using var command = connection.CreateCommand(); @@ -139,6 +134,7 @@ CancellationToken cancellationToken command.CommandType = CommandType.Text; command.CommandText = query; configureCommand(command); + return await action(command, cancellationToken); } } diff --git a/src/Eventuous.Connector.EsdbSqlServer/SqlServerProjector.cs b/src/Eventuous.Connector.EsdbSqlServer/SqlServerProjector.cs index 1956a7b..31cdec9 100644 --- a/src/Eventuous.Connector.EsdbSqlServer/SqlServerProjector.cs +++ b/src/Eventuous.Connector.EsdbSqlServer/SqlServerProjector.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. using Eventuous.Connector.Base.Grpc; +using Eventuous.Connector.Filters.Grpc; using Eventuous.Producers.Diagnostics; using Microsoft.Extensions.Logging; diff --git a/src/Eventuous.Connector.Filters.Grpc/Config/GrpcProjectorSettings.cs b/src/Eventuous.Connector.Filters.Grpc/Config/GrpcProjectorSettings.cs index e4b14db..ddfe111 100644 --- a/src/Eventuous.Connector.Filters.Grpc/Config/GrpcProjectorSettings.cs +++ b/src/Eventuous.Connector.Filters.Grpc/Config/GrpcProjectorSettings.cs @@ -8,11 +8,13 @@ namespace Eventuous.Connector.Filters.Grpc.Config; public record GrpcProjectorSettings { // TODO: Add credentials - public string Uri { get; init; } = "http://localhost:9200"; + [PublicAPI] + public string Uri { get; init; } = "http://localhost:9200"; + + [PublicAPI] public string Credentials { get; init; } = "ssl"; - public string GetHost() - => NotEmptyString(Uri, "gRPC projector URI"); + public string GetHost() => NotEmptyString(Uri, "gRPC projector URI"); public ChannelCredentials GetCredentials() { var setting = NotEmptyString(Credentials, "gRPC projector credentials"); diff --git a/src/Eventuous.Connector.Filters.Grpc/Extensions/TaskExtensions.cs b/src/Eventuous.Connector.Filters.Grpc/Extensions/TaskExtensions.cs index 1d2d17d..d742226 100644 --- a/src/Eventuous.Connector.Filters.Grpc/Extensions/TaskExtensions.cs +++ b/src/Eventuous.Connector.Filters.Grpc/Extensions/TaskExtensions.cs @@ -1,7 +1,7 @@ // Copyright (C) 2021-2022 Ubiquitous AS. All rights reserved // Licensed under the Apache License, Version 2.0. -namespace Eventuous.Connector.Base.Tools; +namespace Eventuous.Connector.Filters.Grpc.Extensions; static class TaskExtensions { public static async Task WhenAll(this IEnumerable tasks) { diff --git a/src/Eventuous.Connector.Filters.Grpc/GrpcContextKeys.cs b/src/Eventuous.Connector.Filters.Grpc/GrpcContextKeys.cs index 2c05049..3b78eac 100644 --- a/src/Eventuous.Connector.Filters.Grpc/GrpcContextKeys.cs +++ b/src/Eventuous.Connector.Filters.Grpc/GrpcContextKeys.cs @@ -1,7 +1,7 @@ // Copyright (C) 2021-2022 Ubiquitous AS. All rights reserved // Licensed under the Apache License, Version 2.0. -namespace Eventuous.Connector.Base.Grpc; +namespace Eventuous.Connector.Filters.Grpc; public static class GrpcContextKeys { public const string ProjectionResult = "projectionResult"; diff --git a/src/Eventuous.Connector.Filters.Grpc/GrpcFilter.cs b/src/Eventuous.Connector.Filters.Grpc/GrpcFilter.cs index fe1c0cb..241f3da 100644 --- a/src/Eventuous.Connector.Filters.Grpc/GrpcFilter.cs +++ b/src/Eventuous.Connector.Filters.Grpc/GrpcFilter.cs @@ -3,15 +3,16 @@ using System.Runtime.CompilerServices; using Eventuous.Connector.Base.Grpc; -using Eventuous.Connector.Base.Tools; +using Eventuous.Connector.Filters.Grpc.Extensions; using Eventuous.Subscriptions.Context; using Eventuous.Subscriptions.Filters; using Google.Protobuf.WellKnownTypes; using Grpc.Core; +using Microsoft.Extensions.Diagnostics.HealthChecks; namespace Eventuous.Connector.Filters.Grpc; -public sealed class GrpcProjectionFilter : ConsumeFilter, IAsyncDisposable { +public sealed class GrpcProjectionFilter : ConsumeFilter, IHealthCheck, IAsyncDisposable { GrpcPartition?[] _partitions = Array.Empty(); readonly Func _partitionFactory; readonly CancellationTokenSource _cts; @@ -36,6 +37,8 @@ await projector.Project( } ); + return; + [MethodImpl(MethodImplOptions.AggressiveInlining)] GrpcPartition GetPartition() { if (_partitions.Length <= context.PartitionId) { @@ -59,6 +62,23 @@ public async ValueTask DisposeAsync() { await _partitions.Where(x => x != null).Select(x => x!.Projector.DisposeAsync()).WhenAll(); } + public Task CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default) { + var status = HealthCheckResult.Healthy(); + + // ReSharper disable once ForCanBeConvertedToForeach + for (var i = 0; i < _partitions.Length; i++) { + var partitionProjector = _partitions[i]?.Projector; + if (partitionProjector == null) continue; + + var partitionStatus = partitionProjector.GetStatus(); + if (partitionStatus.Status < status.Status) { + status = partitionStatus; + } + } + + return Task.FromResult(status); + } + record GrpcPartition { public GrpcPartition(string host, ChannelCredentials credentials, CancellationTokenSource cts) { ResponseHandler = new GrpcResponseHandler(); diff --git a/src/Eventuous.Connector.Filters.Grpc/GrpcProjectingProducer.cs b/src/Eventuous.Connector.Filters.Grpc/GrpcProjectingProducer.cs index 0bdb1c2..fb3fd7c 100644 --- a/src/Eventuous.Connector.Filters.Grpc/GrpcProjectingProducer.cs +++ b/src/Eventuous.Connector.Filters.Grpc/GrpcProjectingProducer.cs @@ -1,13 +1,14 @@ // Copyright (C) 2021-2022 Ubiquitous AS. All rights reserved // Licensed under the Apache License, Version 2.0. +using System.Runtime.CompilerServices; using Eventuous.Producers; using Eventuous.Producers.Diagnostics; using Google.Protobuf; using Google.Protobuf.Collections; using Google.Protobuf.WellKnownTypes; -namespace Eventuous.Connector.Base.Grpc; +namespace Eventuous.Connector.Filters.Grpc; public abstract class GrpcProjectingProducer : BaseProducer where T : IEventProducer @@ -22,12 +23,15 @@ protected void On(Func, CancellationToken, Task var temp = new TEvent(); _projectorsByName.Add(temp.Descriptor.FullName, ProjectAny); + return; + static TEvent GetEvent(Any message) { var evt = new TEvent(); evt.MergeFrom(message.Value); return evt; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] Task ProjectAny(ProjectionResponse responseContext, StreamName streamName, CancellationToken token) { var evt = GetEvent(responseContext.Operation); var message = new ProjectedMessage(evt, streamName, responseContext.Metadata); diff --git a/src/Eventuous.Connector.Filters.Grpc/GrpcResponseHandler.cs b/src/Eventuous.Connector.Filters.Grpc/GrpcResponseHandler.cs index addfdc9..fc16750 100644 --- a/src/Eventuous.Connector.Filters.Grpc/GrpcResponseHandler.cs +++ b/src/Eventuous.Connector.Filters.Grpc/GrpcResponseHandler.cs @@ -3,6 +3,7 @@ using System.Diagnostics; using System.Text; +using Eventuous.Connector.Filters.Grpc; using Eventuous.Diagnostics; using Eventuous.Subscriptions.Context; using Eventuous.Subscriptions.Filters; diff --git a/src/Eventuous.Connector.Filters.Grpc/GrpcTransform.cs b/src/Eventuous.Connector.Filters.Grpc/GrpcTransform.cs index d9bc2f6..1cac765 100644 --- a/src/Eventuous.Connector.Filters.Grpc/GrpcTransform.cs +++ b/src/Eventuous.Connector.Filters.Grpc/GrpcTransform.cs @@ -1,7 +1,6 @@ // Copyright (C) 2021-2022 Ubiquitous AS. All rights reserved // Licensed under the Apache License, Version 2.0. -using Eventuous.Connector.Base.Grpc; using Eventuous.Gateway; using Eventuous.Subscriptions.Context; using Microsoft.Extensions.Logging; diff --git a/src/Eventuous.Connector.Filters.Grpc/Projector.cs b/src/Eventuous.Connector.Filters.Grpc/Projector.cs index 92c6f7e..6d1ebf6 100644 --- a/src/Eventuous.Connector.Filters.Grpc/Projector.cs +++ b/src/Eventuous.Connector.Filters.Grpc/Projector.cs @@ -1,12 +1,14 @@ // Copyright (C) 2021-2022 Ubiquitous AS. All rights reserved // Licensed under the Apache License, Version 2.0. +using System.Runtime.CompilerServices; using Grpc.Core; using Grpc.Net.Client; using Grpc.Net.Client.Configuration; +using Microsoft.Extensions.Diagnostics.HealthChecks; using Serilog; -namespace Eventuous.Connector.Base.Grpc; +namespace Eventuous.Connector.Filters.Grpc; public sealed class Projector : IAsyncDisposable { static readonly ILogger Log = Serilog.Log.ForContext(); @@ -14,10 +16,10 @@ public sealed class Projector : IAsyncDisposable { readonly MethodConfig _defaultMethodConfig = new() { Names = { MethodName.Default }, RetryPolicy = new RetryPolicy { - MaxAttempts = 20, - InitialBackoff = TimeSpan.FromSeconds(1), - MaxBackoff = TimeSpan.FromSeconds(5), - BackoffMultiplier = 1.5, + MaxAttempts = 20, + InitialBackoff = TimeSpan.FromSeconds(1), + MaxBackoff = TimeSpan.FromSeconds(5), + BackoffMultiplier = 1.5, RetryableStatusCodes = { StatusCode.Unavailable } } }; @@ -28,6 +30,8 @@ public sealed class Projector : IAsyncDisposable { Task? _readTask; CancellationTokenSource? _cts; CancellationToken _ct; + HealthCheckResult _status; + string _host; AsyncDuplexStreamingCall? _call; @@ -37,27 +41,33 @@ public Projector( Func handler ) { _handler = handler; + _host = host; var channel = GrpcChannel.ForAddress( host, new GrpcChannelOptions { - Credentials = credentials, + Credentials = credentials, ServiceConfig = new ServiceConfig { MethodConfigs = { _defaultMethodConfig } } } ); _client = new Projection.ProjectionClient(channel); + _status = HealthCheckResult.Healthy(host); } public void Run(CancellationToken cancellationToken) { _cts = new CancellationTokenSource(); - _ct = cancellationToken; + _ct = cancellationToken; var linked = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, _ct); _call = _client.Project(cancellationToken: linked.Token); _call.RequestStream.WriteOptions = new WriteOptions(WriteFlags.BufferHint); + _readTask = Task.Run(HandleResponses, linked.Token); + + return; + async Task HandleResponses() { Log.Information("Subscribing..."); @@ -67,8 +77,6 @@ async Task HandleResponses() { await _handler(response, linked.Token); } } - - _readTask = Task.Run(HandleResponses, linked.Token); } async Task Resubscribe() { @@ -82,8 +90,7 @@ async Task Resubscribe() { if (_readTask != null) { try { await AwaitCancelled(() => _readTask); - } - catch (RpcException e) when (e.StatusCode == StatusCode.Unavailable) { + } catch (RpcException e) when (e.StatusCode == StatusCode.Unavailable) { Log.Warning("Server unavailable"); } @@ -91,50 +98,64 @@ async Task Resubscribe() { _readTask = null; } - _cts = null; + _cts = null; _call = null; Run(_ct); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public async Task Project(ProjectionRequest projectionContext) { - var retry = 100; + var retry = 0; + + while (!_disposing && !_ct.IsCancellationRequested) { + var r = await ProjectInternal(projectionContext); + + // ReSharper disable once SwitchStatementHandlesSomeKnownEnumValuesWithDefault + switch (r.Result) { + case ProjectResult.Ok: + return; + case ProjectResult.Retry: + _status = HealthCheckResult.Degraded(_host, r.Exception); + Log.Information("Retrying {Retry}", ++retry); + await Task.Delay(retry, _ct); + + break; + case ProjectResult.Fail: + _status = HealthCheckResult.Unhealthy(_host, r.Exception); + Log.Error("Projector to {Host} failed", _host); + retry++; + await Task.Delay(retry ^ 2, _ct); + + break; + } + } + } - while (retry-- > 0 && !_disposing) { - var r = await ProjectInternal(); + async Task<(ProjectResult Result, Exception? Exception)> ProjectInternal(ProjectionRequest projectionContext) { + try { + await _call!.RequestStream.WriteAsync(projectionContext, _ct); - if (r == ProjectResult.Ok) { - break; - } + return (ProjectResult.Ok, null); + } catch (InvalidOperationException e) + when (e.Message.Contains("previous write is in progress")) { + Log.Warning("Write already in progress"); - Log.Information($"Retrying {100 - retry}"); - } + return (ProjectResult.Retry, e); + } catch (ObjectDisposedException e) { + await Resubscribe(); - async Task ProjectInternal() { - try { - await _call!.RequestStream.WriteAsync(projectionContext, _ct); - return ProjectResult.Ok; - } - catch (InvalidOperationException e) - when (e.Message.Contains("previous write is in progress")) { - // TODO: this is a hack, it needs to open multiple streams for concurrent projectors - Log.Warning("Write already in progress"); - return ProjectResult.Retry; - } - catch (ObjectDisposedException) { - await Resubscribe(); - return ProjectResult.Retry; - } - catch (RpcException e) when (e.StatusCode == StatusCode.Unavailable) { - await Resubscribe(); - return ProjectResult.Retry; - } - catch (Exception e) { - Log.Error(e, "Projection failed"); - throw; - } + return (ProjectResult.Retry, e); + } catch (RpcException e) when (e.StatusCode == StatusCode.Unavailable) { + await Resubscribe(); + + return (ProjectResult.Retry, e); + } catch (Exception e) { + return (ProjectResult.Fail, e); } } + public HealthCheckResult GetStatus() => _status; + bool _disposing; public async ValueTask DisposeAsync() { @@ -160,13 +181,12 @@ public async ValueTask DisposeAsync() { static async Task AwaitCancelled(Func action) { try { await action(); - } - catch (OperationCanceledException) { } - catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled) { } + } catch (OperationCanceledException) { } catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled) { } } } public enum ProjectResult { Ok, - Retry + Retry, + Fail } diff --git a/src/Eventuous.Connector.Filters.Grpc/SubscriptionBuilderExtensions.cs b/src/Eventuous.Connector.Filters.Grpc/SubscriptionBuilderExtensions.cs index 368dadb..2c0c5b0 100644 --- a/src/Eventuous.Connector.Filters.Grpc/SubscriptionBuilderExtensions.cs +++ b/src/Eventuous.Connector.Filters.Grpc/SubscriptionBuilderExtensions.cs @@ -1,15 +1,13 @@ // Copyright (C) 2021-2022 Ubiquitous AS. All rights reserved // Licensed under the Apache License, Version 2.0. -using Eventuous.Connector.Base.Grpc; using Eventuous.Connector.Filters.Grpc.Config; using Eventuous.Subscriptions.Registrations; +using Microsoft.Extensions.DependencyInjection; namespace Eventuous.Connector.Filters.Grpc; public static class SubscriptionBuilderExtensions { - public static void AddGrpcProjector(this SubscriptionBuilder builder, GrpcProjectorSettings settings) - => builder.AddConsumeFilterLast( - new GrpcProjectionFilter(settings.GetHost(), settings.GetCredentials()) - ); + public static void AddGrpcProjector(this SubscriptionBuilder builder, GrpcProjectorSettings settings, IHealthChecksBuilder healthChecks) + => builder.AddConsumeFilterLast(new GrpcProjectionFilter(settings.GetHost(), settings.GetCredentials())); } diff --git a/src/Eventuous.Connector/Program.cs b/src/Eventuous.Connector/Program.cs index 2589696..4d2a3f8 100644 --- a/src/Eventuous.Connector/Program.cs +++ b/src/Eventuous.Connector/Program.cs @@ -17,7 +17,6 @@ .Add("prometheus", b => b.AddPrometheusExporter()) .Add("otlp", b => b.AddOtlpExporter()); -var app = new StartupBuilder("config.yaml", args) - .BuildApplication(tracingExporters, metricsExporters); +var app = new StartupBuilder("config.yaml", args).BuildApplication(tracingExporters, metricsExporters); return await app.Run(); diff --git a/src/Eventuous.Connector/StartupBuilder.cs b/src/Eventuous.Connector/StartupBuilder.cs index eb90926..374a366 100644 --- a/src/Eventuous.Connector/StartupBuilder.cs +++ b/src/Eventuous.Connector/StartupBuilder.cs @@ -13,9 +13,9 @@ namespace Eventuous.Connector; sealed class StartupBuilder { - readonly ConnectorConfig? _config; - ConnectorApp? _app; - readonly string? _configFile; + readonly ConnectorConfig _config; + readonly string? _configFile; + ConnectorApp? _app; static readonly ILogger Log = Serilog.Log.ForContext(); @@ -27,10 +27,13 @@ public StartupBuilder(string configFile, string[] args) { var hostBuilder = Host.CreateDefaultBuilder(args); hostBuilder.ConfigureHostConfiguration(c => c.AddYamlFile(configFile)); using var tempHost = hostBuilder.Build(); - _config = tempHost.Services.GetRequiredService().Get(); + _config = tempHost.Services + .GetRequiredService() + .Get(b => b.ErrorOnUnknownConfiguration = true)!; if (string.IsNullOrWhiteSpace(_config.Connector.ConnectorAssembly)) { Log.Fatal("Connector assembly must be specified in {ConfigFile}", configFile); + throw new ApplicationException(); } } @@ -39,11 +42,6 @@ public StartupBuilder BuildApplication( ExporterMappings tracingExporters, ExporterMappings metricsExporters ) { - if (_config == null) { - Log.Fatal("Call Configure() first"); - throw new ApplicationException(); - } - var location = Assembly.GetExecutingAssembly().Location; var path = Path.GetDirectoryName(location); @@ -58,6 +56,7 @@ ExporterMappings metricsExporters if (startup == null) { Log.Fatal("Connector assembly must have an implementation of IConnectorStartup"); + throw new ApplicationException(); } @@ -73,11 +72,13 @@ ExporterMappings metricsExporters public async Task Run() { if (_config == null) { Log.Fatal("Call Configure() first"); + throw new ApplicationException(); } if (_app == null) { Log.Fatal("Call BuildApplication() first"); + throw new ApplicationException(); } diff --git a/src/Eventuous.Connector/StartupEnvironment.cs b/src/Eventuous.Connector/StartupEnvironment.cs index 6ea0186..f527820 100644 --- a/src/Eventuous.Connector/StartupEnvironment.cs +++ b/src/Eventuous.Connector/StartupEnvironment.cs @@ -7,11 +7,10 @@ namespace Eventuous.Connector; public class StartupEnvironment : IHostEnvironment { public string ApplicationName { get; set; } = "Eventuous Connector"; - public IFileProvider ContentRootFileProvider { get; set; } = default!; public string ContentRootPath { get; set; } = default!; + public IFileProvider ContentRootFileProvider { get; set; } = default!; - public string EnvironmentName { get; set; } - = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") - ?? Environment.GetEnvironmentVariable("DOTNET_ENVIRONMENT") - ?? "Development"; + public string EnvironmentName { get; set; } = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") + ?? Environment.GetEnvironmentVariable("DOTNET_ENVIRONMENT") + ?? "Development"; }