Commit
Merge branch 'master' of https://github.com/edp963/wormhole
Liuwenli11 committed Jul 22, 2019
2 parents cfd899f + 66db021 commit 000d9a2
Showing 5 changed files with 120 additions and 3 deletions.
@@ -160,6 +160,7 @@ object FunctionType extends Enumeration {
val DEFAULT = Value("default")
val HDFSLOG = Value("hdfslog")
val ROUTIING = Value("routing")
+val HDFSCSV = Value("hdfscsv")

def functionType(s: String) = FunctionType.withName(s.toLowerCase)

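For context, a minimal sketch (not part of this commit) of how the new enum value resolves through the existing functionType helper; the object is abbreviated to the members shown in this hunk:

    // Abbreviated copy of the enum above, for illustration only.
    object FunctionType extends Enumeration {
      val DEFAULT = Value("default")
      val HDFSLOG = Value("hdfslog")
      val ROUTIING = Value("routing")
      val HDFSCSV = Value("hdfscsv")

      def functionType(s: String) = FunctionType.withName(s.toLowerCase)
    }

    // Lookup is case-insensitive thanks to toLowerCase; both calls return FunctionType.HDFSCSV.
    FunctionType.functionType("hdfscsv")
    FunctionType.functionType("HDFSCSV")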
@@ -32,7 +32,7 @@ object WormholeGetOffsetUtils extends Serializable{
try{
edp.wormhole.kafka.WormholeGetOffsetUtils.getConsumerOffset(brokers,groupId,topic,partitions,kerberos)
}catch {
-case ex: Throwable =>
+case _ =>
currentVersion=KafkaVersion.KAFKA_UNKOWN
throw ex
}
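As flattened here, the new match arm drops the ex binding while the unchanged context line still rethrows ex; a minimal compilable sketch of the record-then-rethrow pattern, keeping the binding and reusing the names from this hunk, would be:

    try {
      edp.wormhole.kafka.WormholeGetOffsetUtils.getConsumerOffset(brokers, groupId, topic, partitions, kerberos)
    } catch {
      case ex: Throwable =>
        // Record the failure state before propagating; identifier spelled as in the source.
        currentVersion = KafkaVersion.KAFKA_UNKOWN
        throw ex
    }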
109 changes: 107 additions & 2 deletions rider/rider-server/src/main/scala/edp/rider/rest/util/FlowUtils.scala
@@ -389,7 +389,7 @@ object FlowUtils extends RiderLogger {
case StreamType.FLINK => s"${Action.RENEW},${Action.BATCHSELECT},${Action.DRIFT}"
case StreamType.SPARK =>
FunctionType.withName(functionType) match {
-case FunctionType.HDFSLOG | FunctionType.ROUTIING =>
+case FunctionType.HDFSLOG | FunctionType.ROUTIING | FunctionType.HDFSCSV =>
s"${Action.DRIFT}"
case FunctionType.DEFAULT => ""
}
@@ -680,7 +680,105 @@ object FlowUtils extends RiderLogger {
} send flow $flowId start directive: $flow_start_ums")
PushDirective.sendHdfsLogFlowStartDirective(flowId, streamId, sourceNs, jsonCompact(flow_start_ums))
// riderLogger.info(s"user ${directive.createBy} send ${DIRECTIVE_HDFSLOG_FLOW_START.toString} directive to ${RiderConfig.zk.address} success.")
-} else if (functionType == "routing") {
+} else if(functionType == "hdfscsv"){
+val base64Tuple = Seq(streamId, flowId, currentMillSec, sourceNs, "24", umsType,
+base64byte2s(umsSchema.toString.trim.getBytes), sourceIncrementTopic,
+if (flowOpt.nonEmpty) flowOpt.get.priorityId else 0L)
+val directive = Await.result(directiveDal.insert(Directive(0, DIRECTIVE_HDFSCSV_FLOW_START.toString, streamId, flowId, "", RiderConfig.zk.address, currentSec, userId)), minTimeOut)
+// riderLogger.info(s"user ${directive.createBy} insert ${DIRECTIVE_HDFSLOG_FLOW_START.toString} success.")
+val flow_start_ums =
+s"""
+|{
+|"protocol": {
+|"type": "${DIRECTIVE_HDFSCSV_FLOW_START.toString}"
+|},
+|"schema": {
+|"namespace": "$sourceNs",
+|"fields": [
+|{
+|"name": "directive_id",
+|"type": "long",
+|"nullable": false
+|},
+|{
+|"name": "stream_id",
+|"type": "long",
+|"nullable": false
+|},
+|{
+|"name": "flow_id",
+|"type": "long",
+|"nullable": false
+|},
+|{
+|"name": "ums_ts_",
+|"type": "datetime",
+|"nullable": false
+|},
+|{
+|"name": "namespace_rule",
+|"type": "string",
+|"nullable": false
+|},
+|{
+|"name": "hour_duration",
+|"type": "string",
+|"nullable": false
+|},
+|{
+|"name": "data_type",
+|"type": "string",
+|"nullable": false
+|},
+|{
+|"name": "data_parse",
+|"type": "string",
+|"nullable": true
+|},
+|{
+|"name": "source_increment_topic",
+|"type": "string",
+|"nullable": true
+|},
+|{
+|"name": "priority_id",
+|"type": "long",
+|"nullable": true
+|}
+|]
+|},
+|"payload": [
+|{
+|"tuple": [${
+directive.id
+}, ${
+base64Tuple.head
+}, ${
+base64Tuple(1)
+}, "${
+base64Tuple(2)
+}", "${
+base64Tuple(3)
+}", "${
+base64Tuple(4)
+}", "${
+base64Tuple(5)
+}", "${
+base64Tuple(6)
+}", "${
+base64Tuple(7)
+}", "${
+base64Tuple(8)
+}"]
+|}
+|]
+|}
+""".stripMargin.replaceAll("\n", "").replaceAll("\r", "")
+riderLogger.info(s"user ${
+directive.createBy
+} send flow $flowId start directive: $flow_start_ums")
+PushDirective.sendHdfsCsvStartDirective(flowId, streamId, sourceNs, jsonCompact(flow_start_ums))
+}else if (functionType == "routing") {
val (instance, db, _) = namespaceDal.getNsDetail(sinkNs)
val tuple = Seq(streamId, flowId, currentMillSec, umsType, sinkNs, instance.connUrl, db.nsDatabase, sourceIncrementTopic, if (flowOpt.nonEmpty) flowOpt.get.priorityId else 0L)
val directive = Await.result(directiveDal.insert(Directive(0, DIRECTIVE_ROUTER_FLOW_START.toString, streamId, flowId, "", RiderConfig.zk.address, currentSec, userId)), minTimeOut)
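For readability, a sketch of how the directive id and the nine-element base64Tuple built above line up with the declared schema fields; fieldOrder is a hypothetical name used only for this illustration:

    // Schema field  <-  payload tuple position (values as assembled in the hunk above).
    val fieldOrder = Seq(
      "directive_id",           // directive.id
      "stream_id",              // base64Tuple(0) = streamId
      "flow_id",                // base64Tuple(1) = flowId
      "ums_ts_",                // base64Tuple(2) = currentMillSec
      "namespace_rule",         // base64Tuple(3) = sourceNs
      "hour_duration",          // base64Tuple(4) = "24"
      "data_type",              // base64Tuple(5) = umsType
      "data_parse",             // base64Tuple(6) = base64-encoded ums schema
      "source_increment_topic", // base64Tuple(7) = sourceIncrementTopic
      "priority_id"             // base64Tuple(8) = priorityId, or 0L when flowOpt is empty
    )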
@@ -788,6 +886,13 @@ object FlowUtils extends RiderLogger {
directive.createBy
} send flow $flowId stop directive")
PushDirective.sendHdfsLogFlowStopDirective(flowId, streamId, sourceNs)
+} else if (functionType == "hdfscsv") {
+val tuple = Seq(streamId, currentMillSec, sourceNs).mkString(",")
+val directive = Await.result(directiveDal.insert(Directive(0, DIRECTIVE_HDFSCSV_FLOW_STOP.toString, streamId, flowId, "", RiderConfig.zk.address, currentSec, userId)), minTimeOut)
+riderLogger.info(s"user ${
+directive.createBy
+} send flow $flowId stop directive")
+PushDirective.sendHdfsCsvFlowStopDirective(flowId, streamId, sourceNs)
} else if (functionType == "routing") {
val tuple = Seq(streamId, currentMillSec, sourceNs).mkString(",")
val directive = Await.result(directiveDal.insert(Directive(0, DIRECTIVE_ROUTER_FLOW_STOP.toString, streamId, flowId, "", RiderConfig.zk.address, currentSec, userId)), minTimeOut)
@@ -59,6 +59,16 @@ object PushDirective extends RiderLogger {
deleteData(zkUrl, path)
}

+def sendHdfsCsvStartDirective(flowId: Long, streamId: Long, sourceNamespace: String, flowStartJson: String, zkUrl: String = RiderConfig.zk.address): Boolean = {
+val path = s"${RiderConfig.zk.path}/$streamId$flowDir/hdfscsv->$flowId->$sourceNamespace->$sourceNamespace"
+setDataToPath(zkUrl, path, flowStartJson)
+}
+
+def sendHdfsCsvFlowStopDirective(flowId: Long, streamId: Long, sourceNamespace: String, zkUrl: String = RiderConfig.zk.address): Unit = {
+val path = s"${RiderConfig.zk.path}/$streamId$flowDir/hdfscsv->$flowId->$sourceNamespace->$sourceNamespace"
+deleteData(zkUrl, path)
+}
+
def sendRouterFlowStartDirective(flowId: Long, streamId: Long, sourceNamespace: String, sinkNamespace: String, flowStartJson: String, zkUrl: String = RiderConfig.zk.address): Boolean = {
val path = s"${RiderConfig.zk.path}/$streamId$flowDir/router->$flowId->$sourceNamespace->$sinkNamespace"
setDataToPath(zkUrl, path, flowStartJson)
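A hypothetical usage sketch of the two new methods; argument values are illustrative, signatures as defined above:

    // flow_start_ums is the compacted directive JSON assembled in FlowUtils (see hunk above).
    // Writes the start JSON to the hdfscsv znode for this flow.
    PushDirective.sendHdfsCsvStartDirective(flowId = 10L, streamId = 2L,
      sourceNamespace = "kafka.source.db.table.v1.ums.ums", flowStartJson = flow_start_ums)

    // Deletes the same znode, which signals the flow to stop.
    PushDirective.sendHdfsCsvFlowStopDirective(flowId = 10L, streamId = 2L,
      sourceNamespace = "kafka.source.db.table.v1.ums.ums")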
@@ -342,6 +342,7 @@ class SplitTableSqlProcessor(sinkProcessConfig: SinkProcessConfig, schemaMap: co
conn.setAutoCommit(false)
logger.info(s"@write list.size:${tupleList.length} masterSql $masterSql")
logger.info(s"@write list.size:${tupleList.length} subSql $subSql")
+
tupleList.foreach(tuples => {
try {
psMaster = conn.prepareStatement(masterSql)
