Skip to content

Commit

Permalink
fix #1636
Browse files Browse the repository at this point in the history
  • Loading branch information
mathieuancelin committed Jul 24, 2023
1 parent 0c29638 commit cfeae85
Show file tree
Hide file tree
Showing 11 changed files with 1,608 additions and 2,643 deletions.
409 changes: 100 additions & 309 deletions kubernetes/helm/otoroshi/crds-with-schema.yaml

Large diffs are not rendered by default.

30 changes: 0 additions & 30 deletions kubernetes/helm/otoroshi/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -361,36 +361,6 @@ spec:
---
apiVersion: "apiextensions.k8s.io/v1"
kind: "CustomResourceDefinition"
metadata:
name: "data-exporters.proxy.otoroshi.io"
creationTimestamp: null
spec:
group: "proxy.otoroshi.io"
names:
kind: "DataExporter"
plural: "data-exporters"
singular: "data-exporter"
scope: "Namespaced"
versions:
- name: "v1alpha1"
served: false
storage: false
deprecated: true
schema:
openAPIV3Schema:
x-kubernetes-preserve-unknown-fields: true
type: "object"
- name: "v1"
served: true
storage: true
deprecated: false
schema:
openAPIV3Schema:
x-kubernetes-preserve-unknown-fields: true
type: "object"
---
apiVersion: "apiextensions.k8s.io/v1"
kind: "CustomResourceDefinition"
metadata:
name: "routes.proxy.otoroshi.io"
creationTimestamp: null
Expand Down
30 changes: 0 additions & 30 deletions kubernetes/kustomize/base/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -361,36 +361,6 @@ spec:
---
apiVersion: "apiextensions.k8s.io/v1"
kind: "CustomResourceDefinition"
metadata:
name: "data-exporters.proxy.otoroshi.io"
creationTimestamp: null
spec:
group: "proxy.otoroshi.io"
names:
kind: "DataExporter"
plural: "data-exporters"
singular: "data-exporter"
scope: "Namespaced"
versions:
- name: "v1alpha1"
served: false
storage: false
deprecated: true
schema:
openAPIV3Schema:
x-kubernetes-preserve-unknown-fields: true
type: "object"
- name: "v1"
served: true
storage: true
deprecated: false
schema:
openAPIV3Schema:
x-kubernetes-preserve-unknown-fields: true
type: "object"
---
apiVersion: "apiextensions.k8s.io/v1"
kind: "CustomResourceDefinition"
metadata:
name: "routes.proxy.otoroshi.io"
creationTimestamp: null
Expand Down
34 changes: 17 additions & 17 deletions manual/src/main/paradox/code/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -21579,81 +21579,81 @@
},
"otoroshi.models.DataExporterConfig" : {
"type" : "object",
"description" : "Data exporter settings",
"description" : "???",
"properties" : {
"desc" : {
"type" : "string",
"description" : "Description"
"description" : "???"
},
"_loc" : {
"$ref" : "#/components/schemas/otoroshi.models.EntityLocation",
"description" : "Entity location"
"description" : "???"
},
"bufferSize" : {
"type" : "integer",
"format" : "int32",
"description" : "Number of events in buffer"
"description" : "???"
},
"jsonWorkers" : {
"type" : "integer",
"format" : "int32",
"description" : "Number of workers that transform events"
"description" : "???"
},
"groupDuration" : {
"type" : "number",
"description" : "The max duration before sending group"
"description" : "???"
},
"groupSize" : {
"type" : "integer",
"format" : "int32",
"description" : "The max size of events group before sending"
"description" : "???"
},
"type" : {
"$ref" : "#/components/schemas/otoroshi.models.DataExporterConfigType",
"description" : "Entity type"
"description" : "???"
},
"tags" : {
"type" : "array",
"items" : {
"type" : "string"
},
"description" : "Entity tags"
"description" : "???"
},
"sendWorkers" : {
"type" : "integer",
"format" : "int32",
"description" : "Number of workers that sends events"
"description" : "???"
},
"id" : {
"type" : "string",
"description" : "Id of the exporter"
"description" : "???"
},
"name" : {
"type" : "string",
"description" : "Entity name"
"description" : "???"
},
"metadata" : {
"type" : "object",
"additionalProperties" : {
"type" : "string"
},
"description" : "Entity metadata"
"description" : "???"
},
"config" : {
"$ref" : "#/components/schemas/otoroshi.models.Exporter",
"description" : "Exporter config"
"description" : "???"
},
"projection" : {
"type" : "object",
"description" : "Event projection"
"description" : "???"
},
"enabled" : {
"type" : "boolean",
"description" : "Is the exporter enabled"
"description" : "???"
},
"filtering" : {
"$ref" : "#/components/schemas/otoroshi.models.DataExporterConfigFiltering",
"description" : "Filtering options"
"description" : "???"
}
}
},
Expand Down
137 changes: 115 additions & 22 deletions otoroshi/app/events/OtoroshiEventsActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ import akka.actor.{Actor, Props}
import akka.http.scaladsl.model.{ContentType, ContentTypes}
import akka.http.scaladsl.util.FastFuture
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.alpakka.s3.{
ApiVersion,
ListBucketResultContents,
MemoryBufferType,
MetaHeaders,
S3Attributes,
S3Settings
}
import akka.stream.alpakka.s3.{ApiVersion, ListBucketResultContents, MemoryBufferType, MetaHeaders, S3Attributes, S3Settings}
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}
import akka.stream.{Attributes, OverflowStrategy, QueueOfferResult}
import com.sksamuel.pulsar4s.Producer
import com.spotify.metrics.core.MetricId
import io.opentelemetry.api.logs.Severity
import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporter
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.logs.SdkLoggerProvider
import io.opentelemetry.sdk.logs.`export`.{BatchLogRecordProcessor, LogRecordExporter}
import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import otoroshi.env.Env
import otoroshi.events.DataExporter.DefaultDataExporter
import otoroshi.events.impl.{ElasticWritesAnalytics, WebHookAnalytics}
Expand All @@ -37,20 +37,7 @@ import otoroshi.utils.cache.types.UnboundedTrieMap
import otoroshi.utils.json.JsonOperationsHelper
import otoroshi.utils.mailer.{EmailLocation, MailerSettings}
import play.api.Logger
import play.api.libs.json.{
Format,
JsArray,
JsBoolean,
JsError,
JsNull,
JsNumber,
JsObject,
JsResult,
JsString,
JsSuccess,
JsValue,
Json
}
import play.api.libs.json.{Format, JsArray, JsBoolean, JsError, JsNull, JsNumber, JsObject, JsResult, JsString, JsSuccess, JsValue, Json}

