diff --git a/build.gradle b/build.gradle index f4f92c45f..9e2ce3d6d 100644 --- a/build.gradle +++ b/build.gradle @@ -22,7 +22,6 @@ buildscript { } common_utils_version = System.getProperty("common_utils.version", opensearch_build) kotlin_version = System.getProperty("kotlin.version", "1.6.0") - job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build) jackson_version = "2.14.1" } @@ -67,7 +66,6 @@ opensearchplugin { name 'opensearch-observability' description 'OpenSearch Plugin for OpenSearch Dashboards Observability' classname "org.opensearch.observability.ObservabilityPlugin" - extendedPlugins = ['opensearch-job-scheduler'] } publishing { @@ -188,7 +186,6 @@ allprojects { } dependencies { - zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}" implementation "org.opensearch:opensearch:${opensearch_version}" implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" implementation "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}" @@ -202,7 +199,6 @@ dependencies { implementation group: 'com.github.wnameless.json', name: 'json-base', version: '2.2.1' implementation "com.fasterxml.jackson.core:jackson-databind:${jackson_version}" implementation "com.fasterxml.jackson.core:jackson-annotations:${jackson_version}" - compileOnly "${group}:opensearch-job-scheduler-spi:${job_scheduler_version}" testImplementation( 'org.assertj:assertj-core:3.16.1', 'org.junit.jupiter:junit-jupiter-api:5.6.2' @@ -317,18 +313,6 @@ integTest.getClusters().forEach { c -> c.plugin(project.getObjects().filePropert testClusters.integTest { testDistribution = "INTEG_TEST" - // need to install job-scheduler first, need to assemble job-scheduler first - plugin(provider(new Callable(){ - @Override - RegularFile call() throws Exception { - return new RegularFile() { - @Override - File getAsFile() { - return configurations.zipArchive.asFileTree.getSingleFile() - } - } - } - })) // Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1 if (_numNodes > 1) numberOfNodes = _numNodes @@ -422,17 +406,6 @@ task prepareBwcTests { dependsOn bundle doLast { plugins = [ - provider(new Callable(){ - @Override - RegularFile call() throws Exception { - return new RegularFile() { - @Override - File getAsFile() { - return configurations.zipArchive.asFileTree.getSingleFile() - } - } - } - }), project.getObjects().fileProperty().value(bundle.getArchiveFile()) ] } diff --git a/src/main/kotlin/org/opensearch/observability/ObservabilityPlugin.kt b/src/main/kotlin/org/opensearch/observability/ObservabilityPlugin.kt index d4137c1ba..0e18117e1 100644 --- a/src/main/kotlin/org/opensearch/observability/ObservabilityPlugin.kt +++ b/src/main/kotlin/org/opensearch/observability/ObservabilityPlugin.kt @@ -19,9 +19,6 @@ import org.opensearch.common.settings.SettingsFilter import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.env.Environment import org.opensearch.env.NodeEnvironment -import org.opensearch.jobscheduler.spi.JobSchedulerExtension -import org.opensearch.jobscheduler.spi.ScheduledJobParser -import org.opensearch.jobscheduler.spi.ScheduledJobRunner import org.opensearch.observability.action.CreateObservabilityObjectAction import org.opensearch.observability.action.DeleteObservabilityObjectAction import org.opensearch.observability.action.GetObservabilityObjectAction @@ -31,9 +28,6 @@ import org.opensearch.observability.index.ObservabilityMetricsIndex import org.opensearch.observability.index.ObservabilityTracesIndex import org.opensearch.observability.resthandler.ObservabilityRestHandler import org.opensearch.observability.resthandler.ObservabilityStatsRestHandler -import org.opensearch.observability.resthandler.SchedulerRestHandler -import org.opensearch.observability.scheduler.ObservabilityJobParser -import org.opensearch.observability.scheduler.ObservabilityJobRunner import org.opensearch.observability.settings.PluginSettings import org.opensearch.plugins.ActionPlugin import org.opensearch.plugins.ClusterPlugin @@ -51,7 +45,7 @@ import java.util.function.Supplier * This class initializes the rest handlers. */ @Suppress("TooManyFunctions") -class ObservabilityPlugin : Plugin(), ActionPlugin, ClusterPlugin, JobSchedulerExtension { +class ObservabilityPlugin : Plugin(), ActionPlugin, ClusterPlugin { companion object { const val PLUGIN_NAME = "opensearch-observability" @@ -111,7 +105,6 @@ class ObservabilityPlugin : Plugin(), ActionPlugin, ClusterPlugin, JobSchedulerE return listOf( ObservabilityRestHandler(), ObservabilityStatsRestHandler(), - SchedulerRestHandler() // TODO: tmp rest handler only for POC purpose ) } @@ -138,20 +131,4 @@ class ObservabilityPlugin : Plugin(), ActionPlugin, ClusterPlugin, JobSchedulerE ) ) } - - override fun getJobType(): String { - return "observability" - } - - override fun getJobIndex(): String { - return SchedulerRestHandler.SCHEDULED_JOB_INDEX - } - - override fun getJobRunner(): ScheduledJobRunner { - return ObservabilityJobRunner - } - - override fun getJobParser(): ScheduledJobParser { - return ObservabilityJobParser - } } diff --git a/src/main/kotlin/org/opensearch/observability/model/RestTag.kt b/src/main/kotlin/org/opensearch/observability/model/RestTag.kt index c8a168a1e..20b04f2b3 100644 --- a/src/main/kotlin/org/opensearch/observability/model/RestTag.kt +++ b/src/main/kotlin/org/opensearch/observability/model/RestTag.kt @@ -33,10 +33,6 @@ internal object RestTag { const val OPERATIONAL_PANEL_FIELD = "operationalPanel" const val APPLICATION_FIELD = "application" const val TIMESTAMP_FIELD = "timestamp" - const val SCHEDULE_INFO_TAG = "schedule" - const val SCHEDULED_JOB_TYPE_TAG = "jobType" - const val ID_FIELD = "id" - const val IS_ENABLED_TAG = "isEnabled" private val INCLUDE_ID = Pair(OBJECT_ID_FIELD, "true") private val EXCLUDE_ACCESS = Pair(ACCESS_LIST_FIELD, "false") val REST_OUTPUT_PARAMS: Params = ToXContent.MapParams(mapOf(INCLUDE_ID)) diff --git a/src/main/kotlin/org/opensearch/observability/model/ScheduledJobDoc.kt b/src/main/kotlin/org/opensearch/observability/model/ScheduledJobDoc.kt deleted file mode 100644 index aa2cfad01..000000000 --- a/src/main/kotlin/org/opensearch/observability/model/ScheduledJobDoc.kt +++ /dev/null @@ -1,171 +0,0 @@ -package org.opensearch.observability.model - -import org.opensearch.common.io.stream.StreamOutput -import org.opensearch.common.xcontent.XContentFactory -import org.opensearch.common.xcontent.XContentParserUtils -import org.opensearch.core.xcontent.ToXContent -import org.opensearch.core.xcontent.XContentBuilder -import org.opensearch.core.xcontent.XContentParser -import org.opensearch.jobscheduler.spi.ScheduledJobParameter -import org.opensearch.jobscheduler.spi.schedule.Schedule -import org.opensearch.jobscheduler.spi.schedule.ScheduleParser -import org.opensearch.observability.ObservabilityPlugin.Companion.LOG_PREFIX -import org.opensearch.observability.model.RestTag.ACCESS_LIST_FIELD -import org.opensearch.observability.model.RestTag.CREATED_TIME_FIELD -import org.opensearch.observability.model.RestTag.ID_FIELD -import org.opensearch.observability.model.RestTag.IS_ENABLED_TAG -import org.opensearch.observability.model.RestTag.OBJECT_ID_FIELD -import org.opensearch.observability.model.RestTag.SCHEDULED_JOB_TYPE_TAG -import org.opensearch.observability.model.RestTag.SCHEDULE_INFO_TAG -import org.opensearch.observability.model.RestTag.TENANT_FIELD -import org.opensearch.observability.model.RestTag.UPDATED_TIME_FIELD -import org.opensearch.observability.security.UserAccessManager.DEFAULT_TENANT -import org.opensearch.observability.util.logger -import org.opensearch.observability.util.stringList -import java.io.IOException -import java.time.Instant - -/** - * TODO: this whole class is for poc purpose. As for actual implementation, it depends on the data model of Metric. - */ -internal data class ScheduledJobDoc( - val id: String, - val updatedTime: Instant, - val createdTime: Instant, - val tenant: String, - val access: List, - val jobType: JobType, - val scheduleInfo: Schedule, - val enabled: Boolean -) : ScheduledJobParameter, BaseModel { - - internal enum class JobType { Metrics, Uptime } - - internal companion object { - private val log by logger(ScheduledJobDoc::class.java) - - /** - * Parse the data from parser and create ScheduledJobDoc object - * @param parser data referenced at parser - * @param userId use this id if not available in the json - * @return created ScheduledJobDoc object - */ - @JvmStatic - @Throws(IOException::class) - @Suppress("ComplexMethod") - fun parse(parser: XContentParser, userId: String? = null): ScheduledJobDoc { - var id: String? = userId - var updatedTime: Instant? = null - var createdTime: Instant? = null - var tenant: String? = null - var access: List = listOf() - var jobType: JobType? = null - var scheduleInfo: Schedule? = null - var enabled = false - - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser) - while (XContentParser.Token.END_OBJECT != parser.nextToken()) { - val fieldName = parser.currentName() - parser.nextToken() - when (fieldName) { - OBJECT_ID_FIELD -> id = parser.text() - UPDATED_TIME_FIELD -> updatedTime = Instant.ofEpochMilli(parser.longValue()) - CREATED_TIME_FIELD -> createdTime = Instant.ofEpochMilli(parser.longValue()) - TENANT_FIELD -> tenant = parser.text() - ACCESS_LIST_FIELD -> access = parser.stringList() - SCHEDULED_JOB_TYPE_TAG -> jobType = JobType.valueOf(parser.text()) - SCHEDULE_INFO_TAG -> scheduleInfo = ScheduleParser.parse(parser) - IS_ENABLED_TAG -> enabled = parser.booleanValue() - else -> { - parser.skipChildren() - log.info("$LOG_PREFIX:ScheduledJobDoc Skipping Unknown field $fieldName") - } - } - } - - id ?: throw IllegalArgumentException("$ID_FIELD field absent") - updatedTime ?: throw IllegalArgumentException("$UPDATED_TIME_FIELD field absent") - createdTime ?: throw IllegalArgumentException("$CREATED_TIME_FIELD field absent") - tenant = tenant ?: DEFAULT_TENANT - jobType ?: throw IllegalArgumentException("$SCHEDULED_JOB_TYPE_TAG field absent") - scheduleInfo ?: throw IllegalArgumentException("$SCHEDULE_INFO_TAG field absent") - - return ScheduledJobDoc( - id, - updatedTime, - createdTime, - tenant, - access, - jobType, - scheduleInfo, - enabled - ) - } - } - - /** - * create XContentBuilder from this object using [XContentFactory.jsonBuilder()] - * @param params XContent parameters - * @return created XContentBuilder object - */ - fun toXContent(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): XContentBuilder? { - return toXContent(XContentFactory.jsonBuilder(), params) - } - - override fun writeTo(output: StreamOutput) { - output.writeString(id) - output.writeInstant(updatedTime) - output.writeInstant(createdTime) - output.writeString(tenant) - output.writeStringCollection(access) - output.writeEnum(jobType) - output.writeEnum(jobType) // jobType is read twice in constructor - output.writeOptionalWriteable(scheduleInfo) - output.writeBoolean(enabled) - } - - /** - * {ref toXContent} - */ - override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { - builder!! - builder.startObject() - if (params?.paramAsBoolean(ID_FIELD, false) == true) { - builder.field(ID_FIELD, id) - } - builder.field(UPDATED_TIME_FIELD, updatedTime.toEpochMilli()) - .field(CREATED_TIME_FIELD, createdTime.toEpochMilli()) - .field(TENANT_FIELD, tenant) - if (params?.paramAsBoolean(ACCESS_LIST_FIELD, true) == true && access.isNotEmpty()) { - builder.field(ACCESS_LIST_FIELD, access) - } - - builder.field(SCHEDULE_INFO_TAG) - schedule.toXContent(builder, ToXContent.EMPTY_PARAMS) - - builder.field(SCHEDULED_JOB_TYPE_TAG, jobType) - .field(IS_ENABLED_TAG, enabled) - builder.endObject() - return builder - } - - override fun getName(): String { - return "poc name" // TODO: placeholder e.g. metric.name - } - - override fun getLastUpdateTime(): Instant { - return updatedTime - } - - override fun getEnabledTime(): Instant { - return createdTime - } - - override fun getSchedule(): Schedule { - return scheduleInfo - } - - override fun isEnabled(): Boolean { - return enabled - } -} diff --git a/src/main/kotlin/org/opensearch/observability/resthandler/SchedulerRestHandler.kt b/src/main/kotlin/org/opensearch/observability/resthandler/SchedulerRestHandler.kt deleted file mode 100644 index 28f3082d8..000000000 --- a/src/main/kotlin/org/opensearch/observability/resthandler/SchedulerRestHandler.kt +++ /dev/null @@ -1,155 +0,0 @@ -package org.opensearch.observability.resthandler - -import org.opensearch.action.ActionListener -import org.opensearch.action.index.IndexRequest -import org.opensearch.action.index.IndexResponse -import org.opensearch.client.node.NodeClient -import org.opensearch.common.xcontent.json.JsonXContent -import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule -import org.opensearch.observability.model.RestTag -import org.opensearch.observability.model.ScheduledJobDoc -import org.opensearch.rest.BaseRestHandler -import org.opensearch.rest.BaseRestHandler.RestChannelConsumer -import org.opensearch.rest.BytesRestResponse -import org.opensearch.rest.RestChannel -import org.opensearch.rest.RestHandler -import org.opensearch.rest.RestRequest -import org.opensearch.rest.RestResponse -import org.opensearch.rest.RestStatus -import java.io.IOException -import java.time.Instant -import java.time.temporal.ChronoUnit - -/** - * TODO: This REST handler is for POC to verify that job-scheduler workflow can run in Observability. - * In the future this will be removed. Scheduling won't have it's own REST API. It always comes with create Metic API - */ -internal class SchedulerRestHandler : BaseRestHandler() { - companion object { - private const val SCHEDULE_ACTION = "observability_jobs_actions" - const val SCHEDULED_JOB_INDEX = ".opensearch-observability-job" - private const val OBSERVABILITY_SCHEDULE_URL = "_plugins/poc/_schedule" - private const val CONSTANT = 10 - } - - /** - * {@inheritDoc} - */ - override fun getName(): String { - return SCHEDULE_ACTION - } - - /** - * {@inheritDoc} - */ - override fun routes(): List { - return listOf( - /** - * Create a new object - * Request URL: POST OBSERVABILITY_URL - * Request body: Ref [org.opensearch.observability.model.CreateObservabilityObjectRequest] - * Response body: Ref [org.opensearch.observability.model.CreateObservabilityObjectResponse] - */ - RestHandler.Route(RestRequest.Method.POST, OBSERVABILITY_SCHEDULE_URL) - ) - } - - /** - * {@inheritDoc} - */ - override fun responseParams(): Set { - return setOf( - RestTag.OBJECT_ID_FIELD, - RestTag.OBJECT_ID_LIST_FIELD, - RestTag.OBJECT_TYPE_FIELD, - RestTag.SORT_FIELD_FIELD, - RestTag.SORT_ORDER_FIELD, - RestTag.FROM_INDEX_FIELD, - RestTag.MAX_ITEMS_FIELD - ) - } - - private fun executePostRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - // TODO: Indexing a scheduled job will happen within the workflow of creating a metric. Below is for POC only. - - if (request.method() == RestRequest.Method.POST) { - val sampleId = "id" + Math.random() * CONSTANT - val scheduledJob = ScheduledJobDoc( - sampleId, - Instant.now(), - Instant.now(), - "__user__", - listOf(), - ScheduledJobDoc.JobType.Metrics, - IntervalSchedule( - Instant.now(), - 1, - ChronoUnit.MINUTES - ), - true, - ) - val indexRequest: IndexRequest = IndexRequest() - .index(SCHEDULED_JOB_INDEX) - .id(sampleId) - .source(scheduledJob.toXContent()) - - return RestChannelConsumer { restChannel: RestChannel -> - // index the job parameter - - client.index( - indexRequest, - object : ActionListener { - override fun onResponse(indexResponse: IndexResponse) { - try { - val restResponse: RestResponse = BytesRestResponse( - RestStatus.OK, - indexResponse.toXContent(JsonXContent.contentBuilder(), null) - ) - restChannel.sendResponse(restResponse) - } catch (e: IOException) { - restChannel.sendResponse( - BytesRestResponse( - RestStatus.INTERNAL_SERVER_ERROR, - e.message - ) - ) - } - } - - override fun onFailure(e: Exception) { - restChannel.sendResponse( - BytesRestResponse( - RestStatus.INTERNAL_SERVER_ERROR, - e.message - ) - ) - } - } - ) - } - } else { - return RestChannelConsumer { restChannel: RestChannel -> - restChannel.sendResponse( - BytesRestResponse( - RestStatus.METHOD_NOT_ALLOWED, - request.method().toString() + " is not allowed." - ) - ) - } - } - } - - override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - return when (request.method()) { - RestRequest.Method.POST -> executePostRequest(request, client) - else -> RestChannelConsumer { - it.sendResponse( - BytesRestResponse( - RestStatus.METHOD_NOT_ALLOWED, - "${request.method()} is not allowed" - ) - ) - } - } - } -} diff --git a/src/main/kotlin/org/opensearch/observability/scheduler/ObservabilityJobParser.kt b/src/main/kotlin/org/opensearch/observability/scheduler/ObservabilityJobParser.kt deleted file mode 100644 index b0dd7b70b..000000000 --- a/src/main/kotlin/org/opensearch/observability/scheduler/ObservabilityJobParser.kt +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.observability.scheduler - -import org.opensearch.core.xcontent.XContentParser -import org.opensearch.jobscheduler.spi.JobDocVersion -import org.opensearch.jobscheduler.spi.ScheduledJobParameter -import org.opensearch.jobscheduler.spi.ScheduledJobParser -import org.opensearch.observability.model.ScheduledJobDoc - -internal object ObservabilityJobParser : ScheduledJobParser { - /** - * {@inheritDoc} - */ - override fun parse(xContentParser: XContentParser, id: String, jobDocVersion: JobDocVersion): ScheduledJobParameter { - xContentParser.nextToken() - return ScheduledJobDoc.parse(xContentParser, id) - } -} diff --git a/src/main/kotlin/org/opensearch/observability/scheduler/ObservabilityJobRunner.kt b/src/main/kotlin/org/opensearch/observability/scheduler/ObservabilityJobRunner.kt deleted file mode 100644 index 9f52113c3..000000000 --- a/src/main/kotlin/org/opensearch/observability/scheduler/ObservabilityJobRunner.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.observability.scheduler - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.launch -import org.opensearch.jobscheduler.spi.JobExecutionContext -import org.opensearch.jobscheduler.spi.ScheduledJobParameter -import org.opensearch.jobscheduler.spi.ScheduledJobRunner -import org.opensearch.observability.ObservabilityPlugin.Companion.LOG_PREFIX -import org.opensearch.observability.model.ScheduledJobDoc -import org.opensearch.observability.util.logger - -internal object ObservabilityJobRunner : ScheduledJobRunner { - private val log by logger(ObservabilityJobRunner::class.java) - private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) - - override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) { - if (job !is ScheduledJobDoc) { - log.warn("$LOG_PREFIX:job is not of type ScheduledJobDoc:${job.javaClass.name}") - throw IllegalArgumentException("job is not of type ScheduledJobDoc:${job.javaClass.name}") - } - - scope.launch { - val scheduledJob: ScheduledJobDoc = job - val jobType = scheduledJob.jobType - // TODO: Add logic to retrieve metric and update metric index. E,g. run PPL/SQL query via transport API - // and write to metric index after some processing. - - log.info("POC: Running job type: ${jobType.name}") - } - } -} diff --git a/src/test/kotlin/org/opensearch/observability/ObservabilityPluginIT.kt b/src/test/kotlin/org/opensearch/observability/ObservabilityPluginIT.kt index 442bfc37a..60fd5d455 100644 --- a/src/test/kotlin/org/opensearch/observability/ObservabilityPluginIT.kt +++ b/src/test/kotlin/org/opensearch/observability/ObservabilityPluginIT.kt @@ -21,10 +21,6 @@ class ObservabilityPluginIT : OpenSearchIntegTestCase() { nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()) val nodesInfoResponse = client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet() val pluginInfos = nodesInfoResponse.nodes[0].getInfo(PluginsAndModules::class.java).pluginInfos - assertTrue( - pluginInfos.stream() - .anyMatch { pluginInfo: PluginInfo -> pluginInfo.name == "opensearch-job-scheduler" } - ) assertTrue( pluginInfos.stream() .anyMatch { pluginInfo: PluginInfo -> pluginInfo.name == "opensearch-observability" }