Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log the delete batch request in batch operation log #6681

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SQL_EXECUTION_ID_KEY}
import org.apache.kyuubi.engine.spark.operation.ExecuteStatement
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.log.OperationLog

/**
 * A [[SparkListener]] based on Spark's DeveloperApi [[StatsReportListener]], used to append
Expand Down Expand Up @@ -78,15 +77,6 @@ class SQLOperationListener(
properties != null && properties.getProperty(KYUUBI_STATEMENT_ID_KEY) == operationId
}

/**
 * Runs `f` with the operation's log installed as the thread-local current
 * operation log, so log lines emitted inside `f` are captured into that
 * operation's log. The thread-local registration is always cleared afterwards.
 */
private def withOperationLog(f: => Unit): Unit = {
  try {
    operation.getOperationLog match {
      case Some(log) => OperationLog.setCurrentOperationLog(log)
      case None => // no per-operation log attached; run f without capture
    }
    f
  } finally {
    // Always detach, even if setCurrentOperationLog or f throws.
    OperationLog.removeCurrentOperationLog()
  }
}

override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
if (sameGroupId(jobStart.properties)) {
val jobId = jobStart.jobId
Expand All @@ -105,7 +95,7 @@ class SQLOperationListener(
activeJobs.put(
jobId,
new SparkJobInfo(stageSize, stageIds))
withOperationLog {
operation.withOperationLog {
info(s"Query [$operationId]: Job $jobId started with $stageSize stages," +
s" ${activeJobs.size()} active jobs running")
}
Expand All @@ -119,7 +109,7 @@ class SQLOperationListener(
case JobSucceeded => "succeeded"
case _ => "failed" // TODO: Handle JobFailed(exception: Exception)
}
withOperationLog {
operation.withOperationLog {
info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running")
}
}
Expand All @@ -135,7 +125,7 @@ class SQLOperationListener(
activeStages.put(
stageAttempt,
new SparkStageInfo(stageId, stageInfo.numTasks))
withOperationLog {
operation.withOperationLog {
info(s"Query [$operationId]: Stage $stageId.$attemptNumber started " +
s"with ${stageInfo.numTasks} tasks, ${activeStages.size()} active stages running")
}
Expand Down Expand Up @@ -166,7 +156,7 @@ class SQLOperationListener(
operationRunTime.getAndAdd(taskMetrics.executorRunTime)
operationCpuTime.getAndAdd(taskMetrics.executorCpuTime)
}
withOperationLog(super.onStageCompleted(stageCompleted))
operation.withOperationLog(super.onStageCompleted(stageCompleted))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin

override def getOperationLog: Option[OperationLog] = None

/**
 * Executes `f` with this operation's log (if any) registered as the current
 * thread-local operation log, and guarantees the registration is removed
 * afterwards regardless of how `f` completes.
 */
override def withOperationLog(f: => Unit): Unit = {
  try {
    for (log <- getOperationLog) {
      OperationLog.setCurrentOperationLog(log)
    }
    f
  } finally {
    // Unconditional cleanup: runs whether or not the body throws.
    OperationLog.removeCurrentOperationLog()
  }
}

OperationAuditLogger.audit(this, OperationState.INITIALIZED)
@volatile protected var state: OperationState = INITIALIZED
@volatile protected var startTime: Long = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ trait Operation {
def getHandle: OperationHandle
def getStatus: OperationStatus
def getOperationLog: Option[OperationLog]
/**
 * Runs `f` with this operation's [[OperationLog]] (when present) set as the
 * thread-local current operation log, so output produced inside `f` is
 * captured into the operation's log; the thread-local is removed afterwards.
 */
def withOperationLog(f: => Unit): Unit

def getBackgroundHandle: Future[_]
def shouldRunAsync: Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,14 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {

val sessionHandle = formatSessionHandle(batchId)
sessionManager.getBatchSession(sessionHandle).map { batchSession =>
fe.getSessionUser(batchSession.user)
val userName = fe.getSessionUser(batchSession.user)
val ipAddress = fe.getIpAddress
sessionManager.closeSession(batchSession.handle)
val (killed, msg) = batchSession.batchJobSubmissionOp.getKillMessage
batchSession.batchJobSubmissionOp.withOperationLog {
warn(s"Received kill batch request from $userName/$ipAddress")
warn(s"Kill batch response: killed: $killed, msg: $msg.")
}
new CloseBatchResponse(killed, msg)
}.getOrElse {
sessionManager.getBatchMetadata(batchId).map { metadata =>
Expand Down
Loading