import scala.collection.concurrent.TrieMap
import scala.concurrent.duration._
Expand Down Expand Up @@ -1270,6 +1257,112 @@ object Exporters {
.getOrElse(ExportResult.ExportResultSuccess.vfuture)
}
}

case class OtlpSettings(grpc: Boolean, endpoint: String, timeout: Duration, gzip: Boolean, clientCert: Option[String], trustedCert: Option[String]) extends Exporter {
override def toJson: JsValue = OtlpSettings.format.writes(this)
def exporter(env: Env): LogRecordExporter = {
OtlpGrpcLogRecordExporter.builder()
// .setRetryPolicy() // TODO:
.applyOnWithOpt(clientCert) {
case (builder, id) => env.proxyState.certificate(id) match {
case None => builder
case Some(cert) => builder.setClientTls(cert.privateKey.getBytes, cert.chain.getBytes)
}
}
.applyOnWithOpt(trustedCert) {
case (builder, id) => env.proxyState.certificate(id) match {
case None => builder
case Some(cert) => builder.setTrustedCertificates(cert.chain.getBytes)
}
}
.setCompression(if (gzip) "gzip" else "none")
.setTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
.setEndpoint(endpoint)
.build()
}
}

case class OpenTelemetrySdkWrapper(sdk: OpenTelemetrySdk, settings: OtlpSettings) {
def close(): Unit = sdk.close()
def hasChangedFrom(s: OtlpSettings): Boolean = s != settings
}

