Skip to content

Commit

Permalink
[Bug][Flink-Submit][dev-2.1.4]Fixed security issues with submitting e…
Browse files Browse the repository at this point in the history
…xternal applications to manipulate the Jvm FLINK (#3661)
  • Loading branch information
zhilinli123 authored Apr 10, 2024
1 parent e436389 commit 9a02951
Showing 1 changed file with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.proxy.FlinkShimsProxy

import java.security.Permission

import scala.language.{implicitConversions, reflectiveCalls}
import scala.reflect.ClassTag

object FlinkClient extends Logger {

private[this] val FLINK_CLIENT_HANDLER_CLASS_NAME =
Expand All @@ -46,7 +47,13 @@ object FlinkClient extends Logger {
"org.apache.streampark.flink.client.bean.TriggerSavepointRequest" -> "triggerSavepoint"

def submit(submitRequest: SubmitRequest): SubmitResponse = {
proxy[SubmitResponse](submitRequest, submitRequest.flinkVersion, SUBMIT_REQUEST)
val securityManager = System.getSecurityManager
try {
System.setSecurityManager(new ExitSecurityManager())
proxy[SubmitResponse](submitRequest, submitRequest.flinkVersion, SUBMIT_REQUEST)
} finally {
System.setSecurityManager(securityManager)
}
}

def cancel(stopRequest: CancelRequest): CancelResponse = {
Expand Down Expand Up @@ -87,3 +94,13 @@ object FlinkClient extends Logger {
}

}

/** Used to mask JVM requests for external operations */
class ExitSecurityManager extends SecurityManager {
override def checkExit(status: Int): Unit = {
throw new SecurityException(
s"System.exit($status) was called in your flink job, The job has been stopped, please check your program...")
}

override def checkPermission(perm: Permission): Unit = {}
}

0 comments on commit 9a02951

Please sign in to comment.