Skip to content

Commit

Permalink
Merge pull request #474 from TheHive-Project/fix/DL-717/cortex-docker…
Browse files Browse the repository at this point in the history
…-client

[DL-717] Fix: cortex docker client
  • Loading branch information
BillOTei authored Oct 8, 2024
2 parents 423deff + 7103b71 commit e0b67f9
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 98 deletions.
121 changes: 31 additions & 90 deletions app/org/thp/cortex/services/DockerJobRunnerSrv.scala
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
package org.thp.cortex.services

import akka.actor.ActorSystem
import com.spotify.docker.client.DockerClient.LogsParam
import com.spotify.docker.client.messages.HostConfig.Bind
import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
import com.spotify.docker.client.{DefaultDockerClient, DockerClient}
import org.thp.cortex.util.docker.{DockerClient => DockerJavaClient}
import play.api.libs.json.Json
import play.api.{Configuration, Logger}

import java.nio.charset.StandardCharsets
import java.nio.file._
import java.util.concurrent.TimeUnit
import javax.inject.{Inject, Singleton}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

@Singleton
class DockerJobRunnerSrv(
client: DockerClient,
config: Configuration,
javaClient: DockerJavaClient,
autoUpdate: Boolean,
jobBaseDirectory: Path,
dockerJobBaseDirectory: Path,
Expand All @@ -28,17 +25,7 @@ class DockerJobRunnerSrv(
@Inject()
def this(config: Configuration, system: ActorSystem) =
this(
new DefaultDockerClient.Builder()
.apiVersion(config.getOptional[String]("docker.version").orNull)
.connectionPoolSize(config.getOptional[Int]("docker.connectionPoolSize").getOrElse(100))
.connectTimeoutMillis(config.getOptional[Long]("docker.connectTimeoutMillis").getOrElse(5000))
//.dockerCertificates()
.readTimeoutMillis(config.getOptional[Long]("docker.readTimeoutMillis").getOrElse(30000))
//.registryAuthSupplier()
.uri(config.getOptional[String]("docker.uri").getOrElse("unix:///var/run/docker.sock"))
.useProxy(config.getOptional[Boolean]("docker.useProxy").getOrElse(false))
.build(),
config,
new DockerJavaClient(config),
config.getOptional[Boolean]("docker.autoUpdate").getOrElse(true),
Paths.get(config.get[String]("job.directory")),
Paths.get(config.get[String]("job.dockerDirectory")),
Expand All @@ -50,89 +37,43 @@ class DockerJobRunnerSrv(
lazy val isAvailable: Boolean =
Try {
logger.debug(s"Retrieve docker information ...")
logger.info(s"Docker is available:\n${client.info()}")
logger.info(s"Docker is available:\n${javaClient.info}")
true
}.recover {
case error =>
logger.info(s"Docker is not available", error)
false
}.get

def run(jobDirectory: Path, dockerImage: String, timeout: Option[FiniteDuration])(implicit
ec: ExecutionContext
): Try[Unit] = {
import scala.collection.JavaConverters._
if (autoUpdate) Try(client.pull(dockerImage))
// ContainerConfig.builder().addVolume()
val hostConfigBuilder = HostConfig.builder()
config.getOptional[Seq[String]]("docker.container.capAdd").map(_.asJava).foreach(hostConfigBuilder.capAdd)
config.getOptional[Seq[String]]("docker.container.capDrop").map(_.asJava).foreach(hostConfigBuilder.capDrop)
config.getOptional[String]("docker.container.cgroupParent").foreach(hostConfigBuilder.cgroupParent)
config.getOptional[Long]("docker.container.cpuPeriod").foreach(hostConfigBuilder.cpuPeriod(_))
config.getOptional[Long]("docker.container.cpuQuota").foreach(hostConfigBuilder.cpuQuota(_))
config.getOptional[Seq[String]]("docker.container.dns").map(_.asJava).foreach(hostConfigBuilder.dns)
config.getOptional[Seq[String]]("docker.container.dnsSearch").map(_.asJava).foreach(hostConfigBuilder.dnsSearch)
config.getOptional[Seq[String]]("docker.container.extraHosts").map(_.asJava).foreach(hostConfigBuilder.extraHosts)
config.getOptional[Long]("docker.container.kernelMemory").foreach(hostConfigBuilder.kernelMemory(_))
config.getOptional[Long]("docker.container.memoryReservation").foreach(hostConfigBuilder.memoryReservation(_))
config.getOptional[Long]("docker.container.memory").foreach(hostConfigBuilder.memory(_))
config.getOptional[Long]("docker.container.memorySwap").foreach(hostConfigBuilder.memorySwap(_))
config.getOptional[Int]("docker.container.memorySwappiness").foreach(hostConfigBuilder.memorySwappiness(_))
config.getOptional[String]("docker.container.networkMode").foreach(hostConfigBuilder.networkMode)
config.getOptional[Boolean]("docker.container.privileged").foreach(hostConfigBuilder.privileged(_))
hostConfigBuilder.appendBinds(
Bind
.from(dockerJobBaseDirectory.resolve(jobBaseDirectory.relativize(jobDirectory)).toAbsolutePath.toString)
.to("/job")
.readOnly(false)
.build()
)
val cacertsFile = jobDirectory.resolve("input").resolve("cacerts")
val containerConfigBuilder = ContainerConfig
.builder()
.hostConfig(hostConfigBuilder.build())
.image(dockerImage)
.cmd("/job")
private def generateErrorOutput(containerId: String, f: Path) = {
logger.warn(s"the runner didn't generate any output file $f")
for {
output <- javaClient.getLogs(containerId)
report = Json.obj("success" -> false, "errorMessage" -> output)
_ <- Try(Files.write(f, report.toString.getBytes(StandardCharsets.UTF_8)))
} yield report
}

val containerConfig =
if (Files.exists(cacertsFile)) containerConfigBuilder.env(s"REQUESTS_CA_BUNDLE=/job/input/cacerts").build()
else containerConfigBuilder.build()
val containerCreation = client.createContainer(containerConfig)
// Option(containerCreation.warnings()).flatMap(_.asScala).foreach(logger.warn)
def run(jobDirectory: Path, dockerImage: String, timeout: Option[FiniteDuration])(implicit executionContext: ExecutionContext): Try[Unit] = {
val to = timeout.getOrElse(FiniteDuration(5000, TimeUnit.SECONDS))

logger.debug(s"Container configuration: $containerConfig")
logger.info(
s"Execute container ${containerCreation.id()}\n" +
s" timeout: ${timeout.fold("none")(_.toString)}\n" +
s" image : $dockerImage\n" +
s" volume : ${jobDirectory.toAbsolutePath}:/job" +
Option(containerConfig.env()).fold("")(_.asScala.map("\n env : " + _).mkString)
)

val timeoutSched = timeout.map(to =>
system.scheduler.scheduleOnce(to) {
logger.info("Timeout reached, stopping the container")
client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill())
}
)
val execution = Try {
client.startContainer(containerCreation.id())
client.waitContainer(containerCreation.id())
()
}
timeoutSched.foreach(_.cancel())
val outputFile = jobDirectory.resolve("output").resolve("output.json")
if (!Files.exists(outputFile) || Files.size(outputFile) == 0) {
logger.warn(s"The worker didn't generate output file.")
val output = Try(client.logs(containerCreation.id(), LogsParam.stdout(), LogsParam.stderr()).readFully())
.fold(e => s"Container logs can't be read (${e.getMessage})", identity)
val message = execution.fold(e => s"Docker creation error: ${e.getMessage}\n$output", _ => output)
if (autoUpdate) Try(javaClient.pullImage(dockerImage))

val report = Json.obj("success" -> false, "errorMessage" -> message)
Files.write(outputFile, report.toString.getBytes(StandardCharsets.UTF_8))
}
client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill())
execution
for {
containerId <- javaClient.prepare(dockerImage, jobDirectory, jobBaseDirectory, dockerJobBaseDirectory, to)
timeoutScheduled = timeout.map(to =>
system.scheduler.scheduleOnce(to) {
logger.info("Timeout reached, stopping the container")
javaClient.clean(containerId)
}
)
_ <- javaClient.execute(containerId)
_ = timeoutScheduled.foreach(_.cancel())
outputFile <- Try(jobDirectory.resolve("output").resolve("output.json"))
isError = Files.notExists(outputFile) || Files.size(outputFile) == 0 || Files.isDirectory(outputFile)
_ = if (isError) generateErrorOutput(containerId, outputFile).toOption else None
_ <- javaClient.clean(containerId)
} yield ()
}

}
167 changes: 167 additions & 0 deletions app/org/thp/cortex/util/docker/DockerClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package org.thp.cortex.util.docker

import com.github.dockerjava.api.model._
import com.github.dockerjava.core.{DefaultDockerClientConfig, DockerClientConfig, DockerClientImpl}
import com.github.dockerjava.transport.DockerHttpClient
import com.github.dockerjava.zerodep.ZerodepDockerHttpClient
import play.api.{Configuration, Logger}

import java.nio.file.{Files, Path}
import java.time.Duration
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.blocking
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
import scala.util.Try

class DockerClient(config: Configuration) {
private lazy val logger: Logger = Logger(getClass.getName)
private lazy val (dockerConf, httpClient) = getHttpClient
private lazy val underlyingClient = DockerClientImpl.getInstance(dockerConf, httpClient)

def execute(containerId: String): Try[Int] =
Try {
val startContainerCmd = underlyingClient.startContainerCmd(containerId)
startContainerCmd.exec()
val waitResult = underlyingClient
.waitContainerCmd(containerId)
.start()
.awaitStatusCode()
logger.info(s"container $containerId started and awaited with code: $waitResult")

waitResult
}

def prepare(image: String, jobDirectory: Path, jobBaseDirectory: Path, dockerJobBaseDirectory: Path, timeout: FiniteDuration): Try[String] = Try {
logger.info(s"image $image pull result: ${pullImage(image)}")
val containerCmd = underlyingClient
.createContainerCmd(image)
.withHostConfig(configure(jobDirectory, jobBaseDirectory, dockerJobBaseDirectory))
if (Files.exists(jobDirectory.resolve("input").resolve("cacerts")))
containerCmd.withEnv(s"REQUESTS_CA_BUNDLE=/job/input/cacerts")
val containerResponse = containerCmd.exec()
logger.info(
s"about to start container ${containerResponse.getId}\n" +
s" timeout: ${timeout.toString}\n" +
s" image : $image\n" +
s" volumes : ${jobDirectory.toAbsolutePath}"
)
if (containerResponse.getWarnings.nonEmpty) logger.warn(s"${containerResponse.getWarnings.mkString(", ")}")
scheduleContainerTimeout(containerResponse.getId, timeout)

containerResponse.getId
}

private def configure(jobDirectory: Path, jobBaseDirectory: Path, dockerJobBaseDirectory: Path): HostConfig = {
val hostConfigMut = HostConfig
.newHostConfig()
.withBinds(
Seq(
new Bind(
dockerJobBaseDirectory.resolve(jobBaseDirectory.relativize(jobDirectory)).toAbsolutePath.toString,
new Volume(s"/job"),
AccessMode.rw
)
): _*
)

config.getOptional[Seq[String]]("docker.container.capAdd").map(_.map(Capability.valueOf)).foreach(hostConfigMut.withCapAdd(_: _*))
config.getOptional[Seq[String]]("docker.container.capDrop").map(_.map(Capability.valueOf)).foreach(hostConfigMut.withCapDrop(_: _*))
config.getOptional[String]("docker.container.cgroupParent").foreach(hostConfigMut.withCgroupParent)
config.getOptional[Long]("docker.container.cpuPeriod").foreach(hostConfigMut.withCpuPeriod(_))
config.getOptional[Long]("docker.container.cpuQuota").foreach(hostConfigMut.withCpuQuota(_))
config.getOptional[Seq[String]]("docker.container.dns").map(_.asJava).foreach(hostConfigMut.withDns)
config.getOptional[Seq[String]]("docker.container.dnsSearch").map(_.asJava).foreach(hostConfigMut.withDnsSearch)
config.getOptional[Seq[String]]("docker.container.extraHosts").foreach(l => hostConfigMut.withExtraHosts(l: _*))
config.getOptional[Long]("docker.container.kernelMemory").foreach(hostConfigMut.withKernelMemory(_))
config.getOptional[Long]("docker.container.memoryReservation").foreach(hostConfigMut.withMemoryReservation(_))
config.getOptional[Long]("docker.container.memory").foreach(hostConfigMut.withMemory(_))
config.getOptional[Long]("docker.container.memorySwap").foreach(hostConfigMut.withMemorySwap(_))
config.getOptional[Long]("docker.container.memorySwappiness").foreach(hostConfigMut.withMemorySwappiness(_))
config.getOptional[String]("docker.container.networkMode").foreach(hostConfigMut.withNetworkMode)
config.getOptional[Boolean]("docker.container.privileged").foreach(hostConfigMut.withPrivileged(_))

hostConfigMut
}

def info: Info = underlyingClient.infoCmd().exec()
def pullImage(image: String): Boolean = blocking {
val pullImageResultCbk = underlyingClient // Blocking
.pullImageCmd(image)
.start()
.awaitCompletion()
val timeout = config.get[FiniteDuration]("docker.pullImageTimeout")

pullImageResultCbk.awaitCompletion(timeout.toMillis, TimeUnit.MILLISECONDS)
}

def clean(containerId: String): Try[Unit] = Try {
underlyingClient
.killContainerCmd(containerId)
.exec()
underlyingClient
.removeContainerCmd(containerId)
.withForce(true)
.exec()
logger.info(s"removed container $containerId")
}

def getLogs(containerId: String): Try[String] = Try {
val stringBuilder = new StringBuilder()
val callback = new DockerLogsStringBuilder(stringBuilder)
underlyingClient
.logContainerCmd(containerId)
.withStdErr(true)
.withStdOut(true)
.withFollowStream(true)
.withTailAll()
.exec(callback)
.awaitCompletion()

callback.builder.toString
}

private def scheduleContainerTimeout(containerId: String, timeout: FiniteDuration) =
Executors
.newSingleThreadScheduledExecutor()
.schedule(
() => {
logger.info(s"timeout $timeout reached, stopping container $containerId}")
underlyingClient.removeContainerCmd(containerId).withForce(true).exec()
},
timeout.length,
timeout.unit
)

private def getHttpClient: (DockerClientConfig, DockerHttpClient) = {
val dockerConf = getBaseConfig
val dockerClient = new ZerodepDockerHttpClient.Builder()
.dockerHost(dockerConf.getDockerHost)
.sslConfig(dockerConf.getSSLConfig)
.maxConnections(if (config.has("docker.httpClient.maxConnections")) config.get[Int]("docker.httpClient.maxConnections") else 100)
.connectionTimeout(
if (config.has("docker.httpClient.connectionTimeout")) Duration.ofMillis(config.get[Long]("docker.httpClient.connectionTimeout"))
else Duration.ofSeconds(30)
)
.responseTimeout(
if (config.has("docker.httpClient.responseTimeout")) Duration.ofMillis(config.get[Long]("docker.httpClient.responseTimeout"))
else Duration.ofSeconds(45)
)
.build()

(dockerConf, dockerClient)
}

private def getBaseConfig: DockerClientConfig = {
val confBuilder = DefaultDockerClientConfig.createDefaultConfigBuilder()
config.getOptional[String]("docker.host").foreach(confBuilder.withDockerHost)
config.getOptional[Boolean]("docker.tlsVerify").foreach(confBuilder.withDockerTlsVerify(_))
config.getOptional[String]("docker.certPath").foreach(confBuilder.withDockerCertPath)
config.getOptional[String]("docker.registry.user").foreach(confBuilder.withRegistryUsername)
config.getOptional[String]("docker.registry.password").foreach(confBuilder.withRegistryPassword)
config.getOptional[String]("docker.registry.email").foreach(confBuilder.withRegistryEmail)
config.getOptional[String]("docker.registry.url").foreach(confBuilder.withRegistryUrl)

confBuilder.build()
}
}
11 changes: 11 additions & 0 deletions app/org/thp/cortex/util/docker/DockerLogsStringBuilder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.thp.cortex.util.docker

import com.github.dockerjava.api.async.ResultCallback
import com.github.dockerjava.api.model.Frame

class DockerLogsStringBuilder(var builder: StringBuilder) extends ResultCallback.Adapter[Frame] {
override def onNext(item: Frame): Unit = {
builder.append(new String(item.getPayload))
super.onNext(item)
}
}
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ lazy val cortex = (project in file("."))
Dependencies.reflections,
Dependencies.zip4j,
Dependencies.dockerClient,
Dependencies.dockerJavaClient,
Dependencies.dockerJavaTransport,
Dependencies.akkaCluster,
Dependencies.akkaClusterTyped
),
Expand Down
39 changes: 39 additions & 0 deletions conf/application.sample
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,43 @@ responder {
# port = 3128
# }

# Docker
docker {
host = "tcp://docker.somewhere.tld:2376"
tlsVerify = false
certPath = "/home/user/.docker"
registry {
user = "username"
password = "pwdReg"
email = "[email protected]"
url = "https://www.docker-registry.com"
}
httpClient {
maxConnections = 100
connectionTimeout = 30000 # millis
responseTimeout = 45000
}
container {
capAdd = ["ALL"]
capDrop = ["NET_ADMIN", "SYS_ADMIN"]
cgroupParent = "m-executor-abcd"
privileged = false

dns = ["8.8.8.8", "9.9.9.9"]
dnsSearch = ["dc1.example.com", "dc2.example.com"]
extraHosts = ["somehost=162.242.195.82", "otherhost=50.31.209.229", "myhostv6=::1"]
networkMode = "host"

cpuPeriod = 100000
cpuQuota = 50000
kernelMemory = 2147483648
memoryReservation = 1024
memory = 4294967296
memorySwap = 1073741824
memorySwappiness = 0
}
autoUpdate = false
pullImageTimeout = 10 minutes
}

# It's the end my friend. Happy hunting!
Loading

0 comments on commit e0b67f9

Please sign in to comment.