Skip to content

Commit

Permalink
fix #1700, fix #1701
Browse files Browse the repository at this point in the history
  • Loading branch information
mathieuancelin committed Sep 6, 2023
1 parent 3376055 commit 80bee56
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 36 deletions.
10 changes: 5 additions & 5 deletions otoroshi/app/controllers/BackOfficeController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1820,12 +1820,12 @@ class BackOfficeController(
case Some(config) => {
val index: String = config.index.getOrElse("otoroshi-events")
for {
version <- ElasticUtils.getElasticVersion(config, env)
version <- ElasticUtils.getElasticVersion(config, logger, env)
} yield {
val strTpl: String = version match {
case ElasticVersion.UnderSeven => ElasticTemplates.indexTemplate_v6
case ElasticVersion.AboveSeven => ElasticTemplates.indexTemplate_v7
case ElasticVersion.AboveSevenEight => ElasticTemplates.indexTemplate_v7_8
case ElasticVersion.UnderSeven(_) => ElasticTemplates.indexTemplate_v6
case ElasticVersion.AboveSeven(_) => ElasticTemplates.indexTemplate_v7
case ElasticVersion.AboveSevenEight(_) => ElasticTemplates.indexTemplate_v7_8
}
val template: String = if (config.indexSettings.clientSide) {
strTpl
Expand All @@ -1850,7 +1850,7 @@ class BackOfficeController(
case Some(config) => {
val index: String = config.index.getOrElse("otoroshi-events")
for {
version <- ElasticUtils.checkVersion(config, env)
version <- ElasticUtils.checkVersion(config, logger, env)
} yield {
version match {
case Left(err) => InternalServerError(Json.obj("error" -> err))
Expand Down
153 changes: 122 additions & 31 deletions otoroshi/app/events/impl/ElasticAnalytics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import otoroshi.events._
import otoroshi.models.{ApiKey, ElasticAnalyticsConfig, IndexSettingsInterval, ServiceDescriptor, ServiceGroup}
import org.joda.time.format.{DateTimeFormatterBuilder, ISODateTimeFormat}
import org.joda.time.{DateTime, Interval}
import otoroshi.jobs.updates.Version
import otoroshi.utils.cache.types.UnboundedTrieMap
import play.api.libs.json.Json.JsValueWrapper
import play.api.libs.json._
Expand Down Expand Up @@ -313,6 +314,26 @@ object ElasticTemplates {
| }
| }
| },
| "otoroshiHeadersIn": {
| "properties": {
| "key": {
| "type": "keyword"
| },
| "value": {
| "type": "keyword"
| }
| }
| },
| "otoroshiHeadersOut": {
| "properties": {
| "key": {
| "type": "keyword"
| },
| "value": {
| "type": "keyword"
| }
| }
| },
| "identity": {
| "properties": {
| "label": {
Expand Down Expand Up @@ -348,15 +369,34 @@ object ElasticWritesAnalytics {
}

def isInitialized(config: ElasticAnalyticsConfig): (Boolean, ElasticVersion) = {
clusterInitializedCache.getOrElse(toKey(config), (false, ElasticVersion.UnderSeven))
clusterInitializedCache.getOrElse(toKey(config), {
config.version match {
case Some(rawVersion) => {
val version = Version(rawVersion) match {
case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven(rawVersion)
case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight(rawVersion)
case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven(rawVersion)
case _ => ElasticVersion.AboveSeven(rawVersion)
}
(false, version)
}
case None => (false, ElasticVersion.default)
}
})
}
}

sealed trait ElasticVersion
sealed trait ElasticVersion {
def underSeven: Boolean
}
object ElasticVersion {
case object UnderSeven extends ElasticVersion
case object AboveSeven extends ElasticVersion
case object AboveSevenEight extends ElasticVersion

case class UnderSeven(raw: String) extends ElasticVersion { def underSeven: Boolean = true }
case class AboveSeven(raw: String) extends ElasticVersion { def underSeven: Boolean = false }
case class AboveSevenEight(raw: String) extends ElasticVersion { def underSeven: Boolean = false }

val defaultStr: String = "6.0.0"
val default: ElasticVersion = UnderSeven(defaultStr)
}

object ElasticUtils {
Expand Down Expand Up @@ -399,7 +439,7 @@ object ElasticUtils {
.addHttpHeaders(config.headers.toSeq: _*)
}

def getElasticVersion(config: ElasticAnalyticsConfig, env: Env)(implicit
def getElasticVersion(config: ElasticAnalyticsConfig, logger: Logger, env: Env)(implicit
ec: ExecutionContext
): Future[ElasticVersion] = {

Expand All @@ -408,25 +448,38 @@ object ElasticUtils {
config.version match {
case Some(version) => {
(Version(version) match {
case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven
case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight
case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven
case _ => ElasticVersion.AboveSeven
case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven(version)
case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight(version)
case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven(version)
case _ => ElasticVersion.AboveSeven(version)
}).future
}
case None => {
url(urlFromPath("", config), config, env)
.get()
.map(_.json)
.map(json => (json \ "version" \ "number").asOpt[String].getOrElse("6.0.0"))
// .map(v => v.split("\\.").headOption.map(_.toInt).getOrElse(6))
.map { _v =>
Version(_v) match {
case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven
case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight
case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven
case _ => ElasticVersion.AboveSeven
ElasticUtils.checkVersion(config, logger, env)
.map {
case Left(err) => ElasticVersion.default
case Right(_v) => {
Version(_v) match {
case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven(_v)
case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight(_v)
case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven(_v)
case _ => ElasticVersion.AboveSeven(_v)
}
}
}

//url(urlFromPath("", config), config, env)
// .get()
// .map(_.json)
// .map(json => (json \ "version" \ "number").asOpt[String].orElse(config.version).getOrElse(ElasticVersion.defaultStr))
// // .map(v => v.split("\\.").headOption.map(_.toInt).getOrElse(6))
// .map { _v =>
// Version(_v) match {
// case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven(_v)
// case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight(_v)
// case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven(_v)
// case _ => ElasticVersion.AboveSeven(_v)
// }
// _v.split("\\.").headOption.map(_.toInt).getOrElse(6) match {
// case v if v <= 6 => ElasticVersion.UnderSeven
// case v if v > 6 => {
Expand All @@ -437,7 +490,7 @@ object ElasticUtils {
// }
// }
}
}
//}
}
}

Expand All @@ -448,12 +501,12 @@ object ElasticUtils {
val index: String = config.index.getOrElse("otoroshi-events")
val numberOfShards: String = config.indexSettings.numberOfShards.toString
val numberOfReplicas: String = config.indexSettings.numberOfReplicas.toString
getElasticVersion(config, env).flatMap { version =>
getElasticVersion(config, logger, env).flatMap { version =>
// from elastic 7.8, we should use /_index_template/otoroshi-tpl and wrap almost everything expect index_patterns in a "template" object
val (strTpl, indexTemplatePath) = version match {
case ElasticVersion.UnderSeven => (ElasticTemplates.indexTemplate_v6, "/_template/otoroshi-tpl")
case ElasticVersion.AboveSeven => (ElasticTemplates.indexTemplate_v7, "/_template/otoroshi-tpl")
case ElasticVersion.AboveSevenEight =>
case ElasticVersion.UnderSeven(_) => (ElasticTemplates.indexTemplate_v6, "/_template/otoroshi-tpl")
case ElasticVersion.AboveSeven(_) => (ElasticTemplates.indexTemplate_v7, "/_template/otoroshi-tpl")
case ElasticVersion.AboveSevenEight(_) =>
(ElasticTemplates.indexTemplate_v7_8, "/_index_template/otoroshi-tpl")
}
if (logger.isDebugEnabled) logger.debug(s"$version, $indexTemplatePath")
Expand Down Expand Up @@ -534,15 +587,18 @@ object ElasticUtils {
}
}

def checkVersion(config: ElasticAnalyticsConfig, env: Env)(implicit
def checkVersion(config: ElasticAnalyticsConfig, logger: Logger, env: Env)(implicit
ec: ExecutionContext
): Future[Either[JsValue, String]] = {
url(urlFromPath("", config), config, env)
.get()
.map { resp =>
if (resp.status == 200) {
Right((resp.json \ "version" \ "number").asOpt[String].getOrElse("6.0.0"))
val version = (resp.json \ "version" \ "number").asOpt[String].orElse(config.version).getOrElse(ElasticVersion.defaultStr)
if (logger.isDebugEnabled) logger.debug(s"elastic version from server is: ${version}")
Right(version)
} else {
logger.error(s"error while fetching elastic version: ${resp.status} - ${resp.json}")
Left(resp.json)
}
}
Expand Down Expand Up @@ -585,6 +641,41 @@ class ElasticWritesAnalytics(config: ElasticAnalyticsConfig, env: Env) extends A

if (config.applyTemplate) {
init()
} else {
initializeVersionWithoutTemplate()
}

def initializeVersionWithoutTemplate(): Unit = {
if (ElasticWritesAnalytics.isInitialized(config)._1) {
()
} else {
implicit val ec = env.otoroshiExecutionContext
config.version match {
case Some(versionRaw) => {
val version = Version(versionRaw) match {
case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven(versionRaw)
case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight(versionRaw)
case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven(versionRaw)
case _ => ElasticVersion.AboveSeven(versionRaw)
}
ElasticWritesAnalytics.initialized(config, version)
}
case None => {
ElasticUtils.checkVersion(config, logger, env).map {
case Left(err) => ElasticWritesAnalytics.initialized(config, ElasticVersion.default)
case Right(versionRaw) => {
val version = Version(versionRaw) match {
case v if v.isBefore(Version("7.0.0")) => ElasticVersion.UnderSeven(versionRaw)
case v if v.isAfterEq(Version("7.8.0")) => ElasticVersion.AboveSevenEight(versionRaw)
case v if v.isAfterEq(Version("7.0.0")) => ElasticVersion.AboveSeven(versionRaw)
case _ => ElasticVersion.AboveSeven(versionRaw)
}
ElasticWritesAnalytics.initialized(config, version)
}
}
}
}
}
}

override def init(): Unit = {
Expand Down Expand Up @@ -622,7 +713,7 @@ class ElasticWritesAnalytics(config: ElasticAnalyticsConfig, env: Env) extends A
Json.obj(
"index" -> Json
.obj("_index" -> indexWithDate)
.applyOnIf(version == ElasticVersion.UnderSeven)(_ ++ Json.obj("_type" -> `type`))
.applyOnIf(version.underSeven)(_ ++ Json.obj("_type" -> `type`))
)
)
val sourceClause = Json.stringify(source)
Expand Down Expand Up @@ -704,10 +795,10 @@ class ElasticReadsAnalytics(config: ElasticAnalyticsConfig, env: Env) extends An
ElasticUtils.checkSearch(config, env)

def checkVersion()(implicit ec: ExecutionContext): Future[Either[JsValue, String]] =
ElasticUtils.checkVersion(config, env)
ElasticUtils.checkVersion(config, logger, env)

def getElasticVersion()(implicit ec: ExecutionContext): Future[ElasticVersion] =
ElasticUtils.getElasticVersion(config, env)
ElasticUtils.getElasticVersion(config, logger, env)

private def query(query: JsObject, debug: Boolean = false)(implicit ec: ExecutionContext): Future[QueryResponse] = {
val builder = env.MtlsWs.url(searchUri, config.mtlsConfig)
Expand Down

0 comments on commit 80bee56

Please sign in to comment.