From 7872ca8635b5e60d06c87fbbd21e88649047ab5f Mon Sep 17 00:00:00 2001
From: 54liuyao <54liuyao@163.com>
Date: Thu, 26 Dec 2024 14:33:14 +0800
Subject: [PATCH 1/4] add stream event notify
---
include/common/tcommon.h | 1 +
source/libs/executor/inc/executorInt.h | 6 +-
source/libs/executor/inc/streamexecutorInt.h | 3 +-
.../executor/src/streameventwindowoperator.c | 56 ++++++++++++++++++-
source/libs/executor/src/streamexecutorInt.c | 15 ++++-
.../src/streamintervalsliceoperator.c | 3 +-
.../executor/src/streamtimesliceoperator.c | 3 +-
7 files changed, 79 insertions(+), 8 deletions(-)
diff --git a/include/common/tcommon.h b/include/common/tcommon.h
index 61cd482c704b..7264718f752f 100644
--- a/include/common/tcommon.h
+++ b/include/common/tcommon.h
@@ -156,6 +156,7 @@ typedef enum EStreamType {
STREAM_PARTITION_DELETE_DATA,
STREAM_GET_RESULT,
STREAM_DROP_CHILD_TABLE,
+ STREAM_EVENT_OPEN_WINDOW,
} EStreamType;
#pragma pack(push, 1)
diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h
index 48afa78251b7..04e7884020ea 100644
--- a/source/libs/executor/inc/executorInt.h
+++ b/source/libs/executor/inc/executorInt.h
@@ -450,8 +450,10 @@ typedef struct STimeWindowAggSupp {
} STimeWindowAggSupp;
typedef struct SSteamOpBasicInfo {
- int32_t primaryPkIndex;
- bool updateOperatorInfo;
+ int32_t primaryPkIndex;
+ bool updateOperatorInfo;
+ SSDataBlock* pEventRes;
+ SArray* pEventInfo;
} SSteamOpBasicInfo;
typedef struct SStreamFillSupporter {
diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h
index 0a6908031412..0c0ea0d6fcc6 100644
--- a/source/libs/executor/inc/streamexecutorInt.h
+++ b/source/libs/executor/inc/streamexecutorInt.h
@@ -57,7 +57,8 @@ typedef struct SSlicePoint {
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type);
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo);
void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo);
-void initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo);
+int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo);
+void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo);
int64_t getDeleteMarkFromOption(SStreamNodeOption* pOption);
void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins);
diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c
index fa6008eba7f6..a9a47580dc44 100644
--- a/source/libs/executor/src/streameventwindowoperator.c
+++ b/source/libs/executor/src/streameventwindowoperator.c
@@ -12,6 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
+
+#include "cmdnodes.h"
#include "executorInt.h"
#include "filter.h"
#include "function.h"
@@ -53,6 +55,8 @@ void destroyStreamEventOperatorInfo(void* param) {
&pInfo->groupResInfo);
pInfo->pOperator = NULL;
}
+
+ destroyStreamBasicInfo(&pInfo->basic);
destroyStreamAggSupporter(&pInfo->streamAggSup);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
@@ -121,7 +125,7 @@ void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
}
int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd,
- int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) {
+ int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey, int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t winCode = TSDB_CODE_SUCCESS;
@@ -143,6 +147,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro
setEventWindowInfo(pAggSup, &leftWinKey, pVal, pCurWin);
if (inWin || (pCurWin->pWinFlag->startFlag && !pCurWin->pWinFlag->endFlag)) {
pCurWin->winInfo.isOutput = !isWindowIncomplete(pCurWin);
+ (*pWinCode) = TSDB_CODE_SUCCESS;
goto _end;
}
}
@@ -156,6 +161,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro
if (endi < 0 || pTs[endi] >= rightWinKey.win.skey) {
setEventWindowInfo(pAggSup, &rightWinKey, pVal, pCurWin);
pCurWin->winInfo.isOutput = !isWindowIncomplete(pCurWin);
+ (*pWinCode) = TSDB_CODE_SUCCESS;
goto _end;
}
}
@@ -163,6 +169,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro
SSessionKey winKey = {.win.skey = ts, .win.ekey = ts, .groupId = groupId};
code = pAggSup->stateStore.streamStateSessionAllocWinBuffByNextPosition(pAggSup->pState, pCur, &winKey, &pVal, &len);
QUERY_CHECK_CODE(code, lino, _error);
+ (*pWinCode) = TSDB_CODE_FAILED;
setEventWindowInfo(pAggSup, &winKey, pVal, pCurWin);
pCurWin->pWinFlag->startFlag = start;
@@ -303,6 +310,14 @@ void doDeleteEventWindow(SStreamAggSupporter* pAggSup, SSHashObj* pSeUpdated, SS
removeSessionResult(pAggSup, pSeUpdated, pAggSup->pResultRows, pKey);
}
+static int32_t setEventData(SSteamOpBasicInfo* pBasicInfo, SSessionKey* pWinKey) {
+ void* pRes = taosArrayPush(pBasicInfo->pEventInfo, pWinKey);
+ if (pRes != NULL) {
+ return TSDB_CODE_SUCCESS;
+ }
+ return terrno;
+}
+
static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated,
SSHashObj* pStDeleted) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@@ -373,10 +388,16 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
bool allEqual = true;
SEventWindowInfo curWin = {0};
SSessionKey nextWinKey = {0};
+ int32_t winCode = TSDB_CODE_SUCCESS;
code = setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin,
- &nextWinKey);
+ &nextWinKey, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
+ if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN) && winCode != TSDB_CODE_SUCCESS) {
+ code = setEventData(&pInfo->basic, &curWin.winInfo.sessionWin);
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
+
setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
bool rebuild = false;
code = updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData,
@@ -561,12 +582,42 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
}
}
+static void buildEventNotifyResult(SSteamOpBasicInfo* pBasicInfo) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+
+ blockDataCleanup(pBasicInfo->pEventRes);
+ int32_t size = taosArrayGetSize(pBasicInfo->pEventInfo);
+ code = blockDataEnsureCapacity(pBasicInfo->pEventRes, size);
+ QUERY_CHECK_CODE(code, lino, _end);
+ for (int32_t i = 0; i < size; i++) {
+ SSessionKey* pKey = taosArrayGet(pBasicInfo->pEventInfo, i);
+ uint64_t uid = 0;
+ code = appendDataToSpecialBlock(pBasicInfo->pEventRes, &pKey->win.skey, &pKey->win.ekey, &uid, &pKey->groupId, NULL);
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
+ taosArrayClear(pBasicInfo->pEventInfo);
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s.", __func__, lino, tstrerror(code));
+ }
+}
+
+
static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+ buildEventNotifyResult(&pInfo->basic);
+ if (pInfo->basic.pEventRes->info.rows > 0) {
+ printDataBlock(pInfo->basic.pEventRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
+ (*ppRes) = pInfo->basic.pEventRes;
+ return code;
+ }
+
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
@@ -957,6 +1008,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
pInfo->destHasPrimaryKey = pEventNode->window.destHasPrimaryKey;
+ initStreamBasicInfo(&pInfo->basic);
pInfo->pOperator = pOperator;
setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,
diff --git a/source/libs/executor/src/streamexecutorInt.c b/source/libs/executor/src/streamexecutorInt.c
index b94798934c52..1e7fbfa44658 100644
--- a/source/libs/executor/src/streamexecutorInt.c
+++ b/source/libs/executor/src/streamexecutorInt.c
@@ -14,6 +14,7 @@
*/
#include "executorInt.h"
+#include "tdatablock.h"
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type) {
if (type != STREAM_GET_ALL && type != STREAM_CHECKPOINT) {
@@ -29,7 +30,19 @@ void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo) {
pBasicInfo->updateOperatorInfo = false;
}
-void initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
+int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
pBasicInfo->primaryPkIndex = -1;
pBasicInfo->updateOperatorInfo = false;
+ pBasicInfo->pEventInfo = taosArrayInit(4, sizeof(SSessionKey));
+ if (pBasicInfo->pEventInfo == NULL) {
+ return terrno;
+ }
+ return createSpecialDataBlock(STREAM_EVENT_OPEN_WINDOW, &pBasicInfo->pEventRes);
+}
+
+void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
+ blockDataDestroy(pBasicInfo->pEventRes);
+ pBasicInfo->pEventRes = NULL;
+ taosArrayDestroy(pBasicInfo->pEventInfo);
+ pBasicInfo->pEventInfo = NULL;
}
diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c
index d038e4d82c85..45707e670eee 100644
--- a/source/libs/executor/src/streamintervalsliceoperator.c
+++ b/source/libs/executor/src/streamintervalsliceoperator.c
@@ -651,7 +651,8 @@ int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiN
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamIntervalSliceReleaseState, streamIntervalSliceReloadState);
- initStreamBasicInfo(&pInfo->basic);
+ code = initStreamBasicInfo(&pInfo->basic);
+ QUERY_CHECK_CODE(code, lino, _error);
if (downstream) {
code = initIntervalSliceDownStream(downstream, &pInfo->streamAggSup, pPhyNode->type, pInfo->primaryTsIndex,
&pInfo->twAggSup, &pInfo->basic, &pInfo->interval, pInfo->hasInterpoFunc);
diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c
index 44004a4c6bc0..9ec6063486df 100644
--- a/source/libs/executor/src/streamtimesliceoperator.c
+++ b/source/libs/executor/src/streamtimesliceoperator.c
@@ -2201,7 +2201,8 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamTimeSliceReleaseState, streamTimeSliceReloadState);
- initStreamBasicInfo(&pInfo->basic);
+ code = initStreamBasicInfo(&pInfo->basic);
+ QUERY_CHECK_CODE(code, lino, _error);
if (downstream) {
code = initTimeSliceDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
&pInfo->twAggSup, &pInfo->basic, pInfo->pFillSup);
From ff86a306e5d56654b3c32d3ed1bad95e2ea30241 Mon Sep 17 00:00:00 2001
From: Jinqing Kuang
Date: Sat, 4 Jan 2025 14:59:47 +0800
Subject: [PATCH 2/4] feat(stream)[TS-5469]. add syntax for create stream with
notification
---
include/common/tmsg.h | 6 ++
include/libs/executor/executor.h | 2 +
include/libs/nodes/cmdnodes.h | 49 ++++++---
include/libs/stream/tstream.h | 17 ++-
include/util/tdef.h | 1 +
source/common/src/msg/tmsg.c | 35 ++++++
source/dnode/mnode/impl/src/mndStream.c | 72 +++++++++++++
source/dnode/vnode/src/tq/tqSink.c | 2 +-
source/dnode/vnode/src/tqCommon/tqCommon.c | 4 +-
source/libs/executor/inc/querytask.h | 1 +
source/libs/executor/src/executor.c | 7 ++
source/libs/nodes/src/nodesCodeFuncs.c | 52 +++++++++
source/libs/nodes/src/nodesUtilFuncs.c | 9 ++
source/libs/parser/inc/parAst.h | 6 +-
source/libs/parser/inc/sql.y | 22 +++-
source/libs/parser/src/parAstCreater.c | 117 ++++++++++++++++++++-
source/libs/parser/src/parTokenizer.c | 3 +
source/libs/parser/src/parTranslater.c | 43 ++++++++
source/libs/stream/src/streamDispatch.c | 2 +-
source/libs/stream/src/streamTask.c | 61 ++++++++++-
20 files changed, 486 insertions(+), 25 deletions(-)
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index aebe09b56344..507257c5fca3 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -269,6 +269,7 @@ typedef enum ENodeType {
QUERY_NODE_TSMA_OPTIONS,
QUERY_NODE_ANOMALY_WINDOW,
QUERY_NODE_RANGE_AROUND,
+ QUERY_NODE_STREAM_NOTIFY_OPTIONS,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR = 100,
@@ -2956,6 +2957,11 @@ typedef struct {
// 3.3.0.0
SArray* pCols; // array of SField
int64_t smaId;
+ // 3.3.5.0
+ SArray* pNotifyAddrUrls;
+ int32_t notifyEventTypes;
+ int32_t notifyErrorHandle;
+ int8_t notifyHistory;
} SCMCreateStreamReq;
typedef struct {
diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index 883c5f7b99cc..0607c65a3a6a 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -98,6 +98,8 @@ int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
+void qSetStreamEventTypes(qTaskInfo_t tinfo, int32_t eventTypes);
+
/**
* Set multiple input data blocks for the stream scan.
* @param tinfo
diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h
index 12d77bd0c24b..26482a87d4c7 100644
--- a/include/libs/nodes/cmdnodes.h
+++ b/include/libs/nodes/cmdnodes.h
@@ -566,19 +566,44 @@ typedef struct SStreamOptions {
int64_t setFlag;
} SStreamOptions;
+typedef enum EStreamNotifyOptionSetFlag {
+ SNOTIFY_OPT_ERROR_HANDLE_SET = BIT_FLAG_MASK(0),
+ SNOTIFY_OPT_NOTIFY_HISTORY_SET = BIT_FLAG_MASK(1),
+} EStreamNotifyOptionSetFlag;
+
+typedef enum EStreamNotifyEventType {
+ SNOTIFY_EVENT_WINDOW_OPEN = BIT_FLAG_MASK(0),
+ SNOTIFY_EVENT_WINDOW_CLOSE = BIT_FLAG_MASK(1),
+} EStreamNotifyEventType;
+
+typedef enum EStreamNotifyErrorHandleType {
+ SNOTIFY_ERROR_HANDLE_PAUSE,
+ SNOTIFY_ERROR_HANDLE_DROP,
+} EStreamNotifyErrorHandleType;
+
+typedef struct SStreamNotifyOptions {
+ ENodeType type;
+ SNodeList* pAddrUrls;
+ EStreamNotifyEventType eventTypes;
+ EStreamNotifyErrorHandleType errorHandle;
+ bool notifyHistory;
+ EStreamNotifyOptionSetFlag setFlag;
+} SStreamNotifyOptions;
+
typedef struct SCreateStreamStmt {
- ENodeType type;
- char streamName[TSDB_TABLE_NAME_LEN];
- char targetDbName[TSDB_DB_NAME_LEN];
- char targetTabName[TSDB_TABLE_NAME_LEN];
- bool ignoreExists;
- SStreamOptions* pOptions;
- SNode* pQuery;
- SNode* pPrevQuery;
- SNodeList* pTags;
- SNode* pSubtable;
- SNodeList* pCols;
- SCMCreateStreamReq* pReq;
+ ENodeType type;
+ char streamName[TSDB_TABLE_NAME_LEN];
+ char targetDbName[TSDB_DB_NAME_LEN];
+ char targetTabName[TSDB_TABLE_NAME_LEN];
+ bool ignoreExists;
+ SStreamOptions* pOptions;
+ SNode* pQuery;
+ SNode* pPrevQuery;
+ SNodeList* pTags;
+ SNode* pSubtable;
+ SNodeList* pCols;
+ SStreamNotifyOptions* pNotifyOptions;
+ SCMCreateStreamReq* pReq;
} SCreateStreamStmt;
typedef struct SDropStreamStmt {
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index a4d89dcdcc84..f7addc7be37d 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -65,10 +65,12 @@ typedef struct SStreamTaskSM SStreamTaskSM;
typedef struct SStreamQueueItem SStreamQueueItem;
typedef struct SActiveCheckpointInfo SActiveCheckpointInfo;
-#define SSTREAM_TASK_VER 4
-#define SSTREAM_TASK_INCOMPATIBLE_VER 1
-#define SSTREAM_TASK_NEED_CONVERT_VER 2
-#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
+#define SSTREAM_TASK_VER 5
+#define SSTREAM_TASK_INCOMPATIBLE_VER 1
+#define SSTREAM_TASK_NEED_CONVERT_VER 2
+#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 // Append subtable name with groupId
+#define SSTREAM_TASK_APPEND_STABLE_NAME_VER 4 // Append subtable name with stableName and groupId
+#define SSTREAM_TASK_ADD_NOTIFY_VER 5 // Support event notification at window open/close
extern int32_t streamMetaRefPool;
extern int32_t streamTaskRefPool;
@@ -427,6 +429,12 @@ typedef struct STaskCheckInfo {
TdThreadMutex checkInfoLock;
} STaskCheckInfo;
+typedef struct SNotifyInfo {
+ SArray* pNotifyAddrUrls;
+ int32_t notifyEventTypes;
+ int32_t notifyErrorHandle;
+} SNotifyInfo;
+
struct SStreamTask {
int64_t ver;
SStreamTaskId id;
@@ -449,6 +457,7 @@ struct SStreamTask {
SStreamState* pState; // state backend
SUpstreamInfo upstreamInfo;
STaskCheckInfo taskCheckInfo;
+ SNotifyInfo notifyInfo;
// the followings attributes don't be serialized
SScanhistorySchedInfo schedHistoryInfo;
diff --git a/include/util/tdef.h b/include/util/tdef.h
index 2b0aa00b1a50..ae999943fc22 100644
--- a/include/util/tdef.h
+++ b/include/util/tdef.h
@@ -245,6 +245,7 @@ typedef enum ELogicConditionType {
#define TSDB_OFFSET_LEN 64 // it is a null-terminated string
#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string
+#define TSDB_STREAM_NOTIFY_URL_LEN 128 // it includes the terminating '\0'
#define TSDB_DB_NAME_LEN 65
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_PRIVILEDGE_CONDITION_LEN 48 * 1024
diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c
index 71c780a1ac24..91aa2bf1d909 100644
--- a/source/common/src/msg/tmsg.c
+++ b/source/common/src/msg/tmsg.c
@@ -9959,6 +9959,16 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
}
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->smaId));
+
+ int32_t addrSize = taosArrayGetSize(pReq->pNotifyAddrUrls);
+ TAOS_CHECK_EXIT(tEncodeI32(&encoder, addrSize));
+ for (int32_t i = 0; i < addrSize; ++i) {
+ const char *url = taosArrayGetP(pReq->pNotifyAddrUrls, i);
+ TAOS_CHECK_EXIT((tEncodeCStr(&encoder, url)));
+ }
+ TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->notifyEventTypes));
+ TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->notifyErrorHandle));
+ TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->notifyHistory));
tEndEncode(&encoder);
_exit:
@@ -10093,6 +10103,30 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->smaId));
}
+ if (!tDecodeIsEnd(&decoder)) {
+ int32_t addrSize = 0;
+ TAOS_CHECK_EXIT(tDecodeI32(&decoder, &addrSize));
+ pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
+ if (pReq->pNotifyAddrUrls == NULL) {
+ TAOS_CHECK_EXIT(terrno);
+ }
+ for (int32_t i = 0; i < addrSize; ++i) {
+ char *url = NULL;
+ TAOS_CHECK_EXIT(tDecodeCStr(&decoder, &url));
+ url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
+ if (url == NULL) {
+ TAOS_CHECK_EXIT(terrno);
+ }
+ if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
+ taosMemoryFree(url);
+ TAOS_CHECK_EXIT(terrno);
+ }
+ }
+ TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->notifyEventTypes));
+ TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->notifyErrorHandle));
+ TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->notifyHistory));
+ }
+
tEndDecode(&decoder);
_exit:
tDecoderClear(&decoder);
@@ -10155,6 +10189,7 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosArrayDestroy(pReq->fillNullCols);
taosArrayDestroy(pReq->pVgroupVerList);
taosArrayDestroy(pReq->pCols);
+ taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
}
int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) {
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index caed5a668ba9..b95d5e315a6f 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -753,6 +753,70 @@ static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
return TSDB_CODE_SUCCESS;
}
+static void *notifyAddrDup(void *p) { return taosStrdup((char *)p); }
+
+static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, SStreamTask *pTask) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+
+ TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ pTask->notifyInfo.pNotifyAddrUrls = taosArrayDup(createReq->pNotifyAddrUrls, notifyAddrDup);
+ TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno);
+ pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes;
+ pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle;
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ return code;
+}
+
+static int32_t addStreamNotifyInfo(SCMCreateStreamReq *createReq, SStreamObj *pStream) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ int32_t level = 0;
+ int32_t nTasks = 0;
+ SArray *pLevel = NULL;
+
+ TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ TSDB_CHECK_NULL(pStream, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ if (taosArrayGetSize(createReq->pNotifyAddrUrls) == 0) {
+ goto _end;
+ }
+
+ level = taosArrayGetSize(pStream->tasks);
+ for (int32_t i = 0; i < level; ++i) {
+ pLevel = taosArrayGetP(pStream->tasks, i);
+ nTasks = taosArrayGetSize(pLevel);
+ for (int32_t j = 0; j < nTasks; ++j) {
+ code = addStreamTaskNotifyInfo(createReq, taosArrayGetP(pLevel, j));
+ TSDB_CHECK_CODE(code, lino, _end);
+ }
+ }
+
+ if (pStream->conf.fillHistory && createReq->notifyHistory) {
+ level = taosArrayGetSize(pStream->pHTasksList);
+ for (int32_t i = 0; i < level; ++i) {
+ pLevel = taosArrayGetP(pStream->pHTasksList, i);
+ nTasks = taosArrayGetSize(pLevel);
+ for (int32_t j = 0; j < nTasks; ++j) {
+ code = addStreamTaskNotifyInfo(createReq, taosArrayGetP(pLevel, j));
+ TSDB_CHECK_CODE(code, lino, _end);
+ }
+ }
+ }
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ mError("%s for stream %s failed at line %d since %s", __func__, pStream->name, lino, tstrerror(code));
+ }
+ return code;
+}
+
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
@@ -850,6 +914,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
+ // add notify info into all stream tasks
+ code = addStreamNotifyInfo(&createReq, &streamObj);
+ if (code != TSDB_CODE_SUCCESS) {
+ mError("stream:%s failed to add stream notify info since %s", createReq.name, tstrerror(code));
+ mndTransDrop(pTrans);
+ goto _OVER;
+ }
+
// add stream to trans
code = mndPersistStream(pTrans, &streamObj);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c
index 5405ace89b18..219b6290e69d 100644
--- a/source/dnode/vnode/src/tq/tqSink.c
+++ b/source/dnode/vnode/src/tq/tqSink.c
@@ -983,7 +983,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
code = buildCtbNameAddGroupId(NULL, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
- } else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) {
+ } else if (pTask->ver >= SSTREAM_TASK_APPEND_STABLE_NAME_VER && stbFullName) {
code = buildCtbNameAddGroupId(stbFullName, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
}
if (code != TSDB_CODE_SUCCESS) {
diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c
index 06b7b33cd822..bdccf954e094 100644
--- a/source/dnode/vnode/src/tqCommon/tqCommon.c
+++ b/source/dnode/vnode/src/tqCommon/tqCommon.c
@@ -86,6 +86,8 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
if (code) {
return code;
}
+
+ qSetStreamEventTypes(&pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes);
}
streamSetupScheduleTrigger(pTask);
@@ -1357,4 +1359,4 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask);
return 0;
-}
\ No newline at end of file
+}
diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h
index f726e4300fb4..14e82b45668d 100644
--- a/source/libs/executor/inc/querytask.h
+++ b/source/libs/executor/inc/querytask.h
@@ -71,6 +71,7 @@ typedef struct {
SVersionRange fillHistoryVer;
STimeWindow fillHistoryWindow;
SStreamState* pState;
+ int32_t eventTypes;
} SStreamTaskInfo;
struct SExecTaskInfo {
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index dffab1b16323..c1e7263c9021 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -250,6 +250,13 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
return code;
}
+void qSetStreamEventTypes(qTaskInfo_t tinfo, int32_t eventTypes) {
+ SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo;
+ if (pTaskInfo != NULL) {
+ pTaskInfo->streamInfo.eventTypes = eventTypes;
+ }
+}
+
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) {
return TSDB_CODE_APP_ERROR;
diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c
index bea9b9621574..bfe86aa2ac61 100644
--- a/source/libs/nodes/src/nodesCodeFuncs.c
+++ b/source/libs/nodes/src/nodesCodeFuncs.c
@@ -99,6 +99,8 @@ const char* nodesNodeName(ENodeType type) {
return "CountWindow";
case QUERY_NODE_ANOMALY_WINDOW:
return "AnomalyWindow";
+ case QUERY_NODE_STREAM_NOTIFY_OPTIONS:
+ return "StreamNotifyOptions";
case QUERY_NODE_SET_OPERATOR:
return "SetOperator";
case QUERY_NODE_SELECT_STMT:
@@ -5812,6 +5814,45 @@ static int32_t jsonToStreamOptions(const SJson* pJson, void* pObj) {
return code;
}
+static const char* jkStreamNotifyOptionsAddrUrls = "AddrUrls";
+static const char* jkStreamNotifyOptionsEventType = "EventType";
+static const char* jkStreamNotifyOptionsErrorHandle = "ErrorHandle";
+static const char* jkStreamNotifyOptionsNotifyHistory = "NotifyHistory";
+
+static int32_t streamNotifyOptionsToJson(const void* pObj, SJson* pJson) {
+ const SStreamNotifyOptions* pNotifyOption = (const SStreamNotifyOptions*)pObj;
+ int32_t code = nodeListToJson(pJson, jkStreamNotifyOptionsAddrUrls, pNotifyOption->pAddrUrls);
+ if (code == TSDB_CODE_SUCCESS) {
+ code = tjsonAddIntegerToObject(pJson, jkStreamNotifyOptionsEventType, pNotifyOption->eventTypes);
+ }
+ if (code == TSDB_CODE_SUCCESS) {
+ code = tjsonAddIntegerToObject(pJson, jkStreamNotifyOptionsErrorHandle, pNotifyOption->errorHandle);
+ }
+ if (code == TSDB_CODE_SUCCESS) {
+ code = tjsonAddBoolToObject(pJson, jkStreamNotifyOptionsNotifyHistory, pNotifyOption->notifyHistory);
+ }
+
+ return code;
+}
+
+static int32_t jsonToStreamNotifyOptions(const SJson* pJson, void* pObj) {
+ SStreamNotifyOptions* pNotifyOption = (SStreamNotifyOptions*)pObj;
+ int32_t code = jsonToNodeList(pJson, jkStreamNotifyOptionsAddrUrls, &pNotifyOption->pAddrUrls);
+ int32_t val = 0;
+ if (code == TSDB_CODE_SUCCESS) {
+ code = tjsonGetIntValue(pJson, jkStreamNotifyOptionsEventType, &val);
+ pNotifyOption->eventTypes = val;
+ }
+ if (code == TSDB_CODE_SUCCESS) {
+ code = tjsonGetIntValue(pJson, jkStreamNotifyOptionsErrorHandle, &val);
+ pNotifyOption->errorHandle = val;
+ }
+ if (code == TSDB_CODE_SUCCESS) {
+ code = tjsonGetBoolValue(pJson, jkStreamNotifyOptionsNotifyHistory, &pNotifyOption->notifyHistory);
+ }
+ return code;
+}
+
static const char* jkWhenThenWhen = "When";
static const char* jkWhenThenThen = "Then";
@@ -7207,6 +7248,7 @@ static const char* jkCreateStreamStmtOptions = "Options";
static const char* jkCreateStreamStmtQuery = "Query";
static const char* jkCreateStreamStmtTags = "Tags";
static const char* jkCreateStreamStmtSubtable = "Subtable";
+static const char* jkCreateStreamStmtNotifyOptions = "NotifyOptions";
static int32_t createStreamStmtToJson(const void* pObj, SJson* pJson) {
const SCreateStreamStmt* pNode = (const SCreateStreamStmt*)pObj;
@@ -7233,6 +7275,9 @@ static int32_t createStreamStmtToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkCreateStreamStmtSubtable, nodeToJson, pNode->pSubtable);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddObject(pJson, jkCreateStreamStmtNotifyOptions, nodeToJson, pNode->pNotifyOptions);
+ }
return code;
}
@@ -7262,6 +7307,9 @@ static int32_t jsonToCreateStreamStmt(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkCreateStreamStmtSubtable, &pNode->pSubtable);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = jsonToNodeObject(pJson, jkCreateStreamStmtNotifyOptions, (SNode**)&pNode->pNotifyOptions);
+ }
return code;
}
@@ -8029,6 +8077,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return countWindowNodeToJson(pObj, pJson);
case QUERY_NODE_ANOMALY_WINDOW:
return anomalyWindowNodeToJson(pObj, pJson);
+ case QUERY_NODE_STREAM_NOTIFY_OPTIONS:
+ return streamNotifyOptionsToJson(pObj, pJson);
case QUERY_NODE_SET_OPERATOR:
return setOperatorToJson(pObj, pJson);
case QUERY_NODE_SELECT_STMT:
@@ -8402,6 +8452,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToCountWindowNode(pJson, pObj);
case QUERY_NODE_ANOMALY_WINDOW:
return jsonToAnomalyWindowNode(pJson, pObj);
+ case QUERY_NODE_STREAM_NOTIFY_OPTIONS:
+ return jsonToStreamNotifyOptions(pJson, pObj);
case QUERY_NODE_SET_OPERATOR:
return jsonToSetOperator(pJson, pObj);
case QUERY_NODE_SELECT_STMT:
diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c
index 3d4df385f7cc..ae5b302d2da0 100644
--- a/source/libs/nodes/src/nodesUtilFuncs.c
+++ b/source/libs/nodes/src/nodesUtilFuncs.c
@@ -467,6 +467,9 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
case QUERY_NODE_WINDOW_OFFSET:
code = makeNode(type, sizeof(SWindowOffsetNode), &pNode);
break;
+ case QUERY_NODE_STREAM_NOTIFY_OPTIONS:
+ code = makeNode(type, sizeof(SStreamNotifyOptions), &pNode);
+ break;
case QUERY_NODE_SET_OPERATOR:
code = makeNode(type, sizeof(SSetOperator), &pNode);
break;
@@ -1267,6 +1270,11 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pAround->pTimepoint);
break;
}
+ case QUERY_NODE_STREAM_NOTIFY_OPTIONS: {
+ SStreamNotifyOptions* pNotifyOptions = (SStreamNotifyOptions*)pNode;
+ nodesDestroyList(pNotifyOptions->pAddrUrls);
+ break;
+ }
case QUERY_NODE_SET_OPERATOR: {
SSetOperator* pStmt = (SSetOperator*)pNode;
nodesDestroyList(pStmt->pProjectionList);
@@ -1479,6 +1487,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pStmt->pQuery);
nodesDestroyList(pStmt->pTags);
nodesDestroyNode(pStmt->pSubtable);
+ nodesDestroyNode((SNode*)pStmt->pNotifyOptions);
tFreeSCMCreateStreamReq(pStmt->pReq);
taosMemoryFreeClear(pStmt->pReq);
break;
diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h
index dc9986ad04c8..387bccf358fb 100644
--- a/source/libs/parser/inc/parAst.h
+++ b/source/libs/parser/inc/parAst.h
@@ -296,8 +296,12 @@ SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, con
SNode* createStreamOptions(SAstCreateContext* pCxt);
SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken,
SNode* pNode);
+SNode* createStreamNotifyOptions(SAstCreateContext *pCxt, SNodeList* pAddrUrls, SNodeList* pEventTypes);
+SNode* setStreamNotifyOptions(SAstCreateContext* pCxt, SNode* pNode, EStreamNotifyOptionSetFlag setFlag,
+ SToken* pToken);
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable,
- SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols);
+ SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols,
+ SNode* pNotifyOptions);
SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName);
SNode* createPauseStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName);
SNode* createResumeStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, bool ignoreUntreated, SToken* pStreamName);
diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y
index 9f81a152454a..79f5433482ab 100644
--- a/source/libs/parser/inc/sql.y
+++ b/source/libs/parser/inc/sql.y
@@ -775,7 +775,7 @@ full_view_name(A) ::= db_name(B) NK_DOT view_name(C).
/************************************************ create/drop stream **************************************************/
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
full_table_name(C) col_list_opt(H) tag_def_or_ref_opt(F) subtable_opt(G)
- AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H); }
+ AS query_or_subquery(D) notify_opt(I). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H, I); }
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
cmd ::= PAUSE STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createPauseStreamStmt(pCxt, A, &B); }
cmd ::= RESUME STREAM exists_opt(A) ignore_opt(C) stream_name(B). { pCxt->pRootNode = createResumeStreamStmt(pCxt, A, C, &B); }
@@ -822,6 +822,26 @@ subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP.
ignore_opt(A) ::= . { A = false; }
ignore_opt(A) ::= IGNORE UNTREATED. { A = true; }
+notify_opt(A) ::= . { A = NULL; }
+notify_opt(A) ::= notify_def(B). { A = B; }
+
+notify_def(A) ::= NOTIFY NK_LP url_def_list(B) NK_RP ON NK_LP event_def_list(C) NK_RP. { A = createStreamNotifyOptions(pCxt, B, C); }
+notify_def(A) ::= notify_def(B) ON_FAILURE DROP(C). { A = setStreamNotifyOptions(pCxt, B, SNOTIFY_OPT_ERROR_HANDLE_SET, &C); }
+notify_def(A) ::= notify_def(B) ON_FAILURE PAUSE(C). { A = setStreamNotifyOptions(pCxt, B, SNOTIFY_OPT_ERROR_HANDLE_SET, &C); }
+notify_def(A) ::= notify_def(B) NOTIFY_HISTORY NK_INTEGER(C). { A = setStreamNotifyOptions(pCxt, B, SNOTIFY_OPT_NOTIFY_HISTORY_SET, &C); }
+
+%type url_def_list { SNodeList* }
+%destructor url_def_list { nodesDestroyList($$); }
+url_def_list(A) ::= NK_STRING(B). { A = createNodeList(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B)); }
+url_def_list(A) ::= url_def_list(B) NK_COMMA NK_STRING(C). { A = addNodeToList(pCxt, B, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &C)); }
+
+%type event_def_list { SNodeList* }
+%destructor event_def_list { nodesDestroyList($$); }
+event_def_list(A) ::= NK_STRING(B). { A = createNodeList(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B)); }
+event_def_list(A) ::= event_def_list(B) NK_COMMA NK_STRING(C). { A = addNodeToList(pCxt, B, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &C)); }
+
+
+
/************************************************ kill connection/query ***********************************************/
cmd ::= KILL CONNECTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_CONNECTION_STMT, &A); }
cmd ::= KILL QUERY NK_STRING(A). { pCxt->pRootNode = createKillQueryStmt(pCxt, &A); }
diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c
index 5b90fd601ebd..0e98a899688c 100644
--- a/source/libs/parser/src/parAstCreater.c
+++ b/source/libs/parser/src/parAstCreater.c
@@ -1526,8 +1526,8 @@ SNode* createCaseWhenNode(SAstCreateContext* pCxt, SNode* pCase, SNodeList* pWhe
pCaseWhen->pCase = pCase;
pCaseWhen->pWhenThenList = pWhenThenList;
pCaseWhen->pElse = pElse;
- pCaseWhen->tz = pCxt->pQueryCxt->timezone;
- pCaseWhen->charsetCxt = pCxt->pQueryCxt->charsetCxt;
+ pCaseWhen->tz = pCxt->pQueryCxt->timezone;
+ pCaseWhen->charsetCxt = pCxt->pQueryCxt->charsetCxt;
return (SNode*)pCaseWhen;
_err:
nodesDestroyNode(pCase);
@@ -3657,8 +3657,117 @@ SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptions
return pOptions;
}
+static bool validateHttpUrl(const char* url) {
+ const char *httpPrefix = "http://";
+ const char *httpsPrefix = "https://";
+ const char* host = NULL;
+
+ if (!url || *url == '\0') return false;
+
+ if (strncasecmp(url, httpPrefix, strlen(httpPrefix)) == 0) {
+ host = url + strlen(httpPrefix);
+ } else if (strncasecmp(url, httpsPrefix, strlen(httpsPrefix)) == 0) {
+ host = url + strlen(httpsPrefix);
+ } else {
+ return false;
+ }
+
+ return (*host != '\0') && (*host != '/');
+}
+
+SNode* createStreamNotifyOptions(SAstCreateContext* pCxt, SNodeList* pAddrUrls, SNodeList* pEventTypes) {
+ SNode* pNode = NULL;
+ EStreamNotifyEventType eventTypes = 0;
+ const char* eWindowOpenStr = "WINDOW_OPEN";
+ const char* eWindowCloseStr = "WINDOW_CLOSE";
+
+ CHECK_PARSER_STATUS(pCxt);
+
+ if (LIST_LENGTH(pAddrUrls) == 0) {
+ pCxt->errCode =
+ generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "notification address cannot be empty");
+ goto _err;
+ }
+
+ FOREACH(pNode, pAddrUrls) {
+ char *url = ((SValueNode*)pNode)->literal;
+ if (strlen(url) >= TSDB_STREAM_NOTIFY_URL_LEN) {
+ pCxt->errCode =
+ generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
+ "notification address \"%s\" exceed maximum length %d", url, TSDB_STREAM_NOTIFY_URL_LEN);
+ goto _err;
+ }
+ if (!validateHttpUrl(url)) {
+ pCxt->errCode = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
+ "invalid notification address \"%s\"", url);
+ goto _err;
+ }
+ }
+
+ if (LIST_LENGTH(pEventTypes) == 0) {
+ pCxt->errCode = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
+ "event types must be specified for notification");
+ goto _err;
+ }
+
+ FOREACH(pNode, pEventTypes) {
+ char *eventStr = ((SValueNode *)pNode)->literal;
+ if (strncasecmp(eventStr, eWindowOpenStr, strlen(eWindowOpenStr) + 1) == 0) {
+ BIT_FLAG_SET_MASK(eventTypes, SNOTIFY_EVENT_WINDOW_OPEN);
+ } else if (strncasecmp(eventStr, eWindowCloseStr, strlen(eWindowCloseStr) + 1) == 0) {
+ BIT_FLAG_SET_MASK(eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE);
+ } else {
+ pCxt->errCode = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
+ "invalid event type '%s' for notification", eventStr);
+ goto _err;
+ }
+ }
+
+ SStreamNotifyOptions* pNotifyOptions = NULL;
+ pCxt->errCode = nodesMakeNode(QUERY_NODE_STREAM_NOTIFY_OPTIONS, (SNode**)&pNotifyOptions);
+ CHECK_MAKE_NODE(pNotifyOptions);
+ pNotifyOptions->pAddrUrls = pAddrUrls;
+ pNotifyOptions->eventTypes = eventTypes;
+ pNotifyOptions->errorHandle = SNOTIFY_ERROR_HANDLE_PAUSE;
+ pNotifyOptions->notifyHistory = false;
+ nodesDestroyList(pEventTypes);
+ return (SNode*)pNotifyOptions;
+_err:
+ nodesDestroyList(pAddrUrls);
+ nodesDestroyList(pEventTypes);
+ return NULL;
+}
+
+SNode* setStreamNotifyOptions(SAstCreateContext* pCxt, SNode* pNode, EStreamNotifyOptionSetFlag setFlag,
+ SToken* pToken) {
+ CHECK_PARSER_STATUS(pCxt);
+
+ SStreamNotifyOptions* pNotifyOption = (SStreamNotifyOptions*)pNode;
+ if (BIT_FLAG_TEST_MASK(pNotifyOption->setFlag, setFlag)) {
+ pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
+ "stream notify options each item can only be set once");
+ goto _err;
+ }
+ switch (setFlag) {
+ case SNOTIFY_OPT_ERROR_HANDLE_SET:
+ pNotifyOption->errorHandle = (pToken->type == TK_DROP) ? SNOTIFY_ERROR_HANDLE_DROP : SNOTIFY_ERROR_HANDLE_PAUSE;
+ break;
+ case SNOTIFY_OPT_NOTIFY_HISTORY_SET:
+ pNotifyOption->notifyHistory = taosStr2Int8(pToken->z, NULL, 10);
+ break;
+ default:
+ break;
+ }
+ BIT_FLAG_SET_MASK(pNotifyOption->setFlag, setFlag);
+ return pNode;
+_err:
+ nodesDestroyNode(pNode);
+ return NULL;
+}
+
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable,
- SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols) {
+ SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols,
+ SNode* pNotifyOptions) {
CHECK_PARSER_STATUS(pCxt);
CHECK_NAME(checkStreamName(pCxt, pStreamName));
SCreateStreamStmt* pStmt = NULL;
@@ -3674,6 +3783,7 @@ SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken
pStmt->pTags = pTags;
pStmt->pSubtable = pSubtable;
pStmt->pCols = pCols;
+ pStmt->pNotifyOptions = (SStreamNotifyOptions*)pNotifyOptions;
return (SNode*)pStmt;
_err:
nodesDestroyNode(pRealTable);
@@ -3682,6 +3792,7 @@ SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken
nodesDestroyList(pTags);
nodesDestroyNode(pSubtable);
nodesDestroyList(pCols);
+ nodesDestroyNode(pNotifyOptions);
return NULL;
}
diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c
index ea2e9d712fa0..7ed438a7dc81 100644
--- a/source/libs/parser/src/parTokenizer.c
+++ b/source/libs/parser/src/parTokenizer.c
@@ -355,6 +355,9 @@ static SKeyword keywordTable[] = {
{"FORCE_WINDOW_CLOSE", TK_FORCE_WINDOW_CLOSE},
{"DISK_INFO", TK_DISK_INFO},
{"AUTO", TK_AUTO},
+ {"NOTIFY", TK_NOTIFY},
+ {"ON_FAILURE", TK_ON_FAILURE},
+ {"NOTIFY_HISTORY", TK_NOTIFY_HISTORY},
};
// clang-format on
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index 93cf9cf8bfdd..b096f53339ca 100755
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -12160,6 +12160,45 @@ static int32_t translateStreamOptions(STranslateContext* pCxt, SCreateStreamStmt
return TSDB_CODE_SUCCESS;
}
+static int32_t buildStreamNotifyOptions(STranslateContext* pCxt, SStreamNotifyOptions* pNotifyOptions,
+ SCMCreateStreamReq* pReq) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ SNode* pNode = NULL;
+
+ if (pNotifyOptions == NULL || pNotifyOptions->pAddrUrls->length == 0) {
+ return code;
+ }
+
+ pReq->pNotifyAddrUrls = taosArrayInit(pNotifyOptions->pAddrUrls->length, POINTER_BYTES);
+ if (pReq->pNotifyAddrUrls != NULL) {
+ FOREACH(pNode, pNotifyOptions->pAddrUrls) {
+ char *url = taosStrndup(((SValueNode*)pNode)->literal, TSDB_STREAM_NOTIFY_URL_LEN);
+ if (url == NULL) {
+ code = terrno;
+ break;
+ }
+ if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
+ code = terrno;
+ taosMemoryFreeClear(url);
+ break;
+ }
+ }
+ } else {
+ code = terrno;
+ }
+
+ if (code == TSDB_CODE_SUCCESS) {
+ pReq->notifyEventTypes = pNotifyOptions->eventTypes;
+ pReq->notifyErrorHandle = pNotifyOptions->errorHandle;
+ pReq->notifyHistory = pNotifyOptions->notifyHistory;
+ } else {
+ taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
+ pReq->pNotifyAddrUrls = NULL;
+ }
+
+ return code;
+}
+
static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
pReq->igExists = pStmt->ignoreExists;
@@ -12206,6 +12245,10 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
}
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = buildStreamNotifyOptions(pCxt, pStmt->pNotifyOptions, pReq);
+ }
+
return code;
}
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index 42d7f44b6243..baf36d04531a 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -735,7 +735,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
!alreadyAddGroupId(pDataBlock->info.parTbName, groupId) && groupId != 0) {
if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
code = buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId, sizeof(pDataBlock->info.parTbName));
- } else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
+ } else if (pTask->ver >= SSTREAM_TASK_APPEND_STABLE_NAME_VER) {
code = buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, pDataBlock->info.parTbName,
groupId, sizeof(pDataBlock->info.parTbName));
}
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index f46228fd47f2..4c493dfdffa4 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -322,6 +322,8 @@ void tFreeStreamTask(void* pParam) {
streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo);
pTask->chkInfo.pActiveInfo = NULL;
+ taosArrayDestroyP(pTask->notifyInfo.pNotifyAddrUrls, NULL);
+
taosMemoryFree(pTask);
stDebug("s-task:0x%x free task completed", taskId);
}
@@ -1306,6 +1308,59 @@ void streamTaskFreeRefId(int64_t* pRefId) {
metaRefMgtRemove(pRefId);
}
+static int32_t tEncodeStreamNotifyInfo(SEncoder* pEncoder, const SNotifyInfo* info) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+
+ QUERY_CHECK_NULL(pEncoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);
+
+ int32_t addrSize = taosArrayGetSize(info->pNotifyAddrUrls);
+ TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
+ for (int32_t i = 0; i < addrSize; ++i) {
+ const char* url = taosArrayGetP(info->pNotifyAddrUrls, i);
+ TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, url));
+ }
+ TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyEventTypes));
+ TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyErrorHandle));
+
+_exit:
+ if (code != TSDB_CODE_SUCCESS) {
+ stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ return code;
+}
+
+static int32_t tDecodeStreamNotifyInfo(SDecoder* pDecoder, SNotifyInfo* info) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+
+ QUERY_CHECK_NULL(pDecoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);
+
+ int32_t addrSize = 0;
+ TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
+ info->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
+ QUERY_CHECK_NULL(info->pNotifyAddrUrls, code, lino, _exit, terrno);
+ for (int32_t i = 0; i < addrSize; ++i) {
+ char *url = NULL;
+ TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
+ url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
+ QUERY_CHECK_NULL(url, code, lino, _exit, terrno);
+ if (taosArrayPush(info->pNotifyAddrUrls, &url) == NULL) {
+ taosMemoryFree(url);
+ TAOS_CHECK_EXIT(terrno);
+ }
+ }
+ TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyEventTypes));
+ TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyErrorHandle));
+
+_exit:
+ if (code != TSDB_CODE_SUCCESS) {
+ stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ return code;
+}
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
int32_t code = 0;
@@ -1376,6 +1431,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
+ TAOS_CHECK_EXIT(tEncodeStreamNotifyInfo(pEncoder, &pTask->notifyInfo));
+
tEndEncode(pEncoder);
_exit:
return code;
@@ -1474,8 +1531,10 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
}
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
+ TAOS_CHECK_EXIT(tDecodeStreamNotifyInfo(pDecoder, &pTask->notifyInfo));
+
tEndDecode(pDecoder);
_exit:
return code;
-}
\ No newline at end of file
+}
From 575218e985846fd9d1b5215b466ed922b59e640a Mon Sep 17 00:00:00 2001
From: Jinqing Kuang
Date: Wed, 8 Jan 2025 18:51:24 +0800
Subject: [PATCH 3/4] feat(stream)[TS-5469]. make agg task generate notify
event
---
include/common/tcommon.h | 5 +-
include/common/tdatablock.h | 2 +
include/libs/executor/executor.h | 3 +-
include/libs/stream/tstream.h | 2 +
source/common/src/tdatablock.c | 27 +
source/dnode/vnode/src/tq/tqSink.c | 8 +-
source/dnode/vnode/src/tqCommon/tqCommon.c | 10 +-
source/libs/executor/inc/executorInt.h | 15 +-
source/libs/executor/inc/querytask.h | 5 +-
source/libs/executor/inc/streamexecutorInt.h | 10 +
source/libs/executor/src/executor.c | 23 +-
source/libs/executor/src/querytask.c | 2 +
.../executor/src/streameventwindowoperator.c | 84 +--
source/libs/executor/src/streamexecutorInt.c | 560 +++++++++++++++++-
source/libs/parser/src/parAstCreater.c | 20 +-
15 files changed, 702 insertions(+), 74 deletions(-)
diff --git a/include/common/tcommon.h b/include/common/tcommon.h
index 7264718f752f..75e8aa82e288 100644
--- a/include/common/tcommon.h
+++ b/include/common/tcommon.h
@@ -156,7 +156,7 @@ typedef enum EStreamType {
STREAM_PARTITION_DELETE_DATA,
STREAM_GET_RESULT,
STREAM_DROP_CHILD_TABLE,
- STREAM_EVENT_OPEN_WINDOW,
+ STREAM_NOTIFY_EVENT,
} EStreamType;
#pragma pack(push, 1)
@@ -405,6 +405,9 @@ typedef struct STUidTagInfo {
#define UD_GROUPID_COLUMN_INDEX 1
#define UD_TAG_COLUMN_INDEX 2
+// stream notify event block column
+#define NOTIFY_EVENT_STR_COLUMN_INDEX 0
+
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime);
int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol);
diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h
index 1103b89ccb04..96478047ca59 100644
--- a/include/common/tdatablock.h
+++ b/include/common/tdatablock.h
@@ -285,6 +285,8 @@ bool isAutoTableName(char* ctbName);
int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId, size_t cap);
int32_t buildCtbNameByGroupId(const char* stbName, uint64_t groupId, char** pName);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
+int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
+ char** dstTableName);
int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList);
diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index 0607c65a3a6a..9a7c3912b0d5 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -98,7 +98,8 @@ int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
-void qSetStreamEventTypes(qTaskInfo_t tinfo, int32_t eventTypes);
+int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper,
+ const char* stbFullName, bool newSubTableRule);
/**
* Set multiple input data blocks for the stream scan.
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index f7addc7be37d..8d44e7487b7b 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -72,6 +72,8 @@ typedef struct SActiveCheckpointInfo SActiveCheckpointInfo;
#define SSTREAM_TASK_APPEND_STABLE_NAME_VER 4 // Append subtable name with stableName and groupId
#define SSTREAM_TASK_ADD_NOTIFY_VER 5 // Support event notification at window open/close
+#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))
+
extern int32_t streamMetaRefPool;
extern int32_t streamTaskRefPool;
diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c
index bd18c9ceb94b..c3e0fff57849 100644
--- a/source/common/src/tdatablock.c
+++ b/source/common/src/tdatablock.c
@@ -3061,6 +3061,33 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
return code;
}
+int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
+ char** dstTableName) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+
+ if (parTbName[0]) {
+ if (newSubTableRule && !isAutoTableName(parTbName) && !alreadyAddGroupId(parTbName, gid) && gid != 0 &&
+ stbFullName) {
+ *dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
+ TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
+
+ tstrncpy(*dstTableName, parTbName, TSDB_TABLE_NAME_LEN);
+ code = buildCtbNameAddGroupId(stbFullName, *dstTableName, gid, TSDB_TABLE_NAME_LEN);
+ TSDB_CHECK_CODE(code, lino, _end);
+ } else {
+ *dstTableName = taosStrdup(parTbName);
+ TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
+ }
+ } else {
+ code = buildCtbNameByGroupId(stbFullName, gid, dstTableName);
+ TSDB_CHECK_CODE(code, lino, _end);
+ }
+
+_end:
+ return code;
+}
+
// return length of encoded data, return -1 if failed
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) {
int32_t code = blockDataCheck(pBlock);
diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c
index 219b6290e69d..7c085ea31a43 100644
--- a/source/dnode/vnode/src/tq/tqSink.c
+++ b/source/dnode/vnode/src/tq/tqSink.c
@@ -16,8 +16,6 @@
#include "tcommon.h"
#include "tq.h"
-#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))
-
typedef struct STableSinkInfo {
uint64_t uid;
tstr name;
@@ -1175,6 +1173,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
continue;
} else if (pDataBlock->info.type == STREAM_DROP_CHILD_TABLE && pTask->subtableWithoutMd5) {
code = doBuildAndSendDropTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
+ } else if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) {
+ continue;
} else {
code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs);
}
@@ -1319,6 +1319,10 @@ void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVno
continue;
}
+ if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) {
+ continue;
+ }
+
hasSubmit = true;
pTask->execInfo.sink.numOfBlocks += 1;
uint64_t groupId = pDataBlock->info.id.groupId;
diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c
index bdccf954e094..e2ebd6e3cc5a 100644
--- a/source/dnode/vnode/src/tqCommon/tqCommon.c
+++ b/source/dnode/vnode/src/tqCommon/tqCommon.c
@@ -87,7 +87,15 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
return code;
}
- qSetStreamEventTypes(&pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes);
+ if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
+ code = qSetStreamNotifyInfo(pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes,
+ pTask->outputInfo.tbSink.pSchemaWrapper, pTask->outputInfo.tbSink.stbFullName,
+ IS_NEW_SUBTB_RULE(pTask));
+ if (code) {
+ tqError("s-task:%s failed to set stream notify info, code:%s", pTask->id.idStr, tstrerror(code));
+ return code;
+ }
+ }
}
streamSetupScheduleTrigger(pTask);
diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h
index 04e7884020ea..fac73c4d5ace 100644
--- a/source/libs/executor/inc/executorInt.h
+++ b/source/libs/executor/inc/executorInt.h
@@ -449,11 +449,16 @@ typedef struct STimeWindowAggSupp {
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;
+typedef struct SStreamNotifyEventSupp {
+ SArray* pWindowEvents; // Array of SStreamNotifyEvent, storing window events and trigger values.
+ SArray* pWindowResults; // Array of SStreamNotifyEvent, storing window results.
+ SSDataBlock* pEventBlock; // The datablock contains all window events and results.
+} SStreamNotifyEventSupp;
+
typedef struct SSteamOpBasicInfo {
- int32_t primaryPkIndex;
- bool updateOperatorInfo;
- SSDataBlock* pEventRes;
- SArray* pEventInfo;
+ int32_t primaryPkIndex;
+ bool updateOperatorInfo;
+ SStreamNotifyEventSupp windowEventSup;
} SSteamOpBasicInfo;
typedef struct SStreamFillSupporter {
@@ -769,6 +774,8 @@ typedef struct SStreamEventAggOperatorInfo {
SSHashObj* pPkDeleted;
bool destHasPrimaryKey;
struct SOperatorInfo* pOperator;
+ SNodeList* pStartCondCols;
+ SNodeList* pEndCondCols;
} SStreamEventAggOperatorInfo;
typedef struct SStreamCountAggOperatorInfo {
diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h
index 14e82b45668d..86ee6f41245c 100644
--- a/source/libs/executor/inc/querytask.h
+++ b/source/libs/executor/inc/querytask.h
@@ -71,7 +71,10 @@ typedef struct {
SVersionRange fillHistoryVer;
STimeWindow fillHistoryWindow;
SStreamState* pState;
- int32_t eventTypes;
+ int32_t eventTypes; // event types to notify
+ SSchemaWrapper* notifyResultSchema; // agg result to notify
+ char* stbFullName; // used to generate dest child table name
+ bool newSubTableRule; // used to generate dest child table name
} SStreamTaskInfo;
struct SExecTaskInfo {
diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h
index 0c0ea0d6fcc6..7b3c828351b6 100644
--- a/source/libs/executor/inc/streamexecutorInt.h
+++ b/source/libs/executor/inc/streamexecutorInt.h
@@ -19,7 +19,10 @@
extern "C" {
#endif
+#include "cJSON.h"
+#include "cmdnodes.h"
#include "executorInt.h"
+#include "querytask.h"
#include "tutil.h"
#define FILL_POS_INVALID 0
@@ -107,6 +110,13 @@ int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY
int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes);
TSKEY compareTs(void* pKey);
+int32_t addEventAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
+ const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t ri,
+ SStreamNotifyEventSupp* sup);
+int32_t addAggResultNotifyEvent(const SSDataBlock* pResultBlock, const SSchemaWrapper* pSchemaWrapper,
+ SStreamNotifyEventSupp* sup);
+int32_t buildNotifyEventBlock(const SExecTaskInfo* pTaskInfo, SStreamNotifyEventSupp* sup);
+
#ifdef __cplusplus
}
#endif
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index c1e7263c9021..10cae5cac611 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -250,11 +250,26 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
return code;
}
-void qSetStreamEventTypes(qTaskInfo_t tinfo, int32_t eventTypes) {
- SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo;
- if (pTaskInfo != NULL) {
- pTaskInfo->streamInfo.eventTypes = eventTypes;
+int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper,
+ const char* stbFullName, bool newSubTableRule) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ SStreamTaskInfo *pStreamInfo = NULL;
+
+ if (tinfo == 0 || eventTypes == 0 || pSchemaWrapper == NULL || stbFullName == NULL) {
+ goto _end;
+ }
+
+ pStreamInfo = &((SExecTaskInfo*)tinfo)->streamInfo;
+ pStreamInfo->eventTypes = eventTypes;
+ pStreamInfo->notifyResultSchema = tCloneSSchemaWrapper(pSchemaWrapper);
+ if (pStreamInfo->notifyResultSchema == NULL) {
+ code = terrno;
}
+ pStreamInfo->stbFullName = taosStrdup(stbFullName);
+ pStreamInfo->newSubTableRule = newSubTableRule;
+
+_end:
+ return code;
}
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c
index c6a1900b416a..20c80df4fa4a 100644
--- a/source/libs/executor/src/querytask.c
+++ b/source/libs/executor/src/querytask.c
@@ -262,6 +262,8 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
tDeleteSchemaWrapper(pStreamInfo->schema);
tOffsetDestroy(&pStreamInfo->currentOffset);
+ tDeleteSchemaWrapper(pStreamInfo->notifyResultSchema);
+ taosMemoryFree(pStreamInfo->stbFullName);
}
static void freeBlock(void* pParam) {
diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c
index a9a47580dc44..5f4d6b30fa89 100644
--- a/source/libs/executor/src/streameventwindowoperator.c
+++ b/source/libs/executor/src/streameventwindowoperator.c
@@ -93,6 +93,16 @@ void destroyStreamEventOperatorInfo(void* param) {
pInfo->pEndCondInfo = NULL;
}
+ if (pInfo->pStartCondCols != NULL) {
+ nodesDestroyList(pInfo->pStartCondCols);
+ pInfo->pStartCondCols = NULL;
+ }
+
+ if (pInfo->pEndCondCols != NULL) {
+ nodesDestroyList(pInfo->pEndCondCols);
+ pInfo->pEndCondCols = NULL;
+ }
+
taosMemoryFreeClear(param);
}
@@ -310,14 +320,6 @@ void doDeleteEventWindow(SStreamAggSupporter* pAggSup, SSHashObj* pSeUpdated, SS
removeSessionResult(pAggSup, pSeUpdated, pAggSup->pResultRows, pKey);
}
-static int32_t setEventData(SSteamOpBasicInfo* pBasicInfo, SSessionKey* pWinKey) {
- void* pRes = taosArrayPush(pBasicInfo->pEventInfo, pWinKey);
- if (pRes != NULL) {
- return TSDB_CODE_SUCCESS;
- }
- return terrno;
-}
-
static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated,
SSHashObj* pStDeleted) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@@ -393,8 +395,10 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
&nextWinKey, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
- if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN) && winCode != TSDB_CODE_SUCCESS) {
- code = setEventData(&pInfo->basic, &curWin.winInfo.sessionWin);
+ if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN) &&
+ *(bool*)colDataGetNumData(pColStart, i) && winCode != TSDB_CODE_SUCCESS) {
+ code = addEventAggNotifyEvent(SNOTIFY_EVENT_WINDOW_OPEN, &curWin.winInfo.sessionWin, pSDataBlock,
+ pInfo->pStartCondCols, i, &pInfo->basic.windowEventSup);
QUERY_CHECK_CODE(code, lino, _end);
}
@@ -464,6 +468,12 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
QUERY_CHECK_CODE(code, lino, _end);
}
+
+ if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE)) {
+ code = addEventAggNotifyEvent(SNOTIFY_EVENT_WINDOW_CLOSE, &curWin.winInfo.sessionWin, pSDataBlock,
+ pInfo->pEndCondCols, i + winRows - 1, &pInfo->basic.windowEventSup);
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
}
_end:
@@ -582,42 +592,13 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
}
}
-static void buildEventNotifyResult(SSteamOpBasicInfo* pBasicInfo) {
- int32_t code = TSDB_CODE_SUCCESS;
- int32_t lino = 0;
-
- blockDataCleanup(pBasicInfo->pEventRes);
- int32_t size = taosArrayGetSize(pBasicInfo->pEventInfo);
- code = blockDataEnsureCapacity(pBasicInfo->pEventRes, size);
- QUERY_CHECK_CODE(code, lino, _end);
- for (int32_t i = 0; i < size; i++) {
- SSessionKey* pKey = taosArrayGet(pBasicInfo->pEventInfo, i);
- uint64_t uid = 0;
- code = appendDataToSpecialBlock(pBasicInfo->pEventRes, &pKey->win.skey, &pKey->win.ekey, &uid, &pKey->groupId, NULL);
- QUERY_CHECK_CODE(code, lino, _end);
- }
- taosArrayClear(pBasicInfo->pEventInfo);
-
-_end:
- if (code != TSDB_CODE_SUCCESS) {
- qError("%s failed at line %d since %s.", __func__, lino, tstrerror(code));
- }
-}
-
-
static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
- buildEventNotifyResult(&pInfo->basic);
- if (pInfo->basic.pEventRes->info.rows > 0) {
- printDataBlock(pInfo->basic.pEventRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
- (*ppRes) = pInfo->basic.pEventRes;
- return code;
- }
-
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
@@ -628,10 +609,27 @@ static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
+ if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE)) {
+ code = addAggResultNotifyEvent(pBInfo->pRes, pTaskInfo->streamInfo.notifyResultSchema, &pInfo->basic.windowEventSup);
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
(*ppRes) = pBInfo->pRes;
return code;
}
+
+ code = buildNotifyEventBlock(pTaskInfo, &pInfo->basic.windowEventSup);
+ QUERY_CHECK_CODE(code, lino, _end);
+ if (pInfo->basic.windowEventSup.pEventBlock->info.rows > 0) {
+ printDataBlock(pInfo->basic.windowEventSup.pEventBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
+ (*ppRes) = pInfo->basic.windowEventSup.pEventBlock;
+ return code;
+ }
+
+_end:
(*ppRes) = NULL;
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
+ }
return code;
}
@@ -1041,6 +1039,12 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
code = filterInitFromNode((SNode*)pEventNode->pEndCond, &pInfo->pEndCondInfo, 0);
QUERY_CHECK_CODE(code, lino, _error);
+ code =
+ nodesCollectColumnsFromNode((SNode*)pEventNode->pStartCond, NULL, COLLECT_COL_TYPE_ALL, &pInfo->pStartCondCols);
+ QUERY_CHECK_CODE(code, lino, _error);
+ code = nodesCollectColumnsFromNode((SNode*)pEventNode->pEndCond, NULL, COLLECT_COL_TYPE_ALL, &pInfo->pEndCondCols);
+ QUERY_CHECK_CODE(code, lino, _error);
+
*pOptrInfo = pOperator;
return TSDB_CODE_SUCCESS;
diff --git a/source/libs/executor/src/streamexecutorInt.c b/source/libs/executor/src/streamexecutorInt.c
index 1e7fbfa44658..85cbdb08014b 100644
--- a/source/libs/executor/src/streamexecutorInt.c
+++ b/source/libs/executor/src/streamexecutorInt.c
@@ -13,9 +13,18 @@
* along with this program. If not, see .
*/
+#include "streamexecutorInt.h"
+
#include "executorInt.h"
#include "tdatablock.h"
+typedef struct SStreamNotifyEvent {
+ uint64_t gid;
+ uint64_t skey;
+ char* content;
+ bool isEnd;
+} SStreamNotifyEvent;
+
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type) {
if (type != STREAM_GET_ALL && type != STREAM_CHECKPOINT) {
pBasicInfo->updateOperatorInfo = true;
@@ -30,19 +39,552 @@ void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo) {
pBasicInfo->updateOperatorInfo = false;
}
+static void destroyStreamWindowEvent(void* ptr) {
+ SStreamNotifyEvent* pEvent = ptr;
+ if (pEvent == NULL || pEvent->content == NULL) return;
+ cJSON_free(pEvent->content);
+}
+
+static void destroyStreamWindowEventSupp(SStreamNotifyEventSupp* sup) {
+ if (sup == NULL) return;
+ taosArrayDestroyEx(sup->pWindowEvents, destroyStreamWindowEvent);
+ taosArrayDestroyEx(sup->pWindowResults, destroyStreamWindowEvent);
+ blockDataDestroy(sup->pEventBlock);
+ *sup = (SStreamNotifyEventSupp){0};
+}
+
+static int32_t initStreamWindowEventSupp(SStreamNotifyEventSupp *sup) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SSDataBlock* pBlock = NULL;
+ SColumnInfoData infoData = {0};
+
+ if (sup == NULL) {
+ goto _end;
+ }
+
+ code = createDataBlock(&pBlock);
+ QUERY_CHECK_CODE(code, lino, _end);
+
+ pBlock->info.type = STREAM_NOTIFY_EVENT;
+ pBlock->info.watermark = INT64_MIN;
+
+ infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
+ infoData.info.bytes = tDataTypes[infoData.info.type].bytes;
+ code = blockDataAppendColInfo(pBlock, &infoData);
+ QUERY_CHECK_CODE(code, lino, _end);
+
+ sup->pWindowEvents = taosArrayInit(0, sizeof(SStreamNotifyEvent));
+ QUERY_CHECK_NULL(sup->pWindowEvents, code, lino, _end, terrno);
+ sup->pWindowResults = taosArrayInit(0, sizeof(SStreamNotifyEvent));
+ QUERY_CHECK_NULL(sup->pWindowResults, code, lino, _end, terrno);
+ sup->pEventBlock = pBlock;
+ pBlock = NULL;
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ if (sup) {
+ destroyStreamWindowEventSupp(sup);
+ }
+ }
+ if (pBlock != NULL) {
+ blockDataDestroy(pBlock);
+ }
+ return code;
+}
+
int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
pBasicInfo->primaryPkIndex = -1;
pBasicInfo->updateOperatorInfo = false;
- pBasicInfo->pEventInfo = taosArrayInit(4, sizeof(SSessionKey));
- if (pBasicInfo->pEventInfo == NULL) {
- return terrno;
- }
- return createSpecialDataBlock(STREAM_EVENT_OPEN_WINDOW, &pBasicInfo->pEventRes);
+ return initStreamWindowEventSupp(&pBasicInfo->windowEventSup);
}
void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
- blockDataDestroy(pBasicInfo->pEventRes);
- pBasicInfo->pEventRes = NULL;
- taosArrayDestroy(pBasicInfo->pEventInfo);
- pBasicInfo->pEventInfo = NULL;
+ destroyStreamWindowEventSupp(&pBasicInfo->windowEventSup);
+}
+
+// Fast uint64_t to string conversion, equivalent to sprintf(buf, "%lu", val) but with 10x better performance.
+static char* u64toaFastLut(uint64_t val, char* buf) {
+ static const char* lut =
+ "0001020304050607080910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455"
+ "5657585960616263646566676869707172737475767778798081828384858687888990919293949596979899";
+
+ char temp[24];
+ char* p = temp;
+
+ while (val >= 100) {
+ strncpy(p, lut + (val % 100) * 2, 2);
+ val /= 100;
+ p += 2;
+ }
+
+ if (val >= 10) {
+ strncpy(p, lut + val * 2, 2);
+ p += 2;
+ } else if (val > 0 || p == temp) {
+ *p++ = val + '0';
+ }
+
+ while (p != temp) {
+ *buf++ = *--p;
+ }
+
+ *buf = '\0';
+ return buf;
+}
+
+static void streamNotifyGetEventWindowId(const SSessionKey* pSessionKey, char *buf) {
+ char* p = buf;
+ uint64_t hash = 0;
+
+ // The windowId is generated by computing a hash value using the concatenation of group ID and window start time, in
+ // the format: "_".
+ p = u64toaFastLut(pSessionKey->groupId, p);
+ *p++ = '_';
+ p = u64toaFastLut(pSessionKey->win.skey, p);
+ hash = MurmurHash3_64(buf, p - buf);
+
+ p = u64toaFastLut(hash, buf);
+}
+
+#define JSON_CHECK_ADD_ITEM(obj, str, item) \
+ QUERY_CHECK_CONDITION(cJSON_AddItemToObjectCS(obj, str, item), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
+
+static int32_t jsonAddColumnField(const char* colName, const SColumnInfoData* pColData, int32_t ri, cJSON* obj) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ char* temp = NULL;
+
+ QUERY_CHECK_NULL(colName, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pColData, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ if (colDataIsNull_s(pColData, ri)) {
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNull());
+ goto _end;
+ }
+
+ switch (pColData->info.type) {
+ case TSDB_DATA_TYPE_BOOL: {
+ bool val = *(bool*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateBool(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_TINYINT: {
+ int8_t val = *(int8_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_SMALLINT: {
+ int16_t val = *(int16_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_INT: {
+ int32_t val = *(int32_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_BIGINT:
+ case TSDB_DATA_TYPE_TIMESTAMP: {
+ int64_t val = *(int64_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_FLOAT: {
+ float val = *(float*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_DOUBLE: {
+ double val = *(double*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_VARCHAR:
+ case TSDB_DATA_TYPE_NCHAR: {
+ // cJSON requires null-terminated strings, but this data is not null-terminated,
+ // so we need to manually copy the string and add null termination.
+ const char* src = varDataVal(colDataGetVarData(pColData, ri));
+ int32_t len = varDataLen(colDataGetVarData(pColData, ri));
+ temp = cJSON_malloc(len + 1);
+ QUERY_CHECK_NULL(temp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+ memcpy(temp, src, len);
+ temp[len] = '\0';
+
+ cJSON* item = cJSON_CreateStringReference(temp);
+ JSON_CHECK_ADD_ITEM(obj, colName, item);
+
+ // let the cjson object to free memory later
+ item->type &= ~cJSON_IsReference;
+ temp = NULL;
+ break;
+ }
+
+ case TSDB_DATA_TYPE_UTINYINT: {
+ uint8_t val = *(uint8_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_USMALLINT: {
+ uint16_t val = *(uint16_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_UINT: {
+ uint32_t val = *(uint32_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_UBIGINT: {
+ uint64_t val = *(uint64_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ default: {
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateStringReference(""));
+ break;
+ }
+ }
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ if (temp) {
+ cJSON_free(temp);
+ }
+ return code;
+}
+
+int32_t addEventAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
+ const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t ri,
+ SStreamNotifyEventSupp* sup) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SNode* node = NULL;
+ cJSON* event = NULL;
+ cJSON* fields = NULL;
+ cJSON* cond = NULL;
+ SStreamNotifyEvent item = {0};
+ char windowId[64];
+
+ QUERY_CHECK_NULL(pSessionKey, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pInputBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pInputBlock->pDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pCondCols, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ event = cJSON_CreateObject();
+ QUERY_CHECK_NULL(event, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+
+ // add basic info
+ streamNotifyGetEventWindowId(pSessionKey, windowId);
+ if (eventType == SNOTIFY_EVENT_WINDOW_OPEN) {
+ JSON_CHECK_ADD_ITEM(event, "eventType", cJSON_CreateStringReference("WINDOW_OPEN"));
+ } else if (eventType == SNOTIFY_EVENT_WINDOW_CLOSE) {
+ JSON_CHECK_ADD_ITEM(event, "eventType", cJSON_CreateStringReference("WINDOW_CLOSE"));
+ }
+ JSON_CHECK_ADD_ITEM(event, "eventTime", cJSON_CreateNumber(taosGetTimestampMs()));
+ JSON_CHECK_ADD_ITEM(event, "windowId", cJSON_CreateStringReference(windowId));
+ JSON_CHECK_ADD_ITEM(event, "windowType", cJSON_CreateStringReference("Event"));
+ JSON_CHECK_ADD_ITEM(event, "windowStart", cJSON_CreateNumber(pSessionKey->win.skey));
+ if (eventType == SNOTIFY_EVENT_WINDOW_CLOSE) {
+ JSON_CHECK_ADD_ITEM(event, "windowEnd", cJSON_CreateNumber(pSessionKey->win.ekey));
+ }
+
+ // create fields object to store matched column values
+ fields = cJSON_CreateObject();
+ QUERY_CHECK_NULL(fields, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+ FOREACH(node, pCondCols) {
+ SColumnNode* pColDef = (SColumnNode*)node;
+ SColumnInfoData* pColData = taosArrayGet(pInputBlock->pDataBlock, pColDef->slotId);
+ code = jsonAddColumnField(pColDef->colName, pColData, ri, fields);
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
+
+ // add trigger condition
+ cond = cJSON_CreateObject();
+ QUERY_CHECK_NULL(cond, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+ JSON_CHECK_ADD_ITEM(cond, "conditionIndex", cJSON_CreateNumber(0));
+ JSON_CHECK_ADD_ITEM(cond, "fieldValues", fields);
+ fields = NULL;
+ JSON_CHECK_ADD_ITEM(event, "triggerConditions", cond);
+ cond = NULL;
+
+ // convert json object to string value
+ item.gid = pSessionKey->groupId;
+ item.skey = pSessionKey->win.skey;
+ item.isEnd = (eventType == SNOTIFY_EVENT_WINDOW_CLOSE);
+ item.content = cJSON_PrintUnformatted(event);
+ QUERY_CHECK_NULL(taosArrayPush(sup->pWindowEvents, &item), code, lino, _end, terrno);
+ item.content = NULL;
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ destroyStreamWindowEvent(&item);
+ if (cond != NULL) {
+ cJSON_Delete(cond);
+ }
+ if (fields != NULL) {
+ cJSON_Delete(fields);
+ }
+ if (event != NULL) {
+ cJSON_Delete(event);
+ }
+ return code;
+}
+
+int32_t addAggResultNotifyEvent(const SSDataBlock* pResultBlock, const SSchemaWrapper* pSchemaWrapper,
+ SStreamNotifyEventSupp* sup) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SNode * node = NULL;
+ cJSON* event = NULL;
+ cJSON* result = NULL;
+ SStreamNotifyEvent item = {0};
+ SColumnInfoData* pWstartCol = NULL;
+
+ QUERY_CHECK_NULL(pResultBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pSchemaWrapper, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ pWstartCol = taosArrayGet(pResultBlock->pDataBlock, 0);
+ for (int32_t i = 0; i< pResultBlock->info.rows; ++i) {
+ event = cJSON_CreateObject();
+ QUERY_CHECK_NULL(event, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+
+ // convert the result row into json
+ result = cJSON_CreateObject();
+ QUERY_CHECK_NULL(result, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+ for (int32_t j = 0; j < pSchemaWrapper->nCols; ++j) {
+ SSchema *pCol = pSchemaWrapper->pSchema + j;
+ SColumnInfoData *pColData = taosArrayGet(pResultBlock->pDataBlock, pCol->colId - 1);
+ code = jsonAddColumnField(pCol->name, pColData, i, result);
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
+ JSON_CHECK_ADD_ITEM(event, "result", result);
+ result = NULL;
+
+ item.gid = pResultBlock->info.id.groupId;
+ item.skey = *(uint64_t*)colDataGetNumData(pWstartCol, i);
+ item.content = cJSON_PrintUnformatted(event);
+ QUERY_CHECK_NULL(taosArrayPush(sup->pWindowResults, &item), code, lino, _end, terrno);
+ item.content = NULL;
+
+ cJSON_Delete(event);
+ event = NULL;
+ }
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ destroyStreamWindowEvent(&item);
+ if (result != NULL) {
+ cJSON_Delete(result);
+ }
+ if (event != NULL) {
+ cJSON_Delete(event);
+ }
+ return code;
+}
+
+static int32_t streamNotifyGetDestTableName(const SExecTaskInfo* pTaskInfo, uint64_t gid, char** pTableName) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ const SStorageAPI* pAPI = NULL;
+ void* tbname = NULL;
+ int32_t winCode = TSDB_CODE_SUCCESS;
+ char parTbName[TSDB_TABLE_NAME_LEN];
+ const SStreamTaskInfo* pStreamInfo = NULL;
+
+ QUERY_CHECK_NULL(pTaskInfo, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pTableName, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ *pTableName = NULL;
+
+ pAPI = &pTaskInfo->storageAPI;
+ code = pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, gid, &tbname, false, &winCode);
+ QUERY_CHECK_CODE(code, lino, _end);
+ if (winCode != TSDB_CODE_SUCCESS) {
+ parTbName[0] = '\0';
+ } else {
+ memcpy(parTbName, tbname, sizeof(parTbName));
+ }
+ pAPI->stateStore.streamStateFreeVal(tbname);
+
+ pStreamInfo = &pTaskInfo->streamInfo;
+ code = buildSinkDestTableName(parTbName, pStreamInfo->stbFullName, gid, pStreamInfo->newSubTableRule, pTableName);
+ QUERY_CHECK_CODE(code, lino, _end);
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ return code;
+}
+
+static int32_t streamNotifyFillTableName(const char* tableName, const SStreamNotifyEvent* pEvent,
+ const SStreamNotifyEvent* pResult, char** pVal) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ static const char* prefix = "{\"tableName\":\"";
+ uint64_t prefixLen = 0;
+ uint64_t nameLen = 0;
+ uint64_t eventLen = 0;
+ uint64_t resultLen = 0;
+ uint64_t valLen = 0;
+ char* val = NULL;
+ char* p = NULL;
+
+ QUERY_CHECK_NULL(tableName, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pEvent, code, lino , _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pVal, code, lino , _end, TSDB_CODE_INVALID_PARA);
+
+ *pVal = NULL;
+ prefixLen = strlen(prefix);
+ nameLen = strlen(tableName);
+ eventLen = strlen(pEvent->content);
+
+ if (pResult != NULL) {
+ resultLen = strlen(pResult->content);
+ valLen = VARSTR_HEADER_SIZE + prefixLen + nameLen + eventLen + resultLen;
+ } else {
+ valLen = VARSTR_HEADER_SIZE + prefixLen + nameLen + eventLen + 1;
+ }
+ val = taosMemoryMalloc(valLen);
+ QUERY_CHECK_NULL(val, code, lino, _end, terrno);
+ varDataSetLen(val, valLen - VARSTR_HEADER_SIZE);
+
+ p = varDataVal(val);
+ TAOS_STRNCPY(p, prefix, prefixLen);
+ p += prefixLen;
+ TAOS_STRNCPY(p, tableName, nameLen);
+ p += nameLen;
+ *(p++) = '\"';
+ TAOS_STRNCPY(p, pEvent->content, eventLen);
+ *p = ',';
+
+ if (pResult != NULL) {
+ p += eventLen - 1;
+ TAOS_STRNCPY(p, pResult->content, resultLen);
+ *p = ',';
+ }
+ *pVal = val;
+ val = NULL;
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ if (val != NULL) {
+ taosMemoryFreeClear(val);
+ }
+ return code;
+}
+
+int32_t buildNotifyEventBlock(const SExecTaskInfo* pTaskInfo, SStreamNotifyEventSupp* sup) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SColumnInfoData* pEventStrCol = NULL;
+ int32_t nWindowEvents = 0;
+ int32_t nWindowResults = 0;
+ SHashObj* pTableNameHashMap = NULL; // map groupid to the dest table name
+ SHashObj* pResultHashMap = NULL; // map groupid+skey to the window result
+ char* val = NULL;
+
+ if (pTaskInfo == NULL || sup == NULL) {
+ goto _end;
+ }
+
+ QUERY_CHECK_NULL(sup->pEventBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ blockDataCleanup(sup->pEventBlock);
+ nWindowEvents = taosArrayGetSize(sup->pWindowEvents);
+ nWindowResults = taosArrayGetSize(sup->pWindowResults);
+ if (nWindowEvents == 0) {
+ goto _end;
+ }
+
+ // build hash map from groupid to the dest table name
+ pTableNameHashMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
+ QUERY_CHECK_NULL(pTableNameHashMap, code, lino, _end, terrno);
+ for (int32_t i = 0; i < nWindowEvents; ++i) {
+ SStreamNotifyEvent* pEvent = taosArrayGet(sup->pWindowEvents, i);
+ char* tableName = taosHashGet(pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid));
+ if (tableName == NULL) {
+ code = streamNotifyGetDestTableName(pTaskInfo, pEvent->gid, &tableName);
+ QUERY_CHECK_CODE(code, lino, _end);
+ code = taosHashPut(pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid), tableName, strlen(tableName) + 1);
+ taosMemoryFreeClear(tableName);
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
+ }
+
+ // build hash map from groupid+skey to the window result
+ pResultHashMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
+ QUERY_CHECK_NULL(pResultHashMap, code, lino, _end, terrno);
+ for (int32_t i = 0; i < nWindowResults; ++i) {
+ SStreamNotifyEvent* pEvent = taosArrayGet(sup->pWindowResults, i);
+ code =
+ taosHashPut(pResultHashMap, &pEvent->gid, sizeof(pEvent->gid) + sizeof(pEvent->skey), &pEvent, sizeof(&pEvent));
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
+
+ code = blockDataEnsureCapacity(sup->pEventBlock, nWindowEvents);
+ QUERY_CHECK_CODE(code, lino, _end);
+
+ pEventStrCol = taosArrayGet(sup->pEventBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
+ QUERY_CHECK_NULL(pEventStrCol, code, lino, _end, terrno);
+
+ for (int32_t i = 0; i < nWindowEvents; ++i) {
+ SStreamNotifyEvent** ppResult = NULL;
+ SStreamNotifyEvent* pEvent = taosArrayGet(sup->pWindowEvents, i);
+ char* tableName = taosHashGet(pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid));
+ QUERY_CHECK_NULL(tableName, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
+ if (pEvent->isEnd) {
+ ppResult = taosHashGet(pResultHashMap, &pEvent->gid, sizeof(pEvent->gid) + sizeof(pEvent->skey));
+ QUERY_CHECK_NULL(ppResult, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
+ QUERY_CHECK_NULL(*ppResult, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
+ }
+ code = streamNotifyFillTableName(tableName, pEvent, ppResult ? *ppResult : NULL, &val);
+ QUERY_CHECK_CODE(code, lino, _end);
+ code = colDataSetVal(pEventStrCol, i, val, false);
+ QUERY_CHECK_CODE(code, lino, _end);
+ taosMemoryFreeClear(val);
+ sup->pEventBlock->info.rows++;
+ }
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ if (val != NULL) {
+ taosMemoryFreeClear(val);
+ }
+ if (pTableNameHashMap) {
+ taosHashCleanup(pTableNameHashMap);
+ }
+ if (pResultHashMap) {
+ taosHashCleanup(pResultHashMap);
+ }
+ if (sup != NULL) {
+ taosArrayClearEx(sup->pWindowEvents, destroyStreamWindowEvent);
+ taosArrayClearEx(sup->pWindowResults, destroyStreamWindowEvent);
+ }
+ return code;
}
diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c
index 0e98a899688c..c875cbad0581 100644
--- a/source/libs/parser/src/parAstCreater.c
+++ b/source/libs/parser/src/parAstCreater.c
@@ -3657,22 +3657,20 @@ SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptions
return pOptions;
}
-static bool validateHttpUrl(const char* url) {
- const char *httpPrefix = "http://";
- const char *httpsPrefix = "https://";
+static bool validateNotifyUrl(const char* url) {
+ const char* prefix[] = {"http://", "https://", "ws://", "wss://"};
const char* host = NULL;
if (!url || *url == '\0') return false;
- if (strncasecmp(url, httpPrefix, strlen(httpPrefix)) == 0) {
- host = url + strlen(httpPrefix);
- } else if (strncasecmp(url, httpsPrefix, strlen(httpsPrefix)) == 0) {
- host = url + strlen(httpsPrefix);
- } else {
- return false;
+ for (int32_t i = 0; i < ARRAY_SIZE(prefix); ++i) {
+ if (strncasecmp(url, prefix[i], strlen(prefix[i])) == 0) {
+ host = url + strlen(prefix[i]);
+ break;
+ }
}
- return (*host != '\0') && (*host != '/');
+ return (host != NULL) && (*host != '\0') && (*host != '/');
}
SNode* createStreamNotifyOptions(SAstCreateContext* pCxt, SNodeList* pAddrUrls, SNodeList* pEventTypes) {
@@ -3697,7 +3695,7 @@ SNode* createStreamNotifyOptions(SAstCreateContext* pCxt, SNodeList* pAddrUrls,
"notification address \"%s\" exceed maximum length %d", url, TSDB_STREAM_NOTIFY_URL_LEN);
goto _err;
}
- if (!validateHttpUrl(url)) {
+ if (!validateNotifyUrl(url)) {
pCxt->errCode = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
"invalid notification address \"%s\"", url);
goto _err;
From e4842910360f3ea01695658b9c99cf5e8735fc1f Mon Sep 17 00:00:00 2001
From: Jinqing Kuang
Date: Fri, 10 Jan 2025 08:50:26 +0800
Subject: [PATCH 4/4] feat(stream)[TS-5469]. make sink task send notify event
---
include/libs/stream/tstream.h | 1 +
source/dnode/mnode/impl/src/mndStream.c | 2 +
source/dnode/vnode/CMakeLists.txt | 1 +
source/dnode/vnode/src/inc/tq.h | 5 +
source/dnode/vnode/src/inc/vnodeInt.h | 5 +
source/dnode/vnode/src/tq/tqSink.c | 6 +
source/dnode/vnode/src/tq/tqStreamNotify.c | 355 +++++++++++++++++++
source/dnode/vnode/src/vnd/vnodeOpen.c | 10 +
source/libs/executor/src/streamexecutorInt.c | 14 +-
source/libs/stream/src/streamTask.c | 14 +-
10 files changed, 404 insertions(+), 9 deletions(-)
create mode 100644 source/dnode/vnode/src/tq/tqStreamNotify.c
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 8d44e7487b7b..1a5048ed3342 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -435,6 +435,7 @@ typedef struct SNotifyInfo {
SArray* pNotifyAddrUrls;
int32_t notifyEventTypes;
int32_t notifyErrorHandle;
+ char* streamName;
} SNotifyInfo;
struct SStreamTask {
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index b95d5e315a6f..c5f79de6c98f 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -766,6 +766,8 @@ static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, SStr
TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno);
pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes;
pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle;
+ pTask->notifyInfo.streamName = taosStrdup(createReq->name);
+ TSDB_CHECK_NULL(pTask->notifyInfo.streamName, code, lino, _end, terrno);
_end:
if (code != TSDB_CODE_SUCCESS) {
diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt
index 8f63cc87798c..b90e1844ae70 100644
--- a/source/dnode/vnode/CMakeLists.txt
+++ b/source/dnode/vnode/CMakeLists.txt
@@ -75,6 +75,7 @@ set(
"src/tq/tqSnapshot.c"
"src/tq/tqStreamStateSnap.c"
"src/tq/tqStreamTaskSnap.c"
+ "src/tq/tqStreamNotify.c"
)
aux_source_directory("src/tsdb/" TSDB_SOURCE_FILES)
diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h
index 28a0d1175761..9f294f2d8503 100644
--- a/source/dnode/vnode/src/inc/tq.h
+++ b/source/dnode/vnode/src/inc/tq.h
@@ -160,6 +160,11 @@ int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t n
SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq);
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
+// tq send notifications
+int32_t tqInitNotifyHandleMap(SStreamNotifyHandleMap** ppMap);
+void tqDestroyNotifyHandleMap(SStreamNotifyHandleMap** ppMap);
+int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* pVnode);
+
#define TQ_ERR_GO_TO_END(c) \
do { \
code = c; \
diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h
index 940116317c59..02c3b3ebe092 100644
--- a/source/dnode/vnode/src/inc/vnodeInt.h
+++ b/source/dnode/vnode/src/inc/vnodeInt.h
@@ -81,6 +81,8 @@ typedef struct SCommitInfo SCommitInfo;
typedef struct SCompactInfo SCompactInfo;
typedef struct SQueryNode SQueryNode;
+typedef struct SStreamNotifyHandleMap SStreamNotifyHandleMap;
+
#define VNODE_META_TMP_DIR "meta.tmp"
#define VNODE_META_BACKUP_DIR "meta.backup"
@@ -496,6 +498,9 @@ struct SVnode {
int64_t blockSeq;
SQHandle* pQuery;
SVMonitorObj monitor;
+
+ // Notification Handles
+ SStreamNotifyHandleMap* pNotifyHandleMap;
};
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c
index 7c085ea31a43..62dd044262e2 100644
--- a/source/dnode/vnode/src/tq/tqSink.c
+++ b/source/dnode/vnode/src/tq/tqSink.c
@@ -1150,6 +1150,12 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
return;
}
+ code = tqSendAllNotifyEvents(pBlocks, pTask, pVnode);
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("vgId: %d, s-task:%s failed to send all event notifications", vgId, id);
+ // continue processing even if notification fails
+ }
+
bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
if (!onlySubmitData || pTask->subtableWithoutMd5 == 1) {
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
diff --git a/source/dnode/vnode/src/tq/tqStreamNotify.c b/source/dnode/vnode/src/tq/tqStreamNotify.c
new file mode 100644
index 000000000000..2cdbc64e35e5
--- /dev/null
+++ b/source/dnode/vnode/src/tq/tqStreamNotify.c
@@ -0,0 +1,355 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "cmdnodes.h"
+#include "tq.h"
+
+typedef struct SStreamNotifyHandle {
+ TdThreadMutex mutex;
+ char* url;
+} SStreamNotifyHandle;
+
+struct SStreamNotifyHandleMap {
+ TdThreadMutex gMutex;
+ SHashObj *handleMap;
+};
+
+static void destroyStreamNotifyHandle(void *ptr) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SStreamNotifyHandle** ppHandle = ptr;
+
+ if (ppHandle == NULL || *ppHandle == NULL) {
+ return;
+ }
+ code = taosThreadMutexDestroy(&(*ppHandle)->mutex);
+ taosMemoryFreeClear((*ppHandle)->url);
+}
+
+static void releaseStreamNotifyHandle(SStreamNotifyHandle** ppHandle) {
+ if (ppHandle == NULL || *ppHandle == NULL) {
+ return;
+ }
+ (void)taosThreadMutexUnlock(&(*ppHandle)->mutex);
+ *ppHandle = NULL;
+}
+
+static int32_t acquireStreamNotifyHandle(SStreamNotifyHandleMap* pMap, const char* url, SStreamNotifyHandle** ppHandle) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ bool gLocked = false;
+ SStreamNotifyHandle** ppFindHandle = NULL;
+ SStreamNotifyHandle* pNewHandle = NULL;
+
+ TSDB_CHECK_NULL(pMap, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ TSDB_CHECK_NULL(url, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ TSDB_CHECK_NULL(ppHandle, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ *ppHandle = NULL;
+
+ code = taosThreadMutexLock(&pMap->gMutex);
+ TSDB_CHECK_CODE(code, lino, _end);
+ gLocked = true;
+
+ ppFindHandle = taosHashGet(pMap->handleMap, url, strlen(url));
+ if (ppFindHandle == NULL) {
+ pNewHandle = taosMemoryCalloc(1, sizeof(SStreamNotifyHandle));
+ TSDB_CHECK_NULL(pNewHandle, code, lino, _end, terrno);
+ code = taosThreadMutexInit(&pNewHandle->mutex, NULL);
+ TSDB_CHECK_CODE(code, lino, _end);
+ code = taosHashPut(pMap->handleMap, url, strlen(url), &pNewHandle, POINTER_BYTES);
+ TSDB_CHECK_CODE(code, lino, _end);
+ *ppHandle = pNewHandle;
+ pNewHandle = NULL;
+ } else {
+ *ppHandle = *ppFindHandle;
+ }
+
+ code = taosThreadMutexLock(&(*ppHandle)->mutex);
+ TSDB_CHECK_CODE(code, lino, _end);
+
+ (void)taosThreadMutexUnlock(&pMap->gMutex);
+ gLocked = false;
+
+ if ((*ppHandle)->url == NULL) {
+ (*ppHandle)->url = taosStrdup(url);
+ TSDB_CHECK_NULL((*ppHandle)->url, code, lino, _end, terrno);
+ }
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ if (*ppHandle) {
+ releaseStreamNotifyHandle(ppHandle);
+ }
+ }
+ if (pNewHandle) {
+ destroyStreamNotifyHandle(&pNewHandle);
+ }
+ if (gLocked) {
+ (void)taosThreadMutexUnlock(&pMap->gMutex);
+ }
+ return code;
+}
+
+int32_t tqInitNotifyHandleMap(SStreamNotifyHandleMap** ppMap) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SStreamNotifyHandleMap* pMap = NULL;
+
+ TSDB_CHECK_NULL(ppMap, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ *ppMap = NULL;
+ pMap = taosMemoryCalloc(1, sizeof(SStreamNotifyHandleMap));
+ TSDB_CHECK_NULL(pMap, code, lino, _end, terrno);
+ code = taosThreadMutexInit(&pMap->gMutex, NULL);
+ TSDB_CHECK_CODE(code, lino, _end);
+ pMap->handleMap = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
+ TSDB_CHECK_NULL(pMap->handleMap, code, lino, _end, terrno);
+ taosHashSetFreeFp(pMap->handleMap, destroyStreamNotifyHandle);
+ *ppMap = pMap;
+ pMap = NULL;
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ if (pMap != NULL) {
+ tqDestroyNotifyHandleMap(&pMap);
+ }
+ return code;
+}
+
+void tqDestroyNotifyHandleMap(SStreamNotifyHandleMap** ppMap) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+
+ if (*ppMap == NULL) {
+ return;
+ }
+ taosHashCleanup((*ppMap)->handleMap);
+ code = taosThreadMutexDestroy(&(*ppMap)->gMutex);
+ taosMemoryFreeClear((*ppMap));
+}
+
+#define JSON_CHECK_ADD_ITEM(obj, str, item) \
+ TSDB_CHECK_CONDITION(cJSON_AddItemToObjectCS(obj, str, item), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
+
+static int32_t getStreamNotifyEventHeader(const char* streamName, char** pHeader) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ cJSON* obj = NULL;
+ cJSON* streams = NULL;
+ cJSON* stream = NULL;
+ char msgId[37];
+
+ TSDB_CHECK_NULL(streamName, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ TSDB_CHECK_NULL(pHeader, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ *pHeader = NULL;
+
+ code = taosGetSystemUUIDLimit36(msgId, sizeof(msgId));
+ TSDB_CHECK_CODE(code, lino, _end);
+
+ stream = cJSON_CreateObject();
+ TSDB_CHECK_NULL(stream, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+ JSON_CHECK_ADD_ITEM(stream, "streamName", cJSON_CreateStringReference(streamName));
+ JSON_CHECK_ADD_ITEM(stream, "events", cJSON_CreateArray());
+
+ streams = cJSON_CreateArray();
+ TSDB_CHECK_CONDITION(cJSON_AddItemToArray(streams, stream), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
+ stream = NULL;
+
+ obj = cJSON_CreateObject();
+ TSDB_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+ JSON_CHECK_ADD_ITEM(obj, "messageId", cJSON_CreateStringReference(msgId));
+ JSON_CHECK_ADD_ITEM(obj, "timestamp", cJSON_CreateNumber(taosGetTimestampMs()));
+ JSON_CHECK_ADD_ITEM(obj, "streams", streams);
+ streams = NULL;
+
+ *pHeader = cJSON_PrintUnformatted(obj);
+ TSDB_CHECK_NULL(*pHeader, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ if (stream != NULL) {
+ cJSON_Delete(stream);
+ }
+ if (streams != NULL) {
+ cJSON_Delete(streams);
+ }
+ if (obj != NULL) {
+ cJSON_Delete(obj);
+ }
+ return code;
+}
+
+static int32_t packupStreamNotifyEvent(const char* streamName, const SArray* pBlocks, char** pMsg) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ int32_t numOfBlocks = 0;
+ int32_t msgHeaderLen = 0;
+ int32_t msgTailLen = 0;
+ int32_t msgLen = 0;
+ char* msgHeader = NULL;
+ const char* msgTail = "]}]}";
+ char* msg = NULL;
+
+ TSDB_CHECK_NULL(pMsg, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ *pMsg = NULL;
+ numOfBlocks = taosArrayGetSize(pBlocks);
+
+ for (int32_t i = 0; i < numOfBlocks; ++i) {
+ SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
+ if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) {
+ continue;
+ }
+
+ SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
+ for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
+ char* val = colDataGetVarData(pEventStrCol, j);
+ msgLen += varDataLen(val) + 1;
+ }
+ }
+
+ if (msgLen == 0) {
+ // skip since no notification events found
+ goto _end;
+ }
+
+ code = getStreamNotifyEventHeader(streamName, &msgHeader);
+ TSDB_CHECK_CODE(code, lino, _end);
+ msgHeaderLen = strlen(msgHeader);
+ msgTailLen = strlen(msgTail);
+ msgLen += msgHeaderLen;
+
+ msg = taosMemoryMalloc(msgLen);
+ TSDB_CHECK_NULL(msg, code, lino, _end, terrno);
+ char* p = msg;
+ TAOS_STRNCPY(p, msgHeader, msgHeaderLen);
+ p += msgHeaderLen - msgTailLen;
+
+ for (int32_t i = 0; i < numOfBlocks; ++i) {
+ SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
+ if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) {
+ continue;
+ }
+
+ SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
+ for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
+ char* val = colDataGetVarData(pEventStrCol, j);
+ TAOS_STRNCPY(p, varDataVal(val), varDataLen(val));
+ p += varDataLen(val);
+ *(p++) = ',';
+ }
+ }
+
+ p -= 1;
+ TAOS_STRNCPY(p, msgTail, msgTailLen);
+ *(p + msgTailLen) = '\0';
+
+ *pMsg = msg;
+ msg = NULL;
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ if (msgHeader != NULL) {
+ cJSON_free(msgHeader);
+ }
+ if (msg != NULL) {
+ taosMemoryFreeClear(msg);
+ }
+ return code;
+}
+
+static int32_t sendSingleStreamNotify(SStreamNotifyHandle *pHandle, const char *msg) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+
+ TSDB_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ tqWarn("%s <= %s", pHandle->url, msg);
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ return code;
+}
+
+int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* pVnode) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ char* msg = NULL;
+ int32_t nNotifyAddr = 0;
+ SStreamNotifyHandle* pHandle = NULL;
+
+ TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ TSDB_CHECK_NULL(pVnode, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ nNotifyAddr = taosArrayGetSize(pTask->notifyInfo.pNotifyAddrUrls);
+ if (nNotifyAddr == 0) {
+ goto _end;
+ }
+
+ code = packupStreamNotifyEvent(pTask->notifyInfo.streamName, pBlocks, &msg);
+ TSDB_CHECK_CODE(code, lino, _end);
+ if (msg == NULL) {
+ goto _end;
+ }
+
+ for (int32_t i = 0; i < nNotifyAddr; ++i) {
+ const char * url = taosArrayGetP(pTask->notifyInfo.pNotifyAddrUrls, i);
+ code = acquireStreamNotifyHandle(pVnode->pNotifyHandleMap, url, &pHandle);
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("failed to get stream notify handle of %s", url);
+ if (pTask->notifyInfo.notifyErrorHandle == SNOTIFY_ERROR_HANDLE_PAUSE) {
+ // retry for event message sending in PAUSE error handling mode
+ --i;
+ continue;
+ } else {
+ // simply ignore the failure in DROP error handling mode
+ code = TSDB_CODE_SUCCESS;
+ continue;
+ }
+ }
+ code = sendSingleStreamNotify(pHandle, msg);
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("failed to send stream notify handle to %s since %s", url, tstrerror(code));
+ if (pTask->notifyInfo.notifyErrorHandle == SNOTIFY_ERROR_HANDLE_PAUSE) {
+ // retry for event message sending in PAUSE error handling mode
+ --i;
+ } else {
+ // simply ignore the failure in DROP error handling mode
+ code = TSDB_CODE_SUCCESS;
+ }
+ }
+ releaseStreamNotifyHandle(&pHandle);
+ }
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ if (msg) {
+ taosMemoryFreeClear(msg);
+ }
+ return code;
+}
diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c
index 6de5298728ca..280ee527f751 100644
--- a/source/dnode/vnode/src/vnd/vnodeOpen.c
+++ b/source/dnode/vnode/src/vnd/vnodeOpen.c
@@ -15,6 +15,7 @@
#include "sync.h"
#include "tcs.h"
+#include "tq.h"
#include "tsdb.h"
#include "vnd.h"
@@ -483,6 +484,14 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
ret = taosRealPath(tdir, NULL, sizeof(tdir));
TAOS_UNUSED(ret);
+ // init handle map for stream event notification
+ ret = tqInitNotifyHandleMap(&pVnode->pNotifyHandleMap);
+ if (ret != TSDB_CODE_SUCCESS) {
+ vError("vgId:%d, failed to init StreamNotifyHandleMap", TD_VID(pVnode));
+ terrno = ret;
+ goto _err;
+ }
+
// open query
vInfo("vgId:%d, start to open vnode query", TD_VID(pVnode));
if (vnodeQueryOpen(pVnode)) {
@@ -555,6 +564,7 @@ void vnodeClose(SVnode *pVnode) {
vnodeAWait(&pVnode->commitTask);
vnodeSyncClose(pVnode);
vnodeQueryClose(pVnode);
+ tqDestroyNotifyHandleMap(&pVnode->pNotifyHandleMap);
tqClose(pVnode->pTq);
walClose(pVnode->pWal);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
diff --git a/source/libs/executor/src/streamexecutorInt.c b/source/libs/executor/src/streamexecutorInt.c
index 85cbdb08014b..f50e10b357eb 100644
--- a/source/libs/executor/src/streamexecutorInt.c
+++ b/source/libs/executor/src/streamexecutorInt.c
@@ -45,7 +45,7 @@ static void destroyStreamWindowEvent(void* ptr) {
cJSON_free(pEvent->content);
}
-static void destroyStreamWindowEventSupp(SStreamNotifyEventSupp* sup) {
+static void destroyStreamNotifyEventSupp(SStreamNotifyEventSupp* sup) {
if (sup == NULL) return;
taosArrayDestroyEx(sup->pWindowEvents, destroyStreamWindowEvent);
taosArrayDestroyEx(sup->pWindowResults, destroyStreamWindowEvent);
@@ -53,7 +53,7 @@ static void destroyStreamWindowEventSupp(SStreamNotifyEventSupp* sup) {
*sup = (SStreamNotifyEventSupp){0};
}
-static int32_t initStreamWindowEventSupp(SStreamNotifyEventSupp *sup) {
+static int32_t initStreamNotifyEventSupp(SStreamNotifyEventSupp *sup) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSDataBlock* pBlock = NULL;
@@ -85,7 +85,7 @@ static int32_t initStreamWindowEventSupp(SStreamNotifyEventSupp *sup) {
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
if (sup) {
- destroyStreamWindowEventSupp(sup);
+ destroyStreamNotifyEventSupp(sup);
}
}
if (pBlock != NULL) {
@@ -97,11 +97,11 @@ static int32_t initStreamWindowEventSupp(SStreamNotifyEventSupp *sup) {
int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
pBasicInfo->primaryPkIndex = -1;
pBasicInfo->updateOperatorInfo = false;
- return initStreamWindowEventSupp(&pBasicInfo->windowEventSup);
+ return initStreamNotifyEventSupp(&pBasicInfo->windowEventSup);
}
void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
- destroyStreamWindowEventSupp(&pBasicInfo->windowEventSup);
+ destroyStreamNotifyEventSupp(&pBasicInfo->windowEventSup);
}
// Fast uint64_t to string conversion, equivalent to sprintf(buf, "%lu", val) but with 10x better performance.
@@ -123,7 +123,7 @@ static char* u64toaFastLut(uint64_t val, char* buf) {
strncpy(p, lut + val * 2, 2);
p += 2;
} else if (val > 0 || p == temp) {
- *p++ = val + '0';
+ *(p++) = val + '0';
}
while (p != temp) {
@@ -141,7 +141,7 @@ static void streamNotifyGetEventWindowId(const SSessionKey* pSessionKey, char *b
// The windowId is generated by computing a hash value using the concatenation of group ID and window start time, in
// the format: "_".
p = u64toaFastLut(pSessionKey->groupId, p);
- *p++ = '_';
+ *(p++) = '_';
p = u64toaFastLut(pSessionKey->win.skey, p);
hash = MurmurHash3_64(buf, p - buf);
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index 4c493dfdffa4..bde1fd45fa53 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -323,6 +323,7 @@ void tFreeStreamTask(void* pParam) {
pTask->chkInfo.pActiveInfo = NULL;
taosArrayDestroyP(pTask->notifyInfo.pNotifyAddrUrls, NULL);
+ taosMemoryFreeClear(pTask->notifyInfo.streamName);
taosMemoryFree(pTask);
stDebug("s-task:0x%x free task completed", taskId);
@@ -1323,6 +1324,7 @@ static int32_t tEncodeStreamNotifyInfo(SEncoder* pEncoder, const SNotifyInfo* in
}
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyEventTypes));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyErrorHandle));
+ TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->streamName));
_exit:
if (code != TSDB_CODE_SUCCESS) {
@@ -1354,6 +1356,10 @@ static int32_t tDecodeStreamNotifyInfo(SDecoder* pDecoder, SNotifyInfo* info) {
}
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyEventTypes));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyErrorHandle));
+ char *streamName = NULL;
+ TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &streamName));
+ info->streamName = taosStrndup(streamName, TSDB_STREAM_FNAME_LEN + 1);
+ QUERY_CHECK_NULL(info->streamName, code, lino, _exit, terrno);
_exit:
if (code != TSDB_CODE_SUCCESS) {
@@ -1431,7 +1437,9 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
- TAOS_CHECK_EXIT(tEncodeStreamNotifyInfo(pEncoder, &pTask->notifyInfo));
+ if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
+ TAOS_CHECK_EXIT(tEncodeStreamNotifyInfo(pEncoder, &pTask->notifyInfo));
+ }
tEndEncode(pEncoder);
_exit:
@@ -1531,7 +1539,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
}
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
- TAOS_CHECK_EXIT(tDecodeStreamNotifyInfo(pDecoder, &pTask->notifyInfo));
+ if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
+ TAOS_CHECK_EXIT(tDecodeStreamNotifyInfo(pDecoder, &pTask->notifyInfo));
+ }
tEndDecode(pDecoder);