diff --git a/rider/rider-server/src/main/scala/edp/rider/common/RiderEnumDefinition.scala b/rider/rider-server/src/main/scala/edp/rider/common/RiderEnumDefinition.scala
index 7100f8b7b..91974129b 100644
--- a/rider/rider-server/src/main/scala/edp/rider/common/RiderEnumDefinition.scala
+++ b/rider/rider-server/src/main/scala/edp/rider/common/RiderEnumDefinition.scala
@@ -160,6 +160,7 @@ object FunctionType extends Enumeration {
   val DEFAULT = Value("default")
   val HDFSLOG = Value("hdfslog")
   val ROUTIING = Value("routing")
+  val HDFSCSV = Value("hdfscsv")
 
   def functionType(s: String) = FunctionType.withName(s.toLowerCase)
 
diff --git a/rider/rider-server/src/main/scala/edp/rider/kafka/WormholeGetOffsetUtils.scala b/rider/rider-server/src/main/scala/edp/rider/kafka/WormholeGetOffsetUtils.scala
index b85538f7d..503561254 100644
--- a/rider/rider-server/src/main/scala/edp/rider/kafka/WormholeGetOffsetUtils.scala
+++ b/rider/rider-server/src/main/scala/edp/rider/kafka/WormholeGetOffsetUtils.scala
@@ -32,7 +32,7 @@ object WormholeGetOffsetUtils extends Serializable{
     try{
      edp.wormhole.kafka.WormholeGetOffsetUtils.getConsumerOffset(brokers,groupId,topic,partitions,kerberos)
     }catch {
-      case ex: Throwable =>
+      case ex =>
        currentVersion=KafkaVersion.KAFKA_UNKOWN
        throw ex
     }
diff --git a/rider/rider-server/src/main/scala/edp/rider/rest/util/FlowUtils.scala b/rider/rider-server/src/main/scala/edp/rider/rest/util/FlowUtils.scala
index 08f95b116..a795e2a36 100644
--- a/rider/rider-server/src/main/scala/edp/rider/rest/util/FlowUtils.scala
+++ b/rider/rider-server/src/main/scala/edp/rider/rest/util/FlowUtils.scala
@@ -389,7 +389,7 @@ object FlowUtils extends RiderLogger {
       case StreamType.FLINK => s"${Action.RENEW},${Action.BATCHSELECT},${Action.DRIFT}"
       case StreamType.SPARK =>
         FunctionType.withName(functionType) match {
-          case FunctionType.HDFSLOG | FunctionType.ROUTIING =>
+          case FunctionType.HDFSLOG | FunctionType.ROUTIING | FunctionType.HDFSCSV =>
             s"${Action.DRIFT}"
           case FunctionType.DEFAULT => ""
         }
@@ -680,7 +680,105 @@ object FlowUtils extends RiderLogger {
       } send flow $flowId start directive: $flow_start_ums")
       PushDirective.sendHdfsLogFlowStartDirective(flowId, streamId, sourceNs, jsonCompact(flow_start_ums))
       //      riderLogger.info(s"user ${directive.createBy} send ${DIRECTIVE_HDFSLOG_FLOW_START.toString} directive to ${RiderConfig.zk.address} success.")
-    } else if (functionType == "routing") {
+    } else if (functionType == "hdfscsv") {
+      val base64Tuple = Seq(streamId, flowId, currentMillSec, sourceNs, "24", umsType,
+        base64byte2s(umsSchema.toString.trim.getBytes), sourceIncrementTopic,
+        if (flowOpt.nonEmpty) flowOpt.get.priorityId else 0L)
+      val directive = Await.result(directiveDal.insert(Directive(0, DIRECTIVE_HDFSCSV_FLOW_START.toString, streamId, flowId, "", RiderConfig.zk.address, currentSec, userId)), minTimeOut)
+      //      riderLogger.info(s"user ${directive.createBy} insert ${DIRECTIVE_HDFSCSV_FLOW_START.toString} success.")
+      val flow_start_ums =
+        s"""
+           |{
+           |"protocol": {
+           |"type": "${DIRECTIVE_HDFSCSV_FLOW_START.toString}"
+           |},
+           |"schema": {
+           |"namespace": "$sourceNs",
+           |"fields": [
+           |{
+           |"name": "directive_id",
+           |"type": "long",
+           |"nullable": false
+           |},
+           |{
+           |"name": "stream_id",
+           |"type": "long",
+           |"nullable": false
+           |},
+           |{
+           |"name": "flow_id",
+           |"type": "long",
+           |"nullable": false
+           |},
+           |{
+           |"name": "ums_ts_",
+           |"type": "datetime",
+           |"nullable": false
+           |},
+           |{
+           |"name": "namespace_rule",
+           |"type": "string",
+           |"nullable": false
+           |},
+           |{
+           |"name": "hour_duration",
+           |"type": "string",
+           |"nullable": false
+           |},
+           |{
+           |"name": "data_type",
+           |"type": "string",
+           |"nullable": false
+           |},
+           |{
+           |"name": "data_parse",
+           |"type": "string",
+           |"nullable": true
+           |},
+           |{
+           |"name": "source_increment_topic",
+           |"type": "string",
+           |"nullable": true
+           |},
+           |{
+           |"name": "priority_id",
+           |"type": "long",
+           |"nullable": true
+           |}
+           |]
+           |},
+           |"payload": [
+           |{
+           |"tuple": [${
+        directive.id
+      }, ${
+        base64Tuple.head
+      }, ${
+        base64Tuple(1)
+      }, "${
+        base64Tuple(2)
+      }", "${
+        base64Tuple(3)
+      }", "${
+        base64Tuple(4)
+      }", "${
+        base64Tuple(5)
+      }", "${
+        base64Tuple(6)
+      }", "${
+        base64Tuple(7)
+      }", "${
+        base64Tuple(8)
+      }"]
+           |}
+           |]
+           |}
+        """.stripMargin.replaceAll("\n", "").replaceAll("\r", "")
+      riderLogger.info(s"user ${
+        directive.createBy
+      } send flow $flowId start directive: $flow_start_ums")
+      PushDirective.sendHdfsCsvStartDirective(flowId, streamId, sourceNs, jsonCompact(flow_start_ums))
+    } else if (functionType == "routing") {
       val (instance, db, _) = namespaceDal.getNsDetail(sinkNs)
       val tuple = Seq(streamId, flowId, currentMillSec, umsType, sinkNs, instance.connUrl, db.nsDatabase, sourceIncrementTopic, if (flowOpt.nonEmpty) flowOpt.get.priorityId else 0L)
       val directive = Await.result(directiveDal.insert(Directive(0, DIRECTIVE_ROUTER_FLOW_START.toString, streamId, flowId, "", RiderConfig.zk.address, currentSec, userId)), minTimeOut)
@@ -788,6 +886,13 @@ object FlowUtils extends RiderLogger {
         directive.createBy
       } send flow $flowId stop directive")
       PushDirective.sendHdfsLogFlowStopDirective(flowId, streamId, sourceNs)
+    } else if (functionType == "hdfscsv") {
+      val tuple = Seq(streamId, currentMillSec, sourceNs).mkString(",")
+      val directive = Await.result(directiveDal.insert(Directive(0, DIRECTIVE_HDFSCSV_FLOW_STOP.toString, streamId, flowId, "", RiderConfig.zk.address, currentSec, userId)), minTimeOut)
+      riderLogger.info(s"user ${
+        directive.createBy
+      } send flow $flowId stop directive")
+      PushDirective.sendHdfsCsvFlowStopDirective(flowId, streamId, sourceNs)
     } else if (functionType == "routing") {
       val tuple = Seq(streamId, currentMillSec, sourceNs).mkString(",")
       val directive = Await.result(directiveDal.insert(Directive(0, DIRECTIVE_ROUTER_FLOW_STOP.toString, streamId, flowId, "", RiderConfig.zk.address, currentSec, userId)), minTimeOut)
diff --git a/rider/rider-server/src/main/scala/edp/rider/zookeeper/PushDirective.scala b/rider/rider-server/src/main/scala/edp/rider/zookeeper/PushDirective.scala
index 217d585d6..aab8b5c3b 100644
--- a/rider/rider-server/src/main/scala/edp/rider/zookeeper/PushDirective.scala
+++ b/rider/rider-server/src/main/scala/edp/rider/zookeeper/PushDirective.scala
@@ -59,6 +59,16 @@ object PushDirective extends RiderLogger {
     deleteData(zkUrl, path)
   }
 
+  def sendHdfsCsvStartDirective(flowId: Long, streamId: Long, sourceNamespace: String, flowStartJson: String, zkUrl: String = RiderConfig.zk.address): Boolean = {
+    val path = s"${RiderConfig.zk.path}/$streamId$flowDir/hdfscsv->$flowId->$sourceNamespace->$sourceNamespace"
+    setDataToPath(zkUrl, path, flowStartJson)
+  }
+
+  def sendHdfsCsvFlowStopDirective(flowId: Long, streamId: Long, sourceNamespace: String, zkUrl: String = RiderConfig.zk.address): Unit = {
+    val path = s"${RiderConfig.zk.path}/$streamId$flowDir/hdfscsv->$flowId->$sourceNamespace->$sourceNamespace"
+    deleteData(zkUrl, path)
+  }
+
   def sendRouterFlowStartDirective(flowId: Long, streamId: Long, sourceNamespace: String, sinkNamespace: String, flowStartJson: String, zkUrl: String = RiderConfig.zk.address): Boolean = {
     val path = s"${RiderConfig.zk.path}/$streamId$flowDir/router->$flowId->$sourceNamespace->$sinkNamespace"
     setDataToPath(zkUrl, path, flowStartJson)
diff --git a/sinks/src/main/scala/edp/wormhole/sinks/dbsink/SplitTableSqlProcessor.scala b/sinks/src/main/scala/edp/wormhole/sinks/dbsink/SplitTableSqlProcessor.scala
index 8360d8184..3aa6640b7 100644
--- a/sinks/src/main/scala/edp/wormhole/sinks/dbsink/SplitTableSqlProcessor.scala
+++ b/sinks/src/main/scala/edp/wormhole/sinks/dbsink/SplitTableSqlProcessor.scala
@@ -342,6 +342,7 @@ class SplitTableSqlProcessor(sinkProcessConfig: SinkProcessConfig, schemaMap: co
     conn.setAutoCommit(false)
     logger.info(s"@write list.size:${tupleList.length} masterSql $masterSql")
     logger.info(s"@write list.size:${tupleList.length} subSql $subSql")
+
     tupleList.foreach(tuples => {
       try {
         psMaster = conn.prepareStatement(masterSql)
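
Note on the PushDirective wiring above: the new start and stop helpers build the same ZooKeeper node path, so starting an hdfscsv flow writes the flow-start UMS JSON to that node and stopping it simply deletes the node. The standalone sketch below illustrates only the path convention; the literal root and flow-directory values are stand-ins for RiderConfig.zk.path and PushDirective's flowDir, which are not reproduced here.

object HdfsCsvDirectivePathSketch {
  // Stand-in values; in Rider these come from RiderConfig.zk.path and PushDirective.flowDir.
  val zkRoot = "/wormhole"
  val flowDir = "/flow"

  // Mirrors the path built by sendHdfsCsvStartDirective / sendHdfsCsvFlowStopDirective.
  // An hdfscsv flow (like hdfslog) has no distinct sink namespace, so the source
  // namespace fills both slots of the "->source->sink" suffix.
  def hdfsCsvDirectivePath(streamId: Long, flowId: Long, sourceNamespace: String): String =
    s"$zkRoot/$streamId$flowDir/hdfscsv->$flowId->$sourceNamespace->$sourceNamespace"

  def main(args: Array[String]): Unit =
    // Prints: /wormhole/1/flow/hdfscsv->8->kafka.src.db.table.*.*.*->kafka.src.db.table.*.*.*
    println(hdfsCsvDirectivePath(1L, 8L, "kafka.src.db.table.*.*.*"))
}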