diff --git a/cmake/addr2line_CMakeLists.txt.in b/cmake/addr2line_CMakeLists.txt.in index 93fb9bb96c8f..7cfcb4671888 100644 --- a/cmake/addr2line_CMakeLists.txt.in +++ b/cmake/addr2line_CMakeLists.txt.in @@ -2,7 +2,7 @@ # addr2line ExternalProject_Add(addr2line GIT_REPOSITORY https://github.com/davea42/libdwarf-addr2line.git - GIT_TAG master + GIT_TAG main SOURCE_DIR "${TD_CONTRIB_DIR}/addr2line" BINARY_DIR "${TD_CONTRIB_DIR}/addr2line" CONFIGURE_COMMAND "" diff --git a/cmake/curl_CMakeLists.txt.in b/cmake/curl_CMakeLists.txt.in index 6494177fafa2..2a14018810c7 100644 --- a/cmake/curl_CMakeLists.txt.in +++ b/cmake/curl_CMakeLists.txt.in @@ -12,7 +12,7 @@ ExternalProject_Add(curl2 BUILD_IN_SOURCE TRUE BUILD_ALWAYS 1 UPDATE_COMMAND "" - CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.2 --with-ssl=$ENV{HOME}/.cos-local.2 --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd --without-libidn2 --without-nghttp2 --without-libpsl #--enable-debug + CONFIGURE_COMMAND ${CONTRIB_CONFIG_ENV} ./configure --prefix=$ENV{HOME}/.cos-local.2 --with-ssl=$ENV{HOME}/.cos-local.2 --enable-websockets --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd --without-libidn2 --without-nghttp2 --without-libpsl #--enable-debug BUILD_COMMAND make -j INSTALL_COMMAND make install TEST_COMMAND "" diff --git a/cmake/ssl_CMakeLists.txt.in b/cmake/ssl_CMakeLists.txt.in index 1098593943ff..81e1cb15e937 100644 --- a/cmake/ssl_CMakeLists.txt.in +++ b/cmake/ssl_CMakeLists.txt.in @@ -6,9 +6,9 @@ ExternalProject_Add(openssl DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download" SOURCE_DIR "${TD_CONTRIB_DIR}/openssl" BUILD_IN_SOURCE TRUE - #BUILD_ALWAYS 1 - #UPDATE_COMMAND "" - CONFIGURE_COMMAND ./Configure --prefix=$ENV{HOME}/.cos-local.2 no-shared + BUILD_ALWAYS 1 + UPDATE_COMMAND "" + CONFIGURE_COMMAND ${CONTRIB_CONFIG_ENV} ./Configure --prefix=$ENV{HOME}/.cos-local.2 no-shared BUILD_COMMAND make -j INSTALL_COMMAND make install_sw -j TEST_COMMAND "" diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 2304ad54aa95..767df03d223e 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -17,7 +17,6 @@ elseif(${BUILD_WITH_COS}) file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.1/) cat("${TD_SUPPORT_DIR}/mxml_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3}) cat("${TD_SUPPORT_DIR}/apr_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3}) - cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3}) endif(${BUILD_WITH_COS}) configure_file(${CONTRIB_TMP_FILE3} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt") @@ -146,11 +145,16 @@ if(${BUILD_WITH_SQLITE}) cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) endif(${BUILD_WITH_SQLITE}) +# libcurl +if(NOT ${TD_WINDOWS}) + file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.2/) + cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) + cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) +endif(NOT ${TD_WINDOWS}) + # s3 if(${BUILD_WITH_S3}) - cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/xml2_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) - cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/libs3_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/azure_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) add_definitions(-DUSE_S3) @@ -160,7 +164,6 @@ elseif(${BUILD_WITH_COS}) # cat("${TD_SUPPORT_DIR}/mxml_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) # cat("${TD_SUPPORT_DIR}/apr_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) # cat("${TD_SUPPORT_DIR}/apr-util_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) - # cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/cos_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) add_definitions(-DUSE_COS) endif() @@ -199,6 +202,11 @@ endif() # lemon cat("${TD_SUPPORT_DIR}/lemon_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) +# Force specify CC=cc on MacOS. Because the default CC setting in the generated Makefile has issues finding standard library headers +IF(${TD_DARWIN}) + SET(CONTRIB_CONFIG_ENV "CC=cc") +ENDIF() + # download dependencies configure_file(${CONTRIB_TMP_FILE} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt") execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 04507665353e..c30f2ab4ecd3 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -160,6 +160,7 @@ typedef enum EStreamType { STREAM_PARTITION_DELETE_DATA, STREAM_GET_RESULT, STREAM_DROP_CHILD_TABLE, + STREAM_NOTIFY_EVENT, } EStreamType; #pragma pack(push, 1) @@ -408,6 +409,9 @@ typedef struct STUidTagInfo { #define UD_GROUPID_COLUMN_INDEX 1 #define UD_TAG_COLUMN_INDEX 2 +// stream notify event block column +#define NOTIFY_EVENT_STR_COLUMN_INDEX 0 + int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime); int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol); diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 1103b89ccb04..96478047ca59 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -285,6 +285,8 @@ bool isAutoTableName(char* ctbName); int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId, size_t cap); int32_t buildCtbNameByGroupId(const char* stbName, uint64_t groupId, char** pName); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); +int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule, + char** dstTableName); int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index aebe09b56344..82eaa2359edf 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -269,6 +269,7 @@ typedef enum ENodeType { QUERY_NODE_TSMA_OPTIONS, QUERY_NODE_ANOMALY_WINDOW, QUERY_NODE_RANGE_AROUND, + QUERY_NODE_STREAM_NOTIFY_OPTIONS, // Statement nodes are used in parser and planner module. QUERY_NODE_SET_OPERATOR = 100, @@ -2956,6 +2957,11 @@ typedef struct { // 3.3.0.0 SArray* pCols; // array of SField int64_t smaId; + // 3.3.6.0 + SArray* pNotifyAddrUrls; + int32_t notifyEventTypes; + int32_t notifyErrorHandle; + int8_t notifyHistory; } SCMCreateStreamReq; typedef struct { diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 883c5f7b99cc..9a7c3912b0d5 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -98,6 +98,9 @@ int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); +int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper, + const char* stbFullName, bool newSubTableRule); + /** * Set multiple input data blocks for the stream scan. * @param tinfo diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 12d77bd0c24b..26482a87d4c7 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -566,19 +566,44 @@ typedef struct SStreamOptions { int64_t setFlag; } SStreamOptions; +typedef enum EStreamNotifyOptionSetFlag { + SNOTIFY_OPT_ERROR_HANDLE_SET = BIT_FLAG_MASK(0), + SNOTIFY_OPT_NOTIFY_HISTORY_SET = BIT_FLAG_MASK(1), +} EStreamNotifyOptionSetFlag; + +typedef enum EStreamNotifyEventType { + SNOTIFY_EVENT_WINDOW_OPEN = BIT_FLAG_MASK(0), + SNOTIFY_EVENT_WINDOW_CLOSE = BIT_FLAG_MASK(1), +} EStreamNotifyEventType; + +typedef enum EStreamNotifyErrorHandleType { + SNOTIFY_ERROR_HANDLE_PAUSE, + SNOTIFY_ERROR_HANDLE_DROP, +} EStreamNotifyErrorHandleType; + +typedef struct SStreamNotifyOptions { + ENodeType type; + SNodeList* pAddrUrls; + EStreamNotifyEventType eventTypes; + EStreamNotifyErrorHandleType errorHandle; + bool notifyHistory; + EStreamNotifyOptionSetFlag setFlag; +} SStreamNotifyOptions; + typedef struct SCreateStreamStmt { - ENodeType type; - char streamName[TSDB_TABLE_NAME_LEN]; - char targetDbName[TSDB_DB_NAME_LEN]; - char targetTabName[TSDB_TABLE_NAME_LEN]; - bool ignoreExists; - SStreamOptions* pOptions; - SNode* pQuery; - SNode* pPrevQuery; - SNodeList* pTags; - SNode* pSubtable; - SNodeList* pCols; - SCMCreateStreamReq* pReq; + ENodeType type; + char streamName[TSDB_TABLE_NAME_LEN]; + char targetDbName[TSDB_DB_NAME_LEN]; + char targetTabName[TSDB_TABLE_NAME_LEN]; + bool ignoreExists; + SStreamOptions* pOptions; + SNode* pQuery; + SNode* pPrevQuery; + SNodeList* pTags; + SNode* pSubtable; + SNodeList* pCols; + SStreamNotifyOptions* pNotifyOptions; + SCMCreateStreamReq* pReq; } SCreateStreamStmt; typedef struct SDropStreamStmt { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a4d89dcdcc84..9cd6dd13ca4a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -65,10 +65,14 @@ typedef struct SStreamTaskSM SStreamTaskSM; typedef struct SStreamQueueItem SStreamQueueItem; typedef struct SActiveCheckpointInfo SActiveCheckpointInfo; -#define SSTREAM_TASK_VER 4 -#define SSTREAM_TASK_INCOMPATIBLE_VER 1 -#define SSTREAM_TASK_NEED_CONVERT_VER 2 -#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 +#define SSTREAM_TASK_VER 5 +#define SSTREAM_TASK_INCOMPATIBLE_VER 1 +#define SSTREAM_TASK_NEED_CONVERT_VER 2 +#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 // Append subtable name with groupId +#define SSTREAM_TASK_APPEND_STABLE_NAME_VER 4 // Append subtable name with stableName and groupId +#define SSTREAM_TASK_ADD_NOTIFY_VER 5 // Support event notification at window open/close + +#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1)) extern int32_t streamMetaRefPool; extern int32_t streamTaskRefPool; @@ -427,6 +431,15 @@ typedef struct STaskCheckInfo { TdThreadMutex checkInfoLock; } STaskCheckInfo; +typedef struct SNotifyInfo { + SArray* pNotifyAddrUrls; + int32_t notifyEventTypes; + int32_t notifyErrorHandle; + char* streamName; + char* stbFullName; + SSchemaWrapper* pSchemaWrapper; +} SNotifyInfo; + struct SStreamTask { int64_t ver; SStreamTaskId id; @@ -449,6 +462,7 @@ struct SStreamTask { SStreamState* pState; // state backend SUpstreamInfo upstreamInfo; STaskCheckInfo taskCheckInfo; + SNotifyInfo notifyInfo; // the followings attributes don't be serialized SScanhistorySchedInfo schedHistoryInfo; diff --git a/include/util/tdef.h b/include/util/tdef.h index 0fa00bf1d2c2..f08697b0d424 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -245,6 +245,7 @@ typedef enum ELogicConditionType { #define TSDB_OFFSET_LEN 64 // it is a null-terminated string #define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string #define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string +#define TSDB_STREAM_NOTIFY_URL_LEN 128 // it includes the terminating '\0' #define TSDB_DB_NAME_LEN 65 #define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_PRIVILEDGE_CONDITION_LEN 48 * 1024 diff --git a/include/util/tlog.h b/include/util/tlog.h index f573d61e7359..60ddc292887e 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -79,6 +79,9 @@ void taosResetLog(); void taosDumpData(uint8_t *msg, int32_t len); void taosSetNoNewFile(); +// Fast uint64_t to string conversion, equivalent to sprintf(buf, "%lu", val) but with 10x better performance. +char *u64toaFastLut(uint64_t val, char *buf); + void taosPrintLog(const char *flags, int32_t level, int32_t dflag, const char *format, ...) #ifdef __GNUC__ __attribute__((format(printf, 4, 5))) diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index e050eaa16dfa..8dccdaa0167d 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -54,6 +54,23 @@ target_link_libraries( INTERFACE api ) +if(NOT ${TD_WINDOWS}) + target_include_directories( + common + PUBLIC "$ENV{HOME}/.cos-local.2/include" + ) + + find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) + find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) + find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) + target_link_libraries( + common + PUBLIC ${CURL_LIBRARY} + PUBLIC ${SSL_LIBRARY} + PUBLIC ${CRYPTO_LIBRARY} + ) +endif() + if(${BUILD_S3}) if(${BUILD_WITH_S3}) target_include_directories( @@ -65,9 +82,6 @@ if(${BUILD_S3}) set(CMAKE_FIND_LIBRARY_SUFFIXES ".a") set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2) find_library(S3_LIBRARY s3) - find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) - find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) - find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) target_link_libraries( common @@ -87,7 +101,6 @@ if(${BUILD_S3}) find_library(APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/) find_library(APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/) find_library(MINIXML_LIBRARY mxml) - find_library(CURL_LIBRARY curl) target_link_libraries( common diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index a3989012f642..7a51669d463b 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -9959,6 +9959,16 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS } TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->smaId)); + + int32_t addrSize = taosArrayGetSize(pReq->pNotifyAddrUrls); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, addrSize)); + for (int32_t i = 0; i < addrSize; ++i) { + const char *url = taosArrayGetP(pReq->pNotifyAddrUrls, i); + TAOS_CHECK_EXIT((tEncodeCStr(&encoder, url))); + } + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->notifyEventTypes)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->notifyErrorHandle)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->notifyHistory)); tEndEncode(&encoder); _exit: @@ -10093,6 +10103,30 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->smaId)); } + if (!tDecodeIsEnd(&decoder)) { + int32_t addrSize = 0; + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &addrSize)); + pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES); + if (pReq->pNotifyAddrUrls == NULL) { + TAOS_CHECK_EXIT(terrno); + } + for (int32_t i = 0; i < addrSize; ++i) { + char *url = NULL; + TAOS_CHECK_EXIT(tDecodeCStr(&decoder, &url)); + url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN); + if (url == NULL) { + TAOS_CHECK_EXIT(terrno); + } + if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) { + taosMemoryFree(url); + TAOS_CHECK_EXIT(terrno); + } + } + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->notifyEventTypes)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->notifyErrorHandle)); + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->notifyHistory)); + } + tEndDecode(&decoder); _exit: tDecoderClear(&decoder); @@ -10155,6 +10189,7 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { taosArrayDestroy(pReq->fillNullCols); taosArrayDestroy(pReq->pVgroupVerList); taosArrayDestroy(pReq->pCols); + taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL); } int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index bd18c9ceb94b..c3e0fff57849 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3061,6 +3061,33 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha return code; } +int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule, + char** dstTableName) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + if (parTbName[0]) { + if (newSubTableRule && !isAutoTableName(parTbName) && !alreadyAddGroupId(parTbName, gid) && gid != 0 && + stbFullName) { + *dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); + TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno); + + tstrncpy(*dstTableName, parTbName, TSDB_TABLE_NAME_LEN); + code = buildCtbNameAddGroupId(stbFullName, *dstTableName, gid, TSDB_TABLE_NAME_LEN); + TSDB_CHECK_CODE(code, lino, _end); + } else { + *dstTableName = taosStrdup(parTbName); + TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno); + } + } else { + code = buildCtbNameByGroupId(stbFullName, gid, dstTableName); + TSDB_CHECK_CODE(code, lino, _end); + } + +_end: + return code; +} + // return length of encoded data, return -1 if failed int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) { int32_t code = blockDataCheck(pBlock); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 50018e867f58..c1cf41103b76 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -753,6 +753,77 @@ static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) { return TSDB_CODE_SUCCESS; } +static void *notifyAddrDup(void *p) { return taosStrdup((char *)p); } + +static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, const SStreamObj *pStream, + SStreamTask *pTask) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA); + + pTask->notifyInfo.pNotifyAddrUrls = taosArrayDup(createReq->pNotifyAddrUrls, notifyAddrDup); + TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno); + pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes; + pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle; + pTask->notifyInfo.streamName = taosStrdup(createReq->name); + TSDB_CHECK_NULL(pTask->notifyInfo.streamName, code, lino, _end, terrno); + pTask->notifyInfo.stbFullName = taosStrdup(createReq->targetStbFullName); + TSDB_CHECK_NULL(pTask->notifyInfo.stbFullName, code, lino, _end, terrno); + pTask->notifyInfo.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); + TSDB_CHECK_NULL(pTask->notifyInfo.pSchemaWrapper, code, lino, _end, terrno); + +_end: + if (code != TSDB_CODE_SUCCESS) { + mError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t addStreamNotifyInfo(SCMCreateStreamReq *createReq, SStreamObj *pStream) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int32_t level = 0; + int32_t nTasks = 0; + SArray *pLevel = NULL; + + TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pStream, code, lino, _end, TSDB_CODE_INVALID_PARA); + + if (taosArrayGetSize(createReq->pNotifyAddrUrls) == 0) { + goto _end; + } + + level = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < level; ++i) { + pLevel = taosArrayGetP(pStream->tasks, i); + nTasks = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < nTasks; ++j) { + code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j)); + TSDB_CHECK_CODE(code, lino, _end); + } + } + + if (pStream->conf.fillHistory && createReq->notifyHistory) { + level = taosArrayGetSize(pStream->pHTasksList); + for (int32_t i = 0; i < level; ++i) { + pLevel = taosArrayGetP(pStream->pHTasksList, i); + nTasks = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < nTasks; ++j) { + code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j)); + TSDB_CHECK_CODE(code, lino, _end); + } + } + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + mError("%s for stream %s failed at line %d since %s", __func__, pStream->name, lino, tstrerror(code)); + } + return code; +} + static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; @@ -850,6 +921,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } + // add notify info into all stream tasks + code = addStreamNotifyInfo(&createReq, &streamObj); + if (code != TSDB_CODE_SUCCESS) { + mError("stream:%s failed to add stream notify info since %s", createReq.name, tstrerror(code)); + mndTransDrop(pTrans); + goto _OVER; + } + // add stream to trans code = mndPersistStream(pTrans, &streamObj); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 8f63cc87798c..b90e1844ae70 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -75,6 +75,7 @@ set( "src/tq/tqSnapshot.c" "src/tq/tqStreamStateSnap.c" "src/tq/tqStreamTaskSnap.c" + "src/tq/tqStreamNotify.c" ) aux_source_directory("src/tsdb/" TSDB_SOURCE_FILES) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 12a803d1d82c..e0bf51b333a2 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -159,6 +159,11 @@ int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t n SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq); int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type); +// tq send notifications +int32_t tqInitNotifyHandleMap(SStreamNotifyHandleMap** ppMap); +void tqDestroyNotifyHandleMap(SStreamNotifyHandleMap** ppMap); +int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* pVnode); + #define TQ_ERR_GO_TO_END(c) \ do { \ code = c; \ diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 940116317c59..02c3b3ebe092 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -81,6 +81,8 @@ typedef struct SCommitInfo SCommitInfo; typedef struct SCompactInfo SCompactInfo; typedef struct SQueryNode SQueryNode; +typedef struct SStreamNotifyHandleMap SStreamNotifyHandleMap; + #define VNODE_META_TMP_DIR "meta.tmp" #define VNODE_META_BACKUP_DIR "meta.backup" @@ -496,6 +498,9 @@ struct SVnode { int64_t blockSeq; SQHandle* pQuery; SVMonitorObj monitor; + + // Notification Handles + SStreamNotifyHandleMap* pNotifyHandleMap; }; #define TD_VID(PVNODE) ((PVNODE)->config.vgId) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 7ba77cf8135c..98ea92125c8d 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -16,8 +16,6 @@ #include "tcommon.h" #include "tq.h" -#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1)) - typedef struct STableSinkInfo { uint64_t uid; tstr name; @@ -983,7 +981,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName); if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) { code = buildCtbNameAddGroupId(NULL, dstTableName, groupId, sizeof(pDataBlock->info.parTbName)); - } else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) { + } else if (pTask->ver >= SSTREAM_TASK_APPEND_STABLE_NAME_VER && stbFullName) { code = buildCtbNameAddGroupId(stbFullName, dstTableName, groupId, sizeof(pDataBlock->info.parTbName)); } if (code != TSDB_CODE_SUCCESS) { @@ -1150,6 +1148,12 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { return; } + code = tqSendAllNotifyEvents(pBlocks, pTask, pVnode); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId: %d, s-task:%s failed to send all event notifications", vgId, id); + // continue processing even if notification fails + } + bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks); if (!onlySubmitData || pTask->subtableWithoutMd5 == 1) { tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id, @@ -1173,6 +1177,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { continue; } else if (pDataBlock->info.type == STREAM_DROP_CHILD_TABLE && pTask->subtableWithoutMd5) { code = doBuildAndSendDropTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid); + } else if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) { + continue; } else { code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs); } @@ -1317,6 +1323,10 @@ void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVno continue; } + if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) { + continue; + } + hasSubmit = true; pTask->execInfo.sink.numOfBlocks += 1; uint64_t groupId = pDataBlock->info.id.groupId; diff --git a/source/dnode/vnode/src/tq/tqStreamNotify.c b/source/dnode/vnode/src/tq/tqStreamNotify.c new file mode 100644 index 000000000000..46ee95d3b9b9 --- /dev/null +++ b/source/dnode/vnode/src/tq/tqStreamNotify.c @@ -0,0 +1,445 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "cmdnodes.h" +#include "tq.h" + +#ifndef WINDOWS +#include "curl/curl.h" +#endif + +#define STREAM_EVENT_NOTIFY_RETRY_MS 50 // 50ms + +typedef struct SStreamNotifyHandle { + TdThreadMutex mutex; +#ifndef WINDOWS + CURL* curl; +#endif + char* url; +} SStreamNotifyHandle; + +struct SStreamNotifyHandleMap { + TdThreadMutex gMutex; + SHashObj* handleMap; +}; + +static void stopStreamNotifyConn(SStreamNotifyHandle* pHandle) { +#ifndef WINDOWS + if (pHandle == NULL || pHandle->curl == NULL) { + return; + } + // status code 1000 means normal closure + size_t len = 0; + uint16_t status = htons(1000); + CURLcode res = curl_ws_send(pHandle->curl, &status, sizeof(status), &len, 0, CURLWS_CLOSE); + if (res != CURLE_OK) { + tqWarn("failed to send ws-close msg to %s for %d", pHandle->url ? pHandle->url : "", res); + } + // TODO: add wait mechanism for peer connection close response + curl_easy_cleanup(pHandle->curl); +#endif +} + +static void destroyStreamNotifyHandle(void* ptr) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamNotifyHandle** ppHandle = ptr; + + if (ppHandle == NULL || *ppHandle == NULL) { + return; + } + code = taosThreadMutexDestroy(&(*ppHandle)->mutex); + stopStreamNotifyConn(*ppHandle); + taosMemoryFreeClear((*ppHandle)->url); + taosMemoryFreeClear(*ppHandle); +} + +static void releaseStreamNotifyHandle(SStreamNotifyHandle** ppHandle) { + if (ppHandle == NULL || *ppHandle == NULL) { + return; + } + (void)taosThreadMutexUnlock(&(*ppHandle)->mutex); + *ppHandle = NULL; +} + +static int32_t acquireStreamNotifyHandle(SStreamNotifyHandleMap* pMap, const char* url, + SStreamNotifyHandle** ppHandle) { +#ifndef WINDOWS + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + bool gLocked = false; + SStreamNotifyHandle** ppFindHandle = NULL; + SStreamNotifyHandle* pNewHandle = NULL; + CURL* newCurl = NULL; + CURLcode res = CURLE_OK; + + TSDB_CHECK_NULL(pMap, code, lino, _end, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(url, code, lino, _end, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(ppHandle, code, lino, _end, TSDB_CODE_INVALID_PARA); + + *ppHandle = NULL; + + code = taosThreadMutexLock(&pMap->gMutex); + TSDB_CHECK_CODE(code, lino, _end); + gLocked = true; + + ppFindHandle = taosHashGet(pMap->handleMap, url, strlen(url)); + if (ppFindHandle == NULL) { + pNewHandle = taosMemoryCalloc(1, sizeof(SStreamNotifyHandle)); + TSDB_CHECK_NULL(pNewHandle, code, lino, _end, terrno); + code = taosThreadMutexInit(&pNewHandle->mutex, NULL); + TSDB_CHECK_CODE(code, lino, _end); + code = taosHashPut(pMap->handleMap, url, strlen(url), &pNewHandle, POINTER_BYTES); + TSDB_CHECK_CODE(code, lino, _end); + *ppHandle = pNewHandle; + pNewHandle = NULL; + } else { + *ppHandle = *ppFindHandle; + } + + code = taosThreadMutexLock(&(*ppHandle)->mutex); + TSDB_CHECK_CODE(code, lino, _end); + + (void)taosThreadMutexUnlock(&pMap->gMutex); + gLocked = false; + + if ((*ppHandle)->curl == NULL) { + newCurl = curl_easy_init(); + TSDB_CHECK_NULL(newCurl, code, lino, _end, TSDB_CODE_FAILED); + res = curl_easy_setopt(newCurl, CURLOPT_URL, url); + TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED); + res = curl_easy_setopt(newCurl, CURLOPT_SSL_VERIFYPEER, 0L); + TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED); + res = curl_easy_setopt(newCurl, CURLOPT_SSL_VERIFYHOST, 0L); + TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED); + res = curl_easy_setopt(newCurl, CURLOPT_TIMEOUT, 3L); + TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED); + res = curl_easy_setopt(newCurl, CURLOPT_CONNECT_ONLY, 2L); + TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED); + res = curl_easy_perform(newCurl); + TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED); + (*ppHandle)->curl = newCurl; + newCurl = NULL; + } + + if ((*ppHandle)->url == NULL) { + (*ppHandle)->url = taosStrdup(url); + TSDB_CHECK_NULL((*ppHandle)->url, code, lino, _end, terrno); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + tqError("%s failed at line %d since %d, %s", __func__, lino, res, tstrerror(code)); + if (*ppHandle) { + releaseStreamNotifyHandle(ppHandle); + } + *ppHandle = NULL; + } + if (newCurl) { + curl_easy_cleanup(newCurl); + } + if (pNewHandle) { + destroyStreamNotifyHandle(&pNewHandle); + } + if (gLocked) { + (void)taosThreadMutexUnlock(&pMap->gMutex); + } + return code; +#else + tqError("stream notify events is not supported on windows"); + return TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS; +#endif +} + +int32_t tqInitNotifyHandleMap(SStreamNotifyHandleMap** ppMap) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamNotifyHandleMap* pMap = NULL; + + TSDB_CHECK_NULL(ppMap, code, lino, _end, TSDB_CODE_INVALID_PARA); + + *ppMap = NULL; + pMap = taosMemoryCalloc(1, sizeof(SStreamNotifyHandleMap)); + TSDB_CHECK_NULL(pMap, code, lino, _end, terrno); + code = taosThreadMutexInit(&pMap->gMutex, NULL); + TSDB_CHECK_CODE(code, lino, _end); + pMap->handleMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + TSDB_CHECK_NULL(pMap->handleMap, code, lino, _end, terrno); + taosHashSetFreeFp(pMap->handleMap, destroyStreamNotifyHandle); + *ppMap = pMap; + pMap = NULL; + +_end: + if (code != TSDB_CODE_SUCCESS) { + tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + if (pMap != NULL) { + tqDestroyNotifyHandleMap(&pMap); + } + return code; +} + +void tqDestroyNotifyHandleMap(SStreamNotifyHandleMap** ppMap) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + if (*ppMap == NULL) { + return; + } + taosHashCleanup((*ppMap)->handleMap); + code = taosThreadMutexDestroy(&(*ppMap)->gMutex); + taosMemoryFreeClear((*ppMap)); +} + +#define JSON_CHECK_ADD_ITEM(obj, str, item) \ + TSDB_CHECK_CONDITION(cJSON_AddItemToObjectCS(obj, str, item), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY) + +static int32_t getStreamNotifyEventHeader(const char* streamName, char** pHeader) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + cJSON* obj = NULL; + cJSON* streams = NULL; + cJSON* stream = NULL; + char msgId[37]; + + TSDB_CHECK_NULL(streamName, code, lino, _end, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pHeader, code, lino, _end, TSDB_CODE_INVALID_PARA); + + *pHeader = NULL; + + code = taosGetSystemUUIDLimit36(msgId, sizeof(msgId)); + TSDB_CHECK_CODE(code, lino, _end); + + stream = cJSON_CreateObject(); + TSDB_CHECK_NULL(stream, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); + JSON_CHECK_ADD_ITEM(stream, "streamName", cJSON_CreateStringReference(streamName)); + JSON_CHECK_ADD_ITEM(stream, "events", cJSON_CreateArray()); + + streams = cJSON_CreateArray(); + TSDB_CHECK_CONDITION(cJSON_AddItemToArray(streams, stream), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY) + stream = NULL; + + obj = cJSON_CreateObject(); + TSDB_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); + JSON_CHECK_ADD_ITEM(obj, "messageId", cJSON_CreateStringReference(msgId)); + JSON_CHECK_ADD_ITEM(obj, "timestamp", cJSON_CreateNumber(taosGetTimestampMs())); + JSON_CHECK_ADD_ITEM(obj, "streams", streams); + streams = NULL; + + *pHeader = cJSON_PrintUnformatted(obj); + TSDB_CHECK_NULL(*pHeader, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); + +_end: + if (code != TSDB_CODE_SUCCESS) { + tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + if (stream != NULL) { + cJSON_Delete(stream); + } + if (streams != NULL) { + cJSON_Delete(streams); + } + if (obj != NULL) { + cJSON_Delete(obj); + } + return code; +} + +static int32_t packupStreamNotifyEvent(const char* streamName, const SArray* pBlocks, char** pMsg, + int32_t* nNotifyEvents) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int32_t numOfBlocks = 0; + int32_t msgHeaderLen = 0; + int32_t msgTailLen = 0; + int32_t msgLen = 0; + char* msgHeader = NULL; + const char* msgTail = "]}]}"; + char* msg = NULL; + + TSDB_CHECK_NULL(pMsg, code, lino, _end, TSDB_CODE_INVALID_PARA); + + *pMsg = NULL; + numOfBlocks = taosArrayGetSize(pBlocks); + *nNotifyEvents = 0; + + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) { + continue; + } + + SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX); + for (int32_t j = 0; j < pDataBlock->info.rows; ++j) { + char* val = colDataGetVarData(pEventStrCol, j); + msgLen += varDataLen(val) + 1; + } + *nNotifyEvents += pDataBlock->info.rows; + } + + if (msgLen == 0) { + // skip since no notification events found + goto _end; + } + + code = getStreamNotifyEventHeader(streamName, &msgHeader); + TSDB_CHECK_CODE(code, lino, _end); + msgHeaderLen = strlen(msgHeader); + msgTailLen = strlen(msgTail); + msgLen += msgHeaderLen; + + msg = taosMemoryMalloc(msgLen); + TSDB_CHECK_NULL(msg, code, lino, _end, terrno); + char* p = msg; + TAOS_STRNCPY(p, msgHeader, msgHeaderLen); + p += msgHeaderLen - msgTailLen; + + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) { + continue; + } + + SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX); + for (int32_t j = 0; j < pDataBlock->info.rows; ++j) { + char* val = colDataGetVarData(pEventStrCol, j); + TAOS_STRNCPY(p, varDataVal(val), varDataLen(val)); + p += varDataLen(val); + *(p++) = ','; + } + } + + p -= 1; + TAOS_STRNCPY(p, msgTail, msgTailLen); + *(p + msgTailLen) = '\0'; + + *pMsg = msg; + msg = NULL; + +_end: + if (code != TSDB_CODE_SUCCESS) { + tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + if (msgHeader != NULL) { + cJSON_free(msgHeader); + } + if (msg != NULL) { + taosMemoryFreeClear(msg); + } + return code; +} + +static int32_t sendSingleStreamNotify(SStreamNotifyHandle* pHandle, char* msg) { +#ifndef WINDOWS + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + CURLcode res = CURLE_OK; + uint64_t sentLen = 0; + uint64_t totalLen = 0; + size_t nbytes = 0; + + TSDB_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pHandle->curl, code, lino, _end, TSDB_CODE_INVALID_PARA); + + totalLen = strlen(msg); + while (sentLen < totalLen) { + res = curl_ws_send(pHandle->curl, msg + sentLen, totalLen - sentLen, &nbytes, 0, CURLWS_TEXT); + TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED); + sentLen += nbytes; + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + tqError("%s failed at line %d since %d, %s", __func__, lino, res, tstrerror(code)); + stopStreamNotifyConn(pHandle); + } + return code; +#else + tqError("stream notify events is not supported on windows"); + return TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS; +#endif +} + +int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* pVnode) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + char* msg = NULL; + int32_t nNotifyAddr = 0; + int32_t nNotifyEvents = 0; + SStreamNotifyHandle* pHandle = NULL; + + TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pVnode, code, lino, _end, TSDB_CODE_INVALID_PARA); + + nNotifyAddr = taosArrayGetSize(pTask->notifyInfo.pNotifyAddrUrls); + if (nNotifyAddr == 0) { + goto _end; + } + + code = packupStreamNotifyEvent(pTask->notifyInfo.streamName, pBlocks, &msg, &nNotifyEvents); + TSDB_CHECK_CODE(code, lino, _end); + if (msg == NULL) { + goto _end; + } + + tqDebug("stream task %s prepare to send %d notify events, total msg length: %" PRIu64, pTask->notifyInfo.streamName, + nNotifyEvents, (uint64_t)strlen(msg)); + + for (int32_t i = 0; i < nNotifyAddr; ++i) { + if (streamTaskShouldStop(pTask)) { + break; + } + const char* url = taosArrayGetP(pTask->notifyInfo.pNotifyAddrUrls, i); + code = acquireStreamNotifyHandle(pVnode->pNotifyHandleMap, url, &pHandle); + if (code != TSDB_CODE_SUCCESS) { + tqError("failed to get stream notify handle of %s", url); + if (pTask->notifyInfo.notifyErrorHandle == SNOTIFY_ERROR_HANDLE_PAUSE) { + // retry for event message sending in PAUSE error handling mode + taosMsleep(STREAM_EVENT_NOTIFY_RETRY_MS); + --i; + continue; + } else { + // simply ignore the failure in DROP error handling mode + code = TSDB_CODE_SUCCESS; + continue; + } + } + code = sendSingleStreamNotify(pHandle, msg); + if (code != TSDB_CODE_SUCCESS) { + tqError("failed to send stream notify handle to %s since %s", url, tstrerror(code)); + if (pTask->notifyInfo.notifyErrorHandle == SNOTIFY_ERROR_HANDLE_PAUSE) { + // retry for event message sending in PAUSE error handling mode + taosMsleep(STREAM_EVENT_NOTIFY_RETRY_MS); + --i; + } else { + // simply ignore the failure in DROP error handling mode + code = TSDB_CODE_SUCCESS; + } + } else { + tqDebug("stream task %s send %d notify events to %s successfully", pTask->notifyInfo.streamName, nNotifyEvents, + url); + } + releaseStreamNotifyHandle(&pHandle); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + if (msg) { + taosMemoryFreeClear(msg); + } + return code; +} diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 06b7b33cd822..1880156f6185 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -86,6 +86,14 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { if (code) { return code; } + + code = + qSetStreamNotifyInfo(pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes, + pTask->notifyInfo.pSchemaWrapper, pTask->notifyInfo.stbFullName, IS_NEW_SUBTB_RULE(pTask)); + if (code) { + tqError("s-task:%s failed to set stream notify info, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } } streamSetupScheduleTrigger(pTask); @@ -1357,4 +1365,4 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pTask); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 6de5298728ca..280ee527f751 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -15,6 +15,7 @@ #include "sync.h" #include "tcs.h" +#include "tq.h" #include "tsdb.h" #include "vnd.h" @@ -483,6 +484,14 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC ret = taosRealPath(tdir, NULL, sizeof(tdir)); TAOS_UNUSED(ret); + // init handle map for stream event notification + ret = tqInitNotifyHandleMap(&pVnode->pNotifyHandleMap); + if (ret != TSDB_CODE_SUCCESS) { + vError("vgId:%d, failed to init StreamNotifyHandleMap", TD_VID(pVnode)); + terrno = ret; + goto _err; + } + // open query vInfo("vgId:%d, start to open vnode query", TD_VID(pVnode)); if (vnodeQueryOpen(pVnode)) { @@ -555,6 +564,7 @@ void vnodeClose(SVnode *pVnode) { vnodeAWait(&pVnode->commitTask); vnodeSyncClose(pVnode); vnodeQueryClose(pVnode); + tqDestroyNotifyHandleMap(&pVnode->pNotifyHandleMap); tqClose(pVnode->pTq); walClose(pVnode->pWal); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 48afa78251b7..84eba69acb77 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -449,9 +449,17 @@ typedef struct STimeWindowAggSupp { SColumnInfoData timeWindowData; // query time window info for scalar function execution. } STimeWindowAggSupp; +typedef struct SStreamNotifyEventSupp { + SArray* pWindowEvents; // Array of SStreamNotifyEvent, storing window events and trigger values. + SHashObj* pTableNameHashMap; // Hash map from groupid to the dest child table name. + SHashObj* pResultHashMap; // Hash map from groupid+skey to the window agg result. + SSDataBlock* pEventBlock; // The datablock contains all window events and results. +} SStreamNotifyEventSupp; + typedef struct SSteamOpBasicInfo { - int32_t primaryPkIndex; - bool updateOperatorInfo; + int32_t primaryPkIndex; + bool updateOperatorInfo; + SStreamNotifyEventSupp windowEventSup; } SSteamOpBasicInfo; typedef struct SStreamFillSupporter { @@ -767,6 +775,8 @@ typedef struct SStreamEventAggOperatorInfo { SSHashObj* pPkDeleted; bool destHasPrimaryKey; struct SOperatorInfo* pOperator; + SNodeList* pStartCondCols; + SNodeList* pEndCondCols; } SStreamEventAggOperatorInfo; typedef struct SStreamCountAggOperatorInfo { diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index f726e4300fb4..86ee6f41245c 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -71,6 +71,10 @@ typedef struct { SVersionRange fillHistoryVer; STimeWindow fillHistoryWindow; SStreamState* pState; + int32_t eventTypes; // event types to notify + SSchemaWrapper* notifyResultSchema; // agg result to notify + char* stbFullName; // used to generate dest child table name + bool newSubTableRule; // used to generate dest child table name } SStreamTaskInfo; struct SExecTaskInfo { diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index 0a6908031412..7b3c828351b6 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -19,7 +19,10 @@ extern "C" { #endif +#include "cJSON.h" +#include "cmdnodes.h" #include "executorInt.h" +#include "querytask.h" #include "tutil.h" #define FILL_POS_INVALID 0 @@ -57,7 +60,8 @@ typedef struct SSlicePoint { void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type); bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo); void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo); -void initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo); +int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo); +void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo); int64_t getDeleteMarkFromOption(SStreamNodeOption* pOption); void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins); @@ -106,6 +110,13 @@ int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes); TSKEY compareTs(void* pKey); +int32_t addEventAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey, + const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t ri, + SStreamNotifyEventSupp* sup); +int32_t addAggResultNotifyEvent(const SSDataBlock* pResultBlock, const SSchemaWrapper* pSchemaWrapper, + SStreamNotifyEventSupp* sup); +int32_t buildNotifyEventBlock(const SExecTaskInfo* pTaskInfo, SStreamNotifyEventSupp* sup); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1386b0b82f6a..39bef9c95f92 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -250,6 +250,28 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { return code; } +int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper, + const char* stbFullName, bool newSubTableRule) { + int32_t code = TSDB_CODE_SUCCESS; + SStreamTaskInfo *pStreamInfo = NULL; + + if (tinfo == 0 || eventTypes == 0 || pSchemaWrapper == NULL || stbFullName == NULL) { + goto _end; + } + + pStreamInfo = &((SExecTaskInfo*)tinfo)->streamInfo; + pStreamInfo->eventTypes = eventTypes; + pStreamInfo->notifyResultSchema = tCloneSSchemaWrapper(pSchemaWrapper); + if (pStreamInfo->notifyResultSchema == NULL) { + code = terrno; + } + pStreamInfo->stbFullName = taosStrdup(stbFullName); + pStreamInfo->newSubTableRule = newSubTableRule; + +_end: + return code; +} + int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_APP_ERROR; diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index c6a1900b416a..20c80df4fa4a 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -262,6 +262,8 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSchemaWrapper(pStreamInfo->schema); tOffsetDestroy(&pStreamInfo->currentOffset); + tDeleteSchemaWrapper(pStreamInfo->notifyResultSchema); + taosMemoryFree(pStreamInfo->stbFullName); } static void freeBlock(void* pParam) { diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index fa6008eba7f6..5f4d6b30fa89 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -12,6 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ + +#include "cmdnodes.h" #include "executorInt.h" #include "filter.h" #include "function.h" @@ -53,6 +55,8 @@ void destroyStreamEventOperatorInfo(void* param) { &pInfo->groupResInfo); pInfo->pOperator = NULL; } + + destroyStreamBasicInfo(&pInfo->basic); destroyStreamAggSupporter(&pInfo->streamAggSup); clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); @@ -89,6 +93,16 @@ void destroyStreamEventOperatorInfo(void* param) { pInfo->pEndCondInfo = NULL; } + if (pInfo->pStartCondCols != NULL) { + nodesDestroyList(pInfo->pStartCondCols); + pInfo->pStartCondCols = NULL; + } + + if (pInfo->pEndCondCols != NULL) { + nodesDestroyList(pInfo->pEndCondCols); + pInfo->pEndCondCols = NULL; + } + taosMemoryFreeClear(param); } @@ -121,7 +135,7 @@ void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { } int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd, - int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) { + int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey, int32_t* pWinCode) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int32_t winCode = TSDB_CODE_SUCCESS; @@ -143,6 +157,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro setEventWindowInfo(pAggSup, &leftWinKey, pVal, pCurWin); if (inWin || (pCurWin->pWinFlag->startFlag && !pCurWin->pWinFlag->endFlag)) { pCurWin->winInfo.isOutput = !isWindowIncomplete(pCurWin); + (*pWinCode) = TSDB_CODE_SUCCESS; goto _end; } } @@ -156,6 +171,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro if (endi < 0 || pTs[endi] >= rightWinKey.win.skey) { setEventWindowInfo(pAggSup, &rightWinKey, pVal, pCurWin); pCurWin->winInfo.isOutput = !isWindowIncomplete(pCurWin); + (*pWinCode) = TSDB_CODE_SUCCESS; goto _end; } } @@ -163,6 +179,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro SSessionKey winKey = {.win.skey = ts, .win.ekey = ts, .groupId = groupId}; code = pAggSup->stateStore.streamStateSessionAllocWinBuffByNextPosition(pAggSup->pState, pCur, &winKey, &pVal, &len); QUERY_CHECK_CODE(code, lino, _error); + (*pWinCode) = TSDB_CODE_FAILED; setEventWindowInfo(pAggSup, &winKey, pVal, pCurWin); pCurWin->pWinFlag->startFlag = start; @@ -373,10 +390,18 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl bool allEqual = true; SEventWindowInfo curWin = {0}; SSessionKey nextWinKey = {0}; + int32_t winCode = TSDB_CODE_SUCCESS; code = setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin, - &nextWinKey); + &nextWinKey, &winCode); QUERY_CHECK_CODE(code, lino, _end); + if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN) && + *(bool*)colDataGetNumData(pColStart, i) && winCode != TSDB_CODE_SUCCESS) { + code = addEventAggNotifyEvent(SNOTIFY_EVENT_WINDOW_OPEN, &curWin.winInfo.sessionWin, pSDataBlock, + pInfo->pStartCondCols, i, &pInfo->basic.windowEventSup); + QUERY_CHECK_CODE(code, lino, _end); + } + setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); bool rebuild = false; code = updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData, @@ -443,6 +468,12 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); QUERY_CHECK_CODE(code, lino, _end); } + + if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE)) { + code = addEventAggNotifyEvent(SNOTIFY_EVENT_WINDOW_CLOSE, &curWin.winInfo.sessionWin, pSDataBlock, + pInfo->pEndCondCols, i + winRows - 1, &pInfo->basic.windowEventSup); + QUERY_CHECK_CODE(code, lino, _end); + } } _end: @@ -563,6 +594,7 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) { static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SStreamEventAggOperatorInfo* pInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pInfo->binfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -577,10 +609,27 @@ static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE)) { + code = addAggResultNotifyEvent(pBInfo->pRes, pTaskInfo->streamInfo.notifyResultSchema, &pInfo->basic.windowEventSup); + QUERY_CHECK_CODE(code, lino, _end); + } (*ppRes) = pBInfo->pRes; return code; } + + code = buildNotifyEventBlock(pTaskInfo, &pInfo->basic.windowEventSup); + QUERY_CHECK_CODE(code, lino, _end); + if (pInfo->basic.windowEventSup.pEventBlock->info.rows > 0) { + printDataBlock(pInfo->basic.windowEventSup.pEventBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + (*ppRes) = pInfo->basic.windowEventSup.pEventBlock; + return code; + } + +_end: (*ppRes) = NULL; + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + } return code; } @@ -957,6 +1006,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pEventNode->window.destHasPrimaryKey; + initStreamBasicInfo(&pInfo->basic); pInfo->pOperator = pOperator; setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED, @@ -989,6 +1039,12 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = filterInitFromNode((SNode*)pEventNode->pEndCond, &pInfo->pEndCondInfo, 0); QUERY_CHECK_CODE(code, lino, _error); + code = + nodesCollectColumnsFromNode((SNode*)pEventNode->pStartCond, NULL, COLLECT_COL_TYPE_ALL, &pInfo->pStartCondCols); + QUERY_CHECK_CODE(code, lino, _error); + code = nodesCollectColumnsFromNode((SNode*)pEventNode->pEndCond, NULL, COLLECT_COL_TYPE_ALL, &pInfo->pEndCondCols); + QUERY_CHECK_CODE(code, lino, _error); + *pOptrInfo = pOperator; return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/streamexecutorInt.c b/source/libs/executor/src/streamexecutorInt.c index b94798934c52..9cafdfff0c75 100644 --- a/source/libs/executor/src/streamexecutorInt.c +++ b/source/libs/executor/src/streamexecutorInt.c @@ -13,7 +13,19 @@ * along with this program. If not, see . */ +#include "streamexecutorInt.h" + #include "executorInt.h" +#include "tdatablock.h" + +#define NOTIFY_EVENT_NAME_CACHE_LIMIT_MB 16 + +typedef struct SStreamNotifyEvent { + uint64_t gid; + TSKEY skey; + char* content; + bool isEnd; +} SStreamNotifyEvent; void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type) { if (type != STREAM_GET_ALL && type != STREAM_CHECKPOINT) { @@ -29,7 +41,509 @@ void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo) { pBasicInfo->updateOperatorInfo = false; } -void initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) { +static void destroyStreamWindowEvent(void* ptr) { + SStreamNotifyEvent* pEvent = ptr; + if (pEvent == NULL || pEvent->content == NULL) return; + cJSON_free(pEvent->content); +} + +static void destroyStreamNotifyEventSupp(SStreamNotifyEventSupp* sup) { + if (sup == NULL) return; + taosArrayDestroyEx(sup->pWindowEvents, destroyStreamWindowEvent); + taosHashCleanup(sup->pTableNameHashMap); + taosHashCleanup(sup->pResultHashMap); + blockDataDestroy(sup->pEventBlock); + *sup = (SStreamNotifyEventSupp){0}; +} + +static int32_t initStreamNotifyEventSupp(SStreamNotifyEventSupp *sup) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SSDataBlock* pBlock = NULL; + SColumnInfoData infoData = {0}; + + if (sup == NULL) { + goto _end; + } + + code = createDataBlock(&pBlock); + QUERY_CHECK_CODE(code, lino, _end); + + pBlock->info.type = STREAM_NOTIFY_EVENT; + pBlock->info.watermark = INT64_MIN; + + infoData.info.type = TSDB_DATA_TYPE_VARCHAR; + infoData.info.bytes = tDataTypes[infoData.info.type].bytes; + code = blockDataAppendColInfo(pBlock, &infoData); + QUERY_CHECK_CODE(code, lino, _end); + + sup->pWindowEvents = taosArrayInit(0, sizeof(SStreamNotifyEvent)); + QUERY_CHECK_NULL(sup->pWindowEvents, code, lino, _end, terrno); + sup->pTableNameHashMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); + QUERY_CHECK_NULL(sup->pTableNameHashMap, code, lino, _end, terrno); + sup->pResultHashMap = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + QUERY_CHECK_NULL(sup->pResultHashMap, code, lino, _end, terrno); + taosHashSetFreeFp(sup->pResultHashMap, destroyStreamWindowEvent); + sup->pEventBlock = pBlock; + pBlock = NULL; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + if (sup) { + destroyStreamNotifyEventSupp(sup); + } + } + if (pBlock != NULL) { + blockDataDestroy(pBlock); + } + return code; +} + +int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) { pBasicInfo->primaryPkIndex = -1; pBasicInfo->updateOperatorInfo = false; + return initStreamNotifyEventSupp(&pBasicInfo->windowEventSup); +} + +void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) { + destroyStreamNotifyEventSupp(&pBasicInfo->windowEventSup); +} + +static void streamNotifyGetEventWindowId(const SSessionKey* pSessionKey, char *buf) { + uint64_t hash = 0; + uint64_t ar[2]; + + ar[0] = pSessionKey->groupId; + ar[1] = pSessionKey->win.skey; + hash = MurmurHash3_64((char*)ar, sizeof(ar)); + buf = u64toaFastLut(hash, buf); +} + +#define JSON_CHECK_ADD_ITEM(obj, str, item) \ + QUERY_CHECK_CONDITION(cJSON_AddItemToObjectCS(obj, str, item), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY) + +static int32_t jsonAddColumnField(const char* colName, const SColumnInfoData* pColData, int32_t ri, cJSON* obj) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + char* temp = NULL; + + QUERY_CHECK_NULL(colName, code, lino, _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_NULL(pColData, code, lino, _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_INVALID_PARA); + + if (colDataIsNull_s(pColData, ri)) { + JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNull()); + goto _end; + } + + switch (pColData->info.type) { + case TSDB_DATA_TYPE_BOOL: { + bool val = *(bool*)colDataGetNumData(pColData, ri); + JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateBool(val)); + break; + } + + case TSDB_DATA_TYPE_TINYINT: { + int8_t val = *(int8_t*)colDataGetNumData(pColData, ri); + JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val)); + break; + } + + case TSDB_DATA_TYPE_SMALLINT: { + int16_t val = *(int16_t*)colDataGetNumData(pColData, ri); + JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val)); + break; + } + + case TSDB_DATA_TYPE_INT: { + int32_t val = *(int32_t*)colDataGetNumData(pColData, ri); + JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val)); + break; + } + + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_TIMESTAMP: { + int64_t val = *(int64_t*)colDataGetNumData(pColData, ri); + JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val)); + break; + } + + case TSDB_DATA_TYPE_FLOAT: { + float val = *(float*)colDataGetNumData(pColData, ri); + JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val)); + break; + } + + case TSDB_DATA_TYPE_DOUBLE: { + double val = *(double*)colDataGetNumData(pColData, ri); + JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val)); + break; + } + + case TSDB_DATA_TYPE_VARCHAR: + case TSDB_DATA_TYPE_NCHAR: { + // cJSON requires null-terminated strings, but this data is not null-terminated, + // so we need to manually copy the string and add null termination. + const char* src = varDataVal(colDataGetVarData(pColData, ri)); + int32_t len = varDataLen(colDataGetVarData(pColData, ri)); + temp = cJSON_malloc(len + 1); + QUERY_CHECK_NULL(temp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); + memcpy(temp, src, len); + temp[len] = '\0'; + + cJSON* item = cJSON_CreateStringReference(temp); + JSON_CHECK_ADD_ITEM(obj, colName, item); + + // let the cjson object to free memory later + item->type &= ~cJSON_IsReference; + temp = NULL; + break; + } + + case TSDB_DATA_TYPE_UTINYINT: { + uint8_t val = *(uint8_t*)colDataGetNumData(pColData, ri); + JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val)); + break; + } + + case TSDB_DATA_TYPE_USMALLINT: { + uint16_t val = *(uint16_t*)colDataGetNumData(pColData, ri); + JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val)); + break; + } + + case TSDB_DATA_TYPE_UINT: { + uint32_t val = *(uint32_t*)colDataGetNumData(pColData, ri); + JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val)); + break; + } + + case TSDB_DATA_TYPE_UBIGINT: { + uint64_t val = *(uint64_t*)colDataGetNumData(pColData, ri); + JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val)); + break; + } + + default: { + JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateStringReference("")); + break; + } + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + if (temp) { + cJSON_free(temp); + } + return code; +} + +int32_t addEventAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey, + const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t ri, + SStreamNotifyEventSupp* sup) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SNode* node = NULL; + cJSON* event = NULL; + cJSON* fields = NULL; + cJSON* cond = NULL; + SStreamNotifyEvent item = {0}; + char windowId[32]; + + QUERY_CHECK_NULL(pSessionKey, code, lino, _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_NULL(pInputBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_NULL(pInputBlock->pDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_NULL(pCondCols, code, lino, _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA); + + qDebug("add stream notify event from event window, type: %s, start: %" PRId64 ", end: %" PRId64, + (eventType == SNOTIFY_EVENT_WINDOW_OPEN) ? "WINDOW_OPEN" : "WINDOW_CLOSE", pSessionKey->win.skey, + pSessionKey->win.ekey); + + event = cJSON_CreateObject(); + QUERY_CHECK_NULL(event, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); + + // add basic info + streamNotifyGetEventWindowId(pSessionKey, windowId); + if (eventType == SNOTIFY_EVENT_WINDOW_OPEN) { + JSON_CHECK_ADD_ITEM(event, "eventType", cJSON_CreateStringReference("WINDOW_OPEN")); + } else if (eventType == SNOTIFY_EVENT_WINDOW_CLOSE) { + JSON_CHECK_ADD_ITEM(event, "eventType", cJSON_CreateStringReference("WINDOW_CLOSE")); + } + JSON_CHECK_ADD_ITEM(event, "eventTime", cJSON_CreateNumber(taosGetTimestampMs())); + JSON_CHECK_ADD_ITEM(event, "windowId", cJSON_CreateStringReference(windowId)); + JSON_CHECK_ADD_ITEM(event, "windowType", cJSON_CreateStringReference("Event")); + JSON_CHECK_ADD_ITEM(event, "windowStart", cJSON_CreateNumber(pSessionKey->win.skey)); + if (eventType == SNOTIFY_EVENT_WINDOW_CLOSE) { + JSON_CHECK_ADD_ITEM(event, "windowEnd", cJSON_CreateNumber(pSessionKey->win.ekey)); + } + + // create fields object to store matched column values + fields = cJSON_CreateObject(); + QUERY_CHECK_NULL(fields, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); + FOREACH(node, pCondCols) { + SColumnNode* pColDef = (SColumnNode*)node; + SColumnInfoData* pColData = taosArrayGet(pInputBlock->pDataBlock, pColDef->slotId); + code = jsonAddColumnField(pColDef->colName, pColData, ri, fields); + QUERY_CHECK_CODE(code, lino, _end); + } + + // add trigger condition + cond = cJSON_CreateObject(); + QUERY_CHECK_NULL(cond, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); + JSON_CHECK_ADD_ITEM(cond, "conditionIndex", cJSON_CreateNumber(0)); + JSON_CHECK_ADD_ITEM(cond, "fieldValues", fields); + fields = NULL; + JSON_CHECK_ADD_ITEM(event, "triggerConditions", cond); + cond = NULL; + + // convert json object to string value + item.gid = pSessionKey->groupId; + item.skey = pSessionKey->win.skey; + item.isEnd = (eventType == SNOTIFY_EVENT_WINDOW_CLOSE); + item.content = cJSON_PrintUnformatted(event); + QUERY_CHECK_NULL(taosArrayPush(sup->pWindowEvents, &item), code, lino, _end, terrno); + item.content = NULL; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + destroyStreamWindowEvent(&item); + if (cond != NULL) { + cJSON_Delete(cond); + } + if (fields != NULL) { + cJSON_Delete(fields); + } + if (event != NULL) { + cJSON_Delete(event); + } + return code; +} + +int32_t addAggResultNotifyEvent(const SSDataBlock* pResultBlock, const SSchemaWrapper* pSchemaWrapper, + SStreamNotifyEventSupp* sup) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SNode * node = NULL; + cJSON* event = NULL; + cJSON* result = NULL; + SStreamNotifyEvent item = {0}; + SColumnInfoData* pWstartCol = NULL; + + QUERY_CHECK_NULL(pResultBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_NULL(pSchemaWrapper, code, lino, _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA); + + qDebug("add %" PRId64 " stream notify results from window agg", pResultBlock->info.rows); + + pWstartCol = taosArrayGet(pResultBlock->pDataBlock, 0); + for (int32_t i = 0; i< pResultBlock->info.rows; ++i) { + event = cJSON_CreateObject(); + QUERY_CHECK_NULL(event, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); + + // convert the result row into json + result = cJSON_CreateObject(); + QUERY_CHECK_NULL(result, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); + for (int32_t j = 0; j < pSchemaWrapper->nCols; ++j) { + SSchema *pCol = pSchemaWrapper->pSchema + j; + SColumnInfoData *pColData = taosArrayGet(pResultBlock->pDataBlock, pCol->colId - 1); + code = jsonAddColumnField(pCol->name, pColData, i, result); + QUERY_CHECK_CODE(code, lino, _end); + } + JSON_CHECK_ADD_ITEM(event, "result", result); + result = NULL; + + item.gid = pResultBlock->info.id.groupId; + item.skey = *(uint64_t*)colDataGetNumData(pWstartCol, i); + item.content = cJSON_PrintUnformatted(event); + code = taosHashPut(sup->pResultHashMap, &item.gid, sizeof(item.gid) + sizeof(item.skey), &item, sizeof(item)); + TSDB_CHECK_CODE(code, lino, _end); + item.content = NULL; + + cJSON_Delete(event); + event = NULL; + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + destroyStreamWindowEvent(&item); + if (result != NULL) { + cJSON_Delete(result); + } + if (event != NULL) { + cJSON_Delete(event); + } + return code; +} + +static int32_t streamNotifyGetDestTableName(const SExecTaskInfo* pTaskInfo, uint64_t gid, char** pTableName) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + const SStorageAPI* pAPI = NULL; + void* tbname = NULL; + int32_t winCode = TSDB_CODE_SUCCESS; + char parTbName[TSDB_TABLE_NAME_LEN]; + const SStreamTaskInfo* pStreamInfo = NULL; + + QUERY_CHECK_NULL(pTaskInfo, code, lino, _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_NULL(pTableName, code, lino, _end, TSDB_CODE_INVALID_PARA); + + *pTableName = NULL; + + pAPI = &pTaskInfo->storageAPI; + code = pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, gid, &tbname, false, &winCode); + QUERY_CHECK_CODE(code, lino, _end); + if (winCode != TSDB_CODE_SUCCESS) { + parTbName[0] = '\0'; + } else { + tstrncpy(parTbName, tbname, sizeof(parTbName)); + } + pAPI->stateStore.streamStateFreeVal(tbname); + + pStreamInfo = &pTaskInfo->streamInfo; + code = buildSinkDestTableName(parTbName, pStreamInfo->stbFullName, gid, pStreamInfo->newSubTableRule, pTableName); + QUERY_CHECK_CODE(code, lino, _end); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t streamNotifyFillTableName(const char* tableName, const SStreamNotifyEvent* pEvent, + const SStreamNotifyEvent* pResult, char** pVal) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + static const char* prefix = "{\"tableName\":\""; + uint64_t prefixLen = 0; + uint64_t nameLen = 0; + uint64_t eventLen = 0; + uint64_t resultLen = 0; + uint64_t valLen = 0; + char* val = NULL; + char* p = NULL; + + QUERY_CHECK_NULL(tableName, code, lino, _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_NULL(pEvent, code, lino , _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_NULL(pVal, code, lino , _end, TSDB_CODE_INVALID_PARA); + + *pVal = NULL; + prefixLen = strlen(prefix); + nameLen = strlen(tableName); + eventLen = strlen(pEvent->content); + + if (pResult != NULL) { + resultLen = strlen(pResult->content); + valLen = VARSTR_HEADER_SIZE + prefixLen + nameLen + eventLen + resultLen; + } else { + valLen = VARSTR_HEADER_SIZE + prefixLen + nameLen + eventLen + 1; + } + val = taosMemoryMalloc(valLen); + QUERY_CHECK_NULL(val, code, lino, _end, terrno); + varDataSetLen(val, valLen - VARSTR_HEADER_SIZE); + + p = varDataVal(val); + TAOS_STRNCPY(p, prefix, prefixLen); + p += prefixLen; + TAOS_STRNCPY(p, tableName, nameLen); + p += nameLen; + *(p++) = '\"'; + TAOS_STRNCPY(p, pEvent->content, eventLen); + *p = ','; + + if (pResult != NULL) { + p += eventLen - 1; + TAOS_STRNCPY(p, pResult->content, resultLen); + *p = ','; + } + *pVal = val; + val = NULL; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + if (val != NULL) { + taosMemoryFreeClear(val); + } + return code; +} + +int32_t buildNotifyEventBlock(const SExecTaskInfo* pTaskInfo, SStreamNotifyEventSupp* sup) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SColumnInfoData* pEventStrCol = NULL; + int32_t nWindowEvents = 0; + int32_t nWindowResults = 0; + char* val = NULL; + + if (pTaskInfo == NULL || sup == NULL) { + goto _end; + } + + QUERY_CHECK_NULL(sup->pEventBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); + blockDataCleanup(sup->pEventBlock); + nWindowEvents = taosArrayGetSize(sup->pWindowEvents); + nWindowResults = taosHashGetSize(sup->pResultHashMap); + qDebug("start to build stream notify event block, nWindowEvents: %d, nWindowResults: %d", nWindowEvents, + nWindowResults); + if (nWindowEvents == 0) { + goto _end; + } + + code = blockDataEnsureCapacity(sup->pEventBlock, nWindowEvents); + QUERY_CHECK_CODE(code, lino, _end); + + pEventStrCol = taosArrayGet(sup->pEventBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX); + QUERY_CHECK_NULL(pEventStrCol, code, lino, _end, terrno); + + for (int32_t i = 0; i < nWindowEvents; ++i) { + SStreamNotifyEvent* pResult = NULL; + SStreamNotifyEvent* pEvent = taosArrayGet(sup->pWindowEvents, i); + char* tableName = taosHashGet(sup->pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid)); + if (tableName == NULL) { + code = streamNotifyGetDestTableName(pTaskInfo, pEvent->gid, &tableName); + QUERY_CHECK_CODE(code, lino, _end); + code = taosHashPut(sup->pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid), tableName, strlen(tableName) + 1); + taosMemoryFreeClear(tableName); + QUERY_CHECK_CODE(code, lino, _end); + tableName = taosHashGet(sup->pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid)); + QUERY_CHECK_NULL(tableName, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + } + if (pEvent->isEnd) { + pResult = taosHashGet(sup->pResultHashMap, &pEvent->gid, sizeof(pEvent->gid) + sizeof(pEvent->skey)); + QUERY_CHECK_NULL(pResult, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + } + code = streamNotifyFillTableName(tableName, pEvent, pResult, &val); + QUERY_CHECK_CODE(code, lino, _end); + code = colDataSetVal(pEventStrCol, i, val, false); + QUERY_CHECK_CODE(code, lino, _end); + taosMemoryFreeClear(val); + sup->pEventBlock->info.rows++; + } + + if (taosHashGetMemSize(sup->pTableNameHashMap) >= NOTIFY_EVENT_NAME_CACHE_LIMIT_MB * 1024 * 1024) { + taosHashClear(sup->pTableNameHashMap); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + if (val != NULL) { + taosMemoryFreeClear(val); + } + if (sup != NULL) { + taosArrayClearEx(sup->pWindowEvents, destroyStreamWindowEvent); + taosHashClear(sup->pResultHashMap); + } + return code; } diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c index d038e4d82c85..44799f193b6c 100644 --- a/source/libs/executor/src/streamintervalsliceoperator.c +++ b/source/libs/executor/src/streamintervalsliceoperator.c @@ -55,6 +55,7 @@ void destroyStreamIntervalSliceOperatorInfo(void* param) { pInfo->pOperator = NULL; } + destroyStreamBasicInfo(&pInfo->basic); clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); pInfo->pUpdated = NULL; @@ -651,7 +652,8 @@ int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiN optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamIntervalSliceReleaseState, streamIntervalSliceReloadState); - initStreamBasicInfo(&pInfo->basic); + code = initStreamBasicInfo(&pInfo->basic); + QUERY_CHECK_CODE(code, lino, _error); if (downstream) { code = initIntervalSliceDownStream(downstream, &pInfo->streamAggSup, pPhyNode->type, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic, &pInfo->interval, pInfo->hasInterpoFunc); diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 44004a4c6bc0..4fe8efe39760 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -150,6 +150,7 @@ void destroyStreamTimeSliceOperatorInfo(void* param) { &pInfo->groupResInfo); pInfo->pOperator = NULL; } + destroyStreamBasicInfo(&pInfo->basic); colDataDestroy(&pInfo->twAggSup.timeWindowData); destroyStreamAggSupporter(&pInfo->streamAggSup); resetPrevAndNextWindow(pInfo->pFillSup); @@ -2201,7 +2202,8 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamTimeSliceReleaseState, streamTimeSliceReloadState); - initStreamBasicInfo(&pInfo->basic); + code = initStreamBasicInfo(&pInfo->basic); + QUERY_CHECK_CODE(code, lino, _error); if (downstream) { code = initTimeSliceDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic, pInfo->pFillSup); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index bea9b9621574..bfe86aa2ac61 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -99,6 +99,8 @@ const char* nodesNodeName(ENodeType type) { return "CountWindow"; case QUERY_NODE_ANOMALY_WINDOW: return "AnomalyWindow"; + case QUERY_NODE_STREAM_NOTIFY_OPTIONS: + return "StreamNotifyOptions"; case QUERY_NODE_SET_OPERATOR: return "SetOperator"; case QUERY_NODE_SELECT_STMT: @@ -5812,6 +5814,45 @@ static int32_t jsonToStreamOptions(const SJson* pJson, void* pObj) { return code; } +static const char* jkStreamNotifyOptionsAddrUrls = "AddrUrls"; +static const char* jkStreamNotifyOptionsEventType = "EventType"; +static const char* jkStreamNotifyOptionsErrorHandle = "ErrorHandle"; +static const char* jkStreamNotifyOptionsNotifyHistory = "NotifyHistory"; + +static int32_t streamNotifyOptionsToJson(const void* pObj, SJson* pJson) { + const SStreamNotifyOptions* pNotifyOption = (const SStreamNotifyOptions*)pObj; + int32_t code = nodeListToJson(pJson, jkStreamNotifyOptionsAddrUrls, pNotifyOption->pAddrUrls); + if (code == TSDB_CODE_SUCCESS) { + code = tjsonAddIntegerToObject(pJson, jkStreamNotifyOptionsEventType, pNotifyOption->eventTypes); + } + if (code == TSDB_CODE_SUCCESS) { + code = tjsonAddIntegerToObject(pJson, jkStreamNotifyOptionsErrorHandle, pNotifyOption->errorHandle); + } + if (code == TSDB_CODE_SUCCESS) { + code = tjsonAddBoolToObject(pJson, jkStreamNotifyOptionsNotifyHistory, pNotifyOption->notifyHistory); + } + + return code; +} + +static int32_t jsonToStreamNotifyOptions(const SJson* pJson, void* pObj) { + SStreamNotifyOptions* pNotifyOption = (SStreamNotifyOptions*)pObj; + int32_t code = jsonToNodeList(pJson, jkStreamNotifyOptionsAddrUrls, &pNotifyOption->pAddrUrls); + int32_t val = 0; + if (code == TSDB_CODE_SUCCESS) { + code = tjsonGetIntValue(pJson, jkStreamNotifyOptionsEventType, &val); + pNotifyOption->eventTypes = val; + } + if (code == TSDB_CODE_SUCCESS) { + code = tjsonGetIntValue(pJson, jkStreamNotifyOptionsErrorHandle, &val); + pNotifyOption->errorHandle = val; + } + if (code == TSDB_CODE_SUCCESS) { + code = tjsonGetBoolValue(pJson, jkStreamNotifyOptionsNotifyHistory, &pNotifyOption->notifyHistory); + } + return code; +} + static const char* jkWhenThenWhen = "When"; static const char* jkWhenThenThen = "Then"; @@ -7207,6 +7248,7 @@ static const char* jkCreateStreamStmtOptions = "Options"; static const char* jkCreateStreamStmtQuery = "Query"; static const char* jkCreateStreamStmtTags = "Tags"; static const char* jkCreateStreamStmtSubtable = "Subtable"; +static const char* jkCreateStreamStmtNotifyOptions = "NotifyOptions"; static int32_t createStreamStmtToJson(const void* pObj, SJson* pJson) { const SCreateStreamStmt* pNode = (const SCreateStreamStmt*)pObj; @@ -7233,6 +7275,9 @@ static int32_t createStreamStmtToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkCreateStreamStmtSubtable, nodeToJson, pNode->pSubtable); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkCreateStreamStmtNotifyOptions, nodeToJson, pNode->pNotifyOptions); + } return code; } @@ -7262,6 +7307,9 @@ static int32_t jsonToCreateStreamStmt(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkCreateStreamStmtSubtable, &pNode->pSubtable); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkCreateStreamStmtNotifyOptions, (SNode**)&pNode->pNotifyOptions); + } return code; } @@ -8029,6 +8077,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return countWindowNodeToJson(pObj, pJson); case QUERY_NODE_ANOMALY_WINDOW: return anomalyWindowNodeToJson(pObj, pJson); + case QUERY_NODE_STREAM_NOTIFY_OPTIONS: + return streamNotifyOptionsToJson(pObj, pJson); case QUERY_NODE_SET_OPERATOR: return setOperatorToJson(pObj, pJson); case QUERY_NODE_SELECT_STMT: @@ -8402,6 +8452,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToCountWindowNode(pJson, pObj); case QUERY_NODE_ANOMALY_WINDOW: return jsonToAnomalyWindowNode(pJson, pObj); + case QUERY_NODE_STREAM_NOTIFY_OPTIONS: + return jsonToStreamNotifyOptions(pJson, pObj); case QUERY_NODE_SET_OPERATOR: return jsonToSetOperator(pJson, pObj); case QUERY_NODE_SELECT_STMT: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 3d4df385f7cc..ae5b302d2da0 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -467,6 +467,9 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { case QUERY_NODE_WINDOW_OFFSET: code = makeNode(type, sizeof(SWindowOffsetNode), &pNode); break; + case QUERY_NODE_STREAM_NOTIFY_OPTIONS: + code = makeNode(type, sizeof(SStreamNotifyOptions), &pNode); + break; case QUERY_NODE_SET_OPERATOR: code = makeNode(type, sizeof(SSetOperator), &pNode); break; @@ -1267,6 +1270,11 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pAround->pTimepoint); break; } + case QUERY_NODE_STREAM_NOTIFY_OPTIONS: { + SStreamNotifyOptions* pNotifyOptions = (SStreamNotifyOptions*)pNode; + nodesDestroyList(pNotifyOptions->pAddrUrls); + break; + } case QUERY_NODE_SET_OPERATOR: { SSetOperator* pStmt = (SSetOperator*)pNode; nodesDestroyList(pStmt->pProjectionList); @@ -1479,6 +1487,7 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pStmt->pQuery); nodesDestroyList(pStmt->pTags); nodesDestroyNode(pStmt->pSubtable); + nodesDestroyNode((SNode*)pStmt->pNotifyOptions); tFreeSCMCreateStreamReq(pStmt->pReq); taosMemoryFreeClear(pStmt->pReq); break; diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index dc9986ad04c8..387bccf358fb 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -296,8 +296,12 @@ SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, con SNode* createStreamOptions(SAstCreateContext* pCxt); SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken, SNode* pNode); +SNode* createStreamNotifyOptions(SAstCreateContext *pCxt, SNodeList* pAddrUrls, SNodeList* pEventTypes); +SNode* setStreamNotifyOptions(SAstCreateContext* pCxt, SNode* pNode, EStreamNotifyOptionSetFlag setFlag, + SToken* pToken); SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable, - SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols); + SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols, + SNode* pNotifyOptions); SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName); SNode* createPauseStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName); SNode* createResumeStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, bool ignoreUntreated, SToken* pStreamName); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 7f383afe4884..439af13d7130 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -785,7 +785,7 @@ full_view_name(A) ::= db_name(B) NK_DOT view_name(C). /************************************************ create/drop stream **************************************************/ cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO full_table_name(C) col_list_opt(H) tag_def_or_ref_opt(F) subtable_opt(G) - AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H); } + AS query_or_subquery(D) notify_opt(I). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H, I); } cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); } cmd ::= PAUSE STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createPauseStreamStmt(pCxt, A, &B); } cmd ::= RESUME STREAM exists_opt(A) ignore_opt(C) stream_name(B). { pCxt->pRootNode = createResumeStreamStmt(pCxt, A, C, &B); } @@ -832,6 +832,26 @@ subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. ignore_opt(A) ::= . { A = false; } ignore_opt(A) ::= IGNORE UNTREATED. { A = true; } +notify_opt(A) ::= . { A = NULL; } +notify_opt(A) ::= notify_def(B). { A = B; } + +notify_def(A) ::= NOTIFY NK_LP url_def_list(B) NK_RP ON NK_LP event_def_list(C) NK_RP. { A = createStreamNotifyOptions(pCxt, B, C); } +notify_def(A) ::= notify_def(B) ON_FAILURE DROP(C). { A = setStreamNotifyOptions(pCxt, B, SNOTIFY_OPT_ERROR_HANDLE_SET, &C); } +notify_def(A) ::= notify_def(B) ON_FAILURE PAUSE(C). { A = setStreamNotifyOptions(pCxt, B, SNOTIFY_OPT_ERROR_HANDLE_SET, &C); } +notify_def(A) ::= notify_def(B) NOTIFY_HISTORY NK_INTEGER(C). { A = setStreamNotifyOptions(pCxt, B, SNOTIFY_OPT_NOTIFY_HISTORY_SET, &C); } + +%type url_def_list { SNodeList* } +%destructor url_def_list { nodesDestroyList($$); } +url_def_list(A) ::= NK_STRING(B). { A = createNodeList(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B)); } +url_def_list(A) ::= url_def_list(B) NK_COMMA NK_STRING(C). { A = addNodeToList(pCxt, B, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &C)); } + +%type event_def_list { SNodeList* } +%destructor event_def_list { nodesDestroyList($$); } +event_def_list(A) ::= NK_STRING(B). { A = createNodeList(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B)); } +event_def_list(A) ::= event_def_list(B) NK_COMMA NK_STRING(C). { A = addNodeToList(pCxt, B, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &C)); } + + + /************************************************ kill connection/query ***********************************************/ cmd ::= KILL CONNECTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_CONNECTION_STMT, &A); } cmd ::= KILL QUERY NK_STRING(A). { pCxt->pRootNode = createKillQueryStmt(pCxt, &A); } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 5b90fd601ebd..c875cbad0581 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -1526,8 +1526,8 @@ SNode* createCaseWhenNode(SAstCreateContext* pCxt, SNode* pCase, SNodeList* pWhe pCaseWhen->pCase = pCase; pCaseWhen->pWhenThenList = pWhenThenList; pCaseWhen->pElse = pElse; - pCaseWhen->tz = pCxt->pQueryCxt->timezone; - pCaseWhen->charsetCxt = pCxt->pQueryCxt->charsetCxt; + pCaseWhen->tz = pCxt->pQueryCxt->timezone; + pCaseWhen->charsetCxt = pCxt->pQueryCxt->charsetCxt; return (SNode*)pCaseWhen; _err: nodesDestroyNode(pCase); @@ -3657,8 +3657,115 @@ SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptions return pOptions; } +static bool validateNotifyUrl(const char* url) { + const char* prefix[] = {"http://", "https://", "ws://", "wss://"}; + const char* host = NULL; + + if (!url || *url == '\0') return false; + + for (int32_t i = 0; i < ARRAY_SIZE(prefix); ++i) { + if (strncasecmp(url, prefix[i], strlen(prefix[i])) == 0) { + host = url + strlen(prefix[i]); + break; + } + } + + return (host != NULL) && (*host != '\0') && (*host != '/'); +} + +SNode* createStreamNotifyOptions(SAstCreateContext* pCxt, SNodeList* pAddrUrls, SNodeList* pEventTypes) { + SNode* pNode = NULL; + EStreamNotifyEventType eventTypes = 0; + const char* eWindowOpenStr = "WINDOW_OPEN"; + const char* eWindowCloseStr = "WINDOW_CLOSE"; + + CHECK_PARSER_STATUS(pCxt); + + if (LIST_LENGTH(pAddrUrls) == 0) { + pCxt->errCode = + generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "notification address cannot be empty"); + goto _err; + } + + FOREACH(pNode, pAddrUrls) { + char *url = ((SValueNode*)pNode)->literal; + if (strlen(url) >= TSDB_STREAM_NOTIFY_URL_LEN) { + pCxt->errCode = + generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, + "notification address \"%s\" exceed maximum length %d", url, TSDB_STREAM_NOTIFY_URL_LEN); + goto _err; + } + if (!validateNotifyUrl(url)) { + pCxt->errCode = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, + "invalid notification address \"%s\"", url); + goto _err; + } + } + + if (LIST_LENGTH(pEventTypes) == 0) { + pCxt->errCode = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, + "event types must be specified for notification"); + goto _err; + } + + FOREACH(pNode, pEventTypes) { + char *eventStr = ((SValueNode *)pNode)->literal; + if (strncasecmp(eventStr, eWindowOpenStr, strlen(eWindowOpenStr) + 1) == 0) { + BIT_FLAG_SET_MASK(eventTypes, SNOTIFY_EVENT_WINDOW_OPEN); + } else if (strncasecmp(eventStr, eWindowCloseStr, strlen(eWindowCloseStr) + 1) == 0) { + BIT_FLAG_SET_MASK(eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE); + } else { + pCxt->errCode = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, + "invalid event type '%s' for notification", eventStr); + goto _err; + } + } + + SStreamNotifyOptions* pNotifyOptions = NULL; + pCxt->errCode = nodesMakeNode(QUERY_NODE_STREAM_NOTIFY_OPTIONS, (SNode**)&pNotifyOptions); + CHECK_MAKE_NODE(pNotifyOptions); + pNotifyOptions->pAddrUrls = pAddrUrls; + pNotifyOptions->eventTypes = eventTypes; + pNotifyOptions->errorHandle = SNOTIFY_ERROR_HANDLE_PAUSE; + pNotifyOptions->notifyHistory = false; + nodesDestroyList(pEventTypes); + return (SNode*)pNotifyOptions; +_err: + nodesDestroyList(pAddrUrls); + nodesDestroyList(pEventTypes); + return NULL; +} + +SNode* setStreamNotifyOptions(SAstCreateContext* pCxt, SNode* pNode, EStreamNotifyOptionSetFlag setFlag, + SToken* pToken) { + CHECK_PARSER_STATUS(pCxt); + + SStreamNotifyOptions* pNotifyOption = (SStreamNotifyOptions*)pNode; + if (BIT_FLAG_TEST_MASK(pNotifyOption->setFlag, setFlag)) { + pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, + "stream notify options each item can only be set once"); + goto _err; + } + switch (setFlag) { + case SNOTIFY_OPT_ERROR_HANDLE_SET: + pNotifyOption->errorHandle = (pToken->type == TK_DROP) ? SNOTIFY_ERROR_HANDLE_DROP : SNOTIFY_ERROR_HANDLE_PAUSE; + break; + case SNOTIFY_OPT_NOTIFY_HISTORY_SET: + pNotifyOption->notifyHistory = taosStr2Int8(pToken->z, NULL, 10); + break; + default: + break; + } + BIT_FLAG_SET_MASK(pNotifyOption->setFlag, setFlag); + return pNode; +_err: + nodesDestroyNode(pNode); + return NULL; +} + SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable, - SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols) { + SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols, + SNode* pNotifyOptions) { CHECK_PARSER_STATUS(pCxt); CHECK_NAME(checkStreamName(pCxt, pStreamName)); SCreateStreamStmt* pStmt = NULL; @@ -3674,6 +3781,7 @@ SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken pStmt->pTags = pTags; pStmt->pSubtable = pSubtable; pStmt->pCols = pCols; + pStmt->pNotifyOptions = (SStreamNotifyOptions*)pNotifyOptions; return (SNode*)pStmt; _err: nodesDestroyNode(pRealTable); @@ -3682,6 +3790,7 @@ SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken nodesDestroyList(pTags); nodesDestroyNode(pSubtable); nodesDestroyList(pCols); + nodesDestroyNode(pNotifyOptions); return NULL; } diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index ea2e9d712fa0..7ed438a7dc81 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -355,6 +355,9 @@ static SKeyword keywordTable[] = { {"FORCE_WINDOW_CLOSE", TK_FORCE_WINDOW_CLOSE}, {"DISK_INFO", TK_DISK_INFO}, {"AUTO", TK_AUTO}, + {"NOTIFY", TK_NOTIFY}, + {"ON_FAILURE", TK_ON_FAILURE}, + {"NOTIFY_HISTORY", TK_NOTIFY_HISTORY}, }; // clang-format on diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 1d87b83e62c2..74dd1be6147f 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -12192,6 +12192,45 @@ static int32_t translateStreamOptions(STranslateContext* pCxt, SCreateStreamStmt return TSDB_CODE_SUCCESS; } +static int32_t buildStreamNotifyOptions(STranslateContext* pCxt, SStreamNotifyOptions* pNotifyOptions, + SCMCreateStreamReq* pReq) { + int32_t code = TSDB_CODE_SUCCESS; + SNode* pNode = NULL; + + if (pNotifyOptions == NULL || pNotifyOptions->pAddrUrls->length == 0) { + return code; + } + + pReq->pNotifyAddrUrls = taosArrayInit(pNotifyOptions->pAddrUrls->length, POINTER_BYTES); + if (pReq->pNotifyAddrUrls != NULL) { + FOREACH(pNode, pNotifyOptions->pAddrUrls) { + char *url = taosStrndup(((SValueNode*)pNode)->literal, TSDB_STREAM_NOTIFY_URL_LEN); + if (url == NULL) { + code = terrno; + break; + } + if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) { + code = terrno; + taosMemoryFreeClear(url); + break; + } + } + } else { + code = terrno; + } + + if (code == TSDB_CODE_SUCCESS) { + pReq->notifyEventTypes = pNotifyOptions->eventTypes; + pReq->notifyErrorHandle = pNotifyOptions->errorHandle; + pReq->notifyHistory = pNotifyOptions->notifyHistory; + } else { + taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL); + pReq->pNotifyAddrUrls = NULL; + } + + return code; +} + static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) { pReq->igExists = pStmt->ignoreExists; @@ -12238,6 +12277,10 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* } } + if (TSDB_CODE_SUCCESS == code) { + code = buildStreamNotifyOptions(pCxt, pStmt->pNotifyOptions, pReq); + } + return code; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 42d7f44b6243..baf36d04531a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -735,7 +735,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S !alreadyAddGroupId(pDataBlock->info.parTbName, groupId) && groupId != 0) { if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) { code = buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId, sizeof(pDataBlock->info.parTbName)); - } else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) { + } else if (pTask->ver >= SSTREAM_TASK_APPEND_STABLE_NAME_VER) { code = buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, pDataBlock->info.parTbName, groupId, sizeof(pDataBlock->info.parTbName)); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0de256d86dd2..dde7b197c435 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -198,6 +198,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { SCheckpointInfo info; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) { + tDecoderClear(&decoder); continue; } @@ -1031,6 +1032,7 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { SCheckpointInfo info; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) { + tDecoderClear(&decoder); continue; } tDecoderClear(&decoder); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index d27ed520c6b6..5ee8bd43f584 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -326,6 +326,11 @@ void tFreeStreamTask(void* pParam) { streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo); pTask->chkInfo.pActiveInfo = NULL; + taosArrayDestroyP(pTask->notifyInfo.pNotifyAddrUrls, NULL); + taosMemoryFreeClear(pTask->notifyInfo.streamName); + taosMemoryFreeClear(pTask->notifyInfo.stbFullName); + tDeleteSchemaWrapper(pTask->notifyInfo.pSchemaWrapper); + taosMemoryFree(pTask); stDebug("s-task:0x%x free task completed", taskId); } @@ -1318,6 +1323,78 @@ void streamTaskFreeRefId(int64_t* pRefId) { metaRefMgtRemove(pRefId); } +static int32_t tEncodeStreamNotifyInfo(SEncoder* pEncoder, const SNotifyInfo* info) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + QUERY_CHECK_NULL(pEncoder, code, lino, _exit, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA); + + int32_t addrSize = taosArrayGetSize(info->pNotifyAddrUrls); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize)); + for (int32_t i = 0; i < addrSize; ++i) { + const char* url = taosArrayGetP(info->pNotifyAddrUrls, i); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, url)); + } + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyEventTypes)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyErrorHandle)); + if (addrSize > 0) { + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->streamName)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->stbFullName)); + TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, info->pSchemaWrapper)); + } + +_exit: + if (code != TSDB_CODE_SUCCESS) { + stError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tDecodeStreamNotifyInfo(SDecoder* pDecoder, SNotifyInfo* info) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + QUERY_CHECK_NULL(pDecoder, code, lino, _exit, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA); + + int32_t addrSize = 0; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize)); + info->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES); + QUERY_CHECK_NULL(info->pNotifyAddrUrls, code, lino, _exit, terrno); + for (int32_t i = 0; i < addrSize; ++i) { + char *url = NULL; + TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url)); + url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN); + QUERY_CHECK_NULL(url, code, lino, _exit, terrno); + if (taosArrayPush(info->pNotifyAddrUrls, &url) == NULL) { + taosMemoryFree(url); + TAOS_CHECK_EXIT(terrno); + } + } + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyEventTypes)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyErrorHandle)); + if (addrSize > 0) { + char* name = NULL; + TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name)); + info->streamName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1); + QUERY_CHECK_NULL(info->streamName, code, lino, _exit, terrno); + TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name)); + info->stbFullName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1); + QUERY_CHECK_NULL(info->stbFullName, code, lino, _exit, terrno); + info->pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (info->pSchemaWrapper == NULL) { + TAOS_CHECK_EXIT(terrno); + } + TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, info->pSchemaWrapper)); + } + +_exit: + if (code != TSDB_CODE_SUCCESS) { + stError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { int32_t code = 0; @@ -1388,6 +1465,10 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5)); TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1)); + if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) { + TAOS_CHECK_EXIT(tEncodeStreamNotifyInfo(pEncoder, &pTask->notifyInfo)); + } + tEndEncode(pEncoder); _exit: return code; @@ -1486,8 +1567,12 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve)); + if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) { + TAOS_CHECK_EXIT(tDecodeStreamNotifyInfo(pDecoder, &pTask->notifyInfo)); + } + tEndDecode(pDecoder); _exit: return code; -} \ No newline at end of file +} diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 4f5ca8d789b3..03ef00a0c09e 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -1490,3 +1490,32 @@ bool taosAssertRelease(bool condition) { return true; } #endif + +char* u64toaFastLut(uint64_t val, char* buf) { + static const char* lut = + "0001020304050607080910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455" + "5657585960616263646566676869707172737475767778798081828384858687888990919293949596979899"; + + char temp[24]; + char* p = temp; + + while (val >= 100) { + strncpy(p, lut + (val % 100) * 2, 2); + val /= 100; + p += 2; + } + + if (val >= 10) { + strncpy(p, lut + val * 2, 2); + p += 2; + } else if (val > 0 || p == temp) { + *(p++) = val + '0'; + } + + while (p != temp) { + *buf++ = *--p; + } + + *buf = '\0'; + return buf; +}