From 80bee5645fc0f6367b1ffcabc6b48034d5d385cd Mon Sep 17 00:00:00 2001 From: Mathieu Ancelin Date: Wed, 6 Sep 2023 15:00:26 +0200 Subject: [PATCH] fix #1700, fix #1701 --- .../controllers/BackOfficeController.scala | 10 +- .../app/events/impl/ElasticAnalytics.scala | 153 ++++++++++++++---- 2 files changed, 127 insertions(+), 36 deletions(-) diff --git a/otoroshi/app/controllers/BackOfficeController.scala b/otoroshi/app/controllers/BackOfficeController.scala index a80b14d901..970048101d 100644 --- a/otoroshi/app/controllers/BackOfficeController.scala +++ b/otoroshi/app/controllers/BackOfficeController.scala @@ -1820,12 +1820,12 @@ class BackOfficeController( case Some(config) => { val index: String = config.index.getOrElse("otoroshi-events") for { - version <- ElasticUtils.getElasticVersion(config, env) + version <- ElasticUtils.getElasticVersion(config, logger, env) } yield { val strTpl: String = version match { - case ElasticVersion.UnderSeven => ElasticTemplates.indexTemplate_v6 - case ElasticVersion.AboveSeven => ElasticTemplates.indexTemplate_v7 - case ElasticVersion.AboveSevenEight => ElasticTemplates.indexTemplate_v7_8 + case ElasticVersion.UnderSeven(_) => ElasticTemplates.indexTemplate_v6 + case ElasticVersion.AboveSeven(_) => ElasticTemplates.indexTemplate_v7 + case ElasticVersion.AboveSevenEight(_) => ElasticTemplates.indexTemplate_v7_8 } val template: String = if (config.indexSettings.clientSide) { strTpl @@ -1850,7 +1850,7 @@ class BackOfficeController( case Some(config) => { val index: String = config.index.getOrElse("otoroshi-events") for { - version <- ElasticUtils.checkVersion(config, env) + version <- ElasticUtils.checkVersion(config, logger, env) } yield { version match { case Left(err) => InternalServerError(Json.obj("error" -> err)) diff --git a/otoroshi/app/events/impl/ElasticAnalytics.scala b/otoroshi/app/events/impl/ElasticAnalytics.scala index d59b1f4e98..fbd0080b6c 100644 --- a/otoroshi/app/events/impl/ElasticAnalytics.scala +++ b/otoroshi/app/events/impl/ElasticAnalytics.scala @@ -11,6 +11,7 @@ import otoroshi.events._ import otoroshi.models.{ApiKey, ElasticAnalyticsConfig, IndexSettingsInterval, ServiceDescriptor, ServiceGroup} import org.joda.time.format.{DateTimeFormatterBuilder, ISODateTimeFormat} import org.joda.time.{DateTime, Interval} +import otoroshi.jobs.updates.Version import otoroshi.utils.cache.types.UnboundedTrieMap import play.api.libs.json.Json.JsValueWrapper import play.api.libs.json._ @@ -313,6 +314,26 @@ object ElasticTemplates { | } | } | }, + | "otoroshiHeadersIn": { + | "properties": { + | "key": { + | "type": "keyword" + | }, + | "value": { + | "type": "keyword" + | } + | } + | }, + | "otoroshiHeadersOut": { + | "properties": { + | "key": { + | "type": "keyword" + | }, + | "value": { + | "type": "keyword" + | } + | } + | }, | "identity": { | "properties": { | "label": { @@ -348,15 +369,34 @@ object ElasticWritesAnalytics { } def isInitialized(config: ElasticAnalyticsConfig): (Boolean, ElasticVersion) = { - clusterInitializedCache.getOrElse(toKey(config), (false, ElasticVersion.UnderSeven)) + clusterInitializedCache.getOrElse(toKey(config), { + config.version match { + case Some(rawVersion) => { + val version = Version(rawVersion) match { + case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven(rawVersion) + case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight(rawVersion) + case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven(rawVersion) + case _ => ElasticVersion.AboveSeven(rawVersion) + } + (false, version) + } + case None => (false, ElasticVersion.default) + } + }) } } -sealed trait ElasticVersion +sealed trait ElasticVersion { + def underSeven: Boolean +} object ElasticVersion { - case object UnderSeven extends ElasticVersion - case object AboveSeven extends ElasticVersion - case object AboveSevenEight extends ElasticVersion + + case class UnderSeven(raw: String) extends ElasticVersion { def underSeven: Boolean = true } + case class AboveSeven(raw: String) extends ElasticVersion { def underSeven: Boolean = false } + case class AboveSevenEight(raw: String) extends ElasticVersion { def underSeven: Boolean = false } + + val defaultStr: String = "6.0.0" + val default: ElasticVersion = UnderSeven(defaultStr) } object ElasticUtils { @@ -399,7 +439,7 @@ object ElasticUtils { .addHttpHeaders(config.headers.toSeq: _*) } - def getElasticVersion(config: ElasticAnalyticsConfig, env: Env)(implicit + def getElasticVersion(config: ElasticAnalyticsConfig, logger: Logger, env: Env)(implicit ec: ExecutionContext ): Future[ElasticVersion] = { @@ -408,25 +448,38 @@ object ElasticUtils { config.version match { case Some(version) => { (Version(version) match { - case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven - case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight - case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven - case _ => ElasticVersion.AboveSeven + case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven(version) + case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight(version) + case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven(version) + case _ => ElasticVersion.AboveSeven(version) }).future } case None => { - url(urlFromPath("", config), config, env) - .get() - .map(_.json) - .map(json => (json \ "version" \ "number").asOpt[String].getOrElse("6.0.0")) - // .map(v => v.split("\\.").headOption.map(_.toInt).getOrElse(6)) - .map { _v => - Version(_v) match { - case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven - case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight - case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven - case _ => ElasticVersion.AboveSeven + ElasticUtils.checkVersion(config, logger, env) + .map { + case Left(err) => ElasticVersion.default + case Right(_v) => { + Version(_v) match { + case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven(_v) + case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight(_v) + case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven(_v) + case _ => ElasticVersion.AboveSeven(_v) + } } + } + + //url(urlFromPath("", config), config, env) + // .get() + // .map(_.json) + // .map(json => (json \ "version" \ "number").asOpt[String].orElse(config.version).getOrElse(ElasticVersion.defaultStr)) + // // .map(v => v.split("\\.").headOption.map(_.toInt).getOrElse(6)) + // .map { _v => + // Version(_v) match { + // case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven(_v) + // case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight(_v) + // case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven(_v) + // case _ => ElasticVersion.AboveSeven(_v) + // } // _v.split("\\.").headOption.map(_.toInt).getOrElse(6) match { // case v if v <= 6 => ElasticVersion.UnderSeven // case v if v > 6 => { @@ -437,7 +490,7 @@ object ElasticUtils { // } // } } - } + //} } } @@ -448,12 +501,12 @@ object ElasticUtils { val index: String = config.index.getOrElse("otoroshi-events") val numberOfShards: String = config.indexSettings.numberOfShards.toString val numberOfReplicas: String = config.indexSettings.numberOfReplicas.toString - getElasticVersion(config, env).flatMap { version => + getElasticVersion(config, logger, env).flatMap { version => // from elastic 7.8, we should use /_index_template/otoroshi-tpl and wrap almost everything expect index_patterns in a "template" object val (strTpl, indexTemplatePath) = version match { - case ElasticVersion.UnderSeven => (ElasticTemplates.indexTemplate_v6, "/_template/otoroshi-tpl") - case ElasticVersion.AboveSeven => (ElasticTemplates.indexTemplate_v7, "/_template/otoroshi-tpl") - case ElasticVersion.AboveSevenEight => + case ElasticVersion.UnderSeven(_) => (ElasticTemplates.indexTemplate_v6, "/_template/otoroshi-tpl") + case ElasticVersion.AboveSeven(_) => (ElasticTemplates.indexTemplate_v7, "/_template/otoroshi-tpl") + case ElasticVersion.AboveSevenEight(_) => (ElasticTemplates.indexTemplate_v7_8, "/_index_template/otoroshi-tpl") } if (logger.isDebugEnabled) logger.debug(s"$version, $indexTemplatePath") @@ -534,15 +587,18 @@ object ElasticUtils { } } - def checkVersion(config: ElasticAnalyticsConfig, env: Env)(implicit + def checkVersion(config: ElasticAnalyticsConfig, logger: Logger, env: Env)(implicit ec: ExecutionContext ): Future[Either[JsValue, String]] = { url(urlFromPath("", config), config, env) .get() .map { resp => if (resp.status == 200) { - Right((resp.json \ "version" \ "number").asOpt[String].getOrElse("6.0.0")) + val version = (resp.json \ "version" \ "number").asOpt[String].orElse(config.version).getOrElse(ElasticVersion.defaultStr) + if (logger.isDebugEnabled) logger.debug(s"elastic version from server is: ${version}") + Right(version) } else { + logger.error(s"error while fetching elastic version: ${resp.status} - ${resp.json}") Left(resp.json) } } @@ -585,6 +641,41 @@ class ElasticWritesAnalytics(config: ElasticAnalyticsConfig, env: Env) extends A if (config.applyTemplate) { init() + } else { + initializeVersionWithoutTemplate() + } + + def initializeVersionWithoutTemplate(): Unit = { + if (ElasticWritesAnalytics.isInitialized(config)._1) { + () + } else { + implicit val ec = env.otoroshiExecutionContext + config.version match { + case Some(versionRaw) => { + val version = Version(versionRaw) match { + case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven(versionRaw) + case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight(versionRaw) + case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven(versionRaw) + case _ => ElasticVersion.AboveSeven(versionRaw) + } + ElasticWritesAnalytics.initialized(config, version) + } + case None => { + ElasticUtils.checkVersion(config, logger, env).map { + case Left(err) => ElasticWritesAnalytics.initialized(config, ElasticVersion.default) + case Right(versionRaw) => { + val version = Version(versionRaw) match { + case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven(versionRaw) + case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight(versionRaw) + case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven(versionRaw) + case _ => ElasticVersion.AboveSeven(versionRaw) + } + ElasticWritesAnalytics.initialized(config, version) + } + } + } + } + } } override def init(): Unit = { @@ -622,7 +713,7 @@ class ElasticWritesAnalytics(config: ElasticAnalyticsConfig, env: Env) extends A Json.obj( "index" -> Json .obj("_index" -> indexWithDate) - .applyOnIf(version == ElasticVersion.UnderSeven)(_ ++ Json.obj("_type" -> `type`)) + .applyOnIf(version.underSeven)(_ ++ Json.obj("_type" -> `type`)) ) ) val sourceClause = Json.stringify(source) @@ -704,10 +795,10 @@ class ElasticReadsAnalytics(config: ElasticAnalyticsConfig, env: Env) extends An ElasticUtils.checkSearch(config, env) def checkVersion()(implicit ec: ExecutionContext): Future[Either[JsValue, String]] = - ElasticUtils.checkVersion(config, env) + ElasticUtils.checkVersion(config, logger, env) def getElasticVersion()(implicit ec: ExecutionContext): Future[ElasticVersion] = - ElasticUtils.getElasticVersion(config, env) + ElasticUtils.getElasticVersion(config, logger, env) private def query(query: JsObject, debug: Boolean = false)(implicit ec: ExecutionContext): Future[QueryResponse] = { val builder = env.MtlsWs.url(searchUri, config.mtlsConfig)