object OtlpSettings {
private val sdks = new UnboundedTrieMap[String, OpenTelemetrySdkWrapper]()
val format = new Format[OtlpSettings] {
override def writes(o: OtlpSettings): JsValue = Json.obj(
"type" -> "otlp",
"gzip" -> o.gzip,
"grpc" -> o.grpc,
"endpoint" -> o.endpoint,
"timeout" -> o.timeout.toMillis,
"client_cert" -> o.clientCert.map(JsString.apply).getOrElse(JsNull).asValue,
"trusted_cert" -> o.clientCert.map(JsString.apply).getOrElse(JsNull).asValue,
)
override def reads(json: JsValue): JsResult[OtlpSettings] = Try {
OtlpSettings(
grpc = json.select("grpc").asOpt[Boolean].getOrElse(true),
gzip = json.select("gzip").asOpt[Boolean].getOrElse(false),
endpoint = json.select("endpoint").asString,
timeout = json.select("timeout").asOpt[Long].map(_.millis).getOrElse(30.seconds),
clientCert = json.select("client_cert").asOpt[String],
trustedCert = json.select("trusted_cert").asOpt[String],
)
} match {
case Failure(e) => JsError(e.getMessage)
case Success(e) => JsSuccess(e)
}
}
def sdkFor(id: String, name: String, settings: OtlpSettings, env: Env): OpenTelemetrySdkWrapper = sdks.synchronized {
def build(): OpenTelemetrySdkWrapper = {
val sdk = OpenTelemetrySdk.builder()
.setLoggerProvider(
SdkLoggerProvider.builder()
.setResource(
Resource.getDefault().toBuilder()
.put(ResourceAttributes.SERVICE_NAME, name)
.build()
)
.addLogRecordProcessor(
BatchLogRecordProcessor.builder(settings.exporter(env)).build()
)
.build()
)
.build()
OpenTelemetrySdkWrapper(sdk, settings)
}
var sdk = sdks.getOrUpdate(id) {
build()
}
if (sdk.hasChangedFrom(settings)) {
sdk.close()
sdk = build()
sdks.put(id, sdk)
}
sdk
}
}

class OtlpLogExporter(config: DataExporterConfig)(implicit ec: ExecutionContext, env: Env)
extends DefaultDataExporter(config)(ec, env) {

override def send(events: Seq[JsValue]): Future[ExportResult] = exporter[OtlpSettings] match {
case None => ExportResult.ExportResultFailure("Bad config type !").vfuture
case Some(exporterConfig) => {
val sdk = OtlpSettings.sdkFor(config.id, config.name, exporterConfig, env)
val logger = sdk.sdk.getSdkLoggerProvider.get("otoroshi-appender")
events.foreach { evt =>
logger
.logRecordBuilder()
.setSeverity(Severity.INFO) // TODO: different for alert ?
.setBody(evt.stringify)
//.setAllAttributes() // TODO
.emit()
}
ExportResult.ExportResultSuccess.vfuture
}
}
}
}

class DataExporterUpdateJob extends Job {
Expand Down
7 changes: 7 additions & 0 deletions otoroshi/app/models/dataExporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ object DataExporterConfig {
case "metrics" => MetricsSettings((json \ "config" \ "labels").as[Map[String, String]])
case "custommetrics" => CustomMetricsSettings.format.reads((json \ "config").as[JsObject]).get
case "wasm" => WasmExporterSettings.format.reads((json \ "config").as[JsObject]).get
case "otlp" => OtlpSettings.format.reads((json \ "config").as[JsObject]).get
case _ => throw new RuntimeException("Bad config type")
}
)
Expand Down Expand Up @@ -304,6 +305,10 @@ object DataExporterConfigType {
def name: String = "wasm"
}

case object Otlp extends DataExporterConfigType {
def name: String = "otlp"
}

def parse(str: String): DataExporterConfigType = {
str.toLowerCase() match {
case "kafka" => Kafka
Expand All @@ -321,6 +326,7 @@ object DataExporterConfigType {
case "metrics" => Metrics
case "custommetrics" => CustomMetrics
case "wasm" => Wasm
case "otlp" => Otlp
case _ => None
}
}
Expand Down Expand Up @@ -378,6 +384,7 @@ case class DataExporterConfig(
case c: MetricsSettings => new MetricsExporter(this)
case c: CustomMetricsSettings => new CustomMetricsExporter(this)
case c: WasmExporterSettings => new WasmExporter(this)
case c: OtlpSettings => new OtlpLogExporter(this)
case _ => throw new RuntimeException("unsupported exporter type")
}
}
Expand Down
Loading

0 comments on commit cfeae85

Please sign in to comment.