Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream)[TS-5469]. add support for window event notifications in stream processing #29542

Open
wants to merge 4 commits into
base: 3.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/common/tcommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ typedef enum EStreamType {
STREAM_PARTITION_DELETE_DATA,
STREAM_GET_RESULT,
STREAM_DROP_CHILD_TABLE,
STREAM_NOTIFY_EVENT,
} EStreamType;

#pragma pack(push, 1)
Expand Down Expand Up @@ -404,6 +405,9 @@ typedef struct STUidTagInfo {
#define UD_GROUPID_COLUMN_INDEX 1
#define UD_TAG_COLUMN_INDEX 2

// stream notify event block column
#define NOTIFY_EVENT_STR_COLUMN_INDEX 0

int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime);
int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol);

Expand Down
2 changes: 2 additions & 0 deletions include/common/tdatablock.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ bool isAutoTableName(char* ctbName);
int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId, size_t cap);
int32_t buildCtbNameByGroupId(const char* stbName, uint64_t groupId, char** pName);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
char** dstTableName);

int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList);

Expand Down
6 changes: 6 additions & 0 deletions include/common/tmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ typedef enum ENodeType {
QUERY_NODE_TSMA_OPTIONS,
QUERY_NODE_ANOMALY_WINDOW,
QUERY_NODE_RANGE_AROUND,
QUERY_NODE_STREAM_NOTIFY_OPTIONS,

// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR = 100,
Expand Down Expand Up @@ -2956,6 +2957,11 @@ typedef struct {
// 3.3.0.0
SArray* pCols; // array of SField
int64_t smaId;
// 3.3.5.0
SArray* pNotifyAddrUrls;
int32_t notifyEventTypes;
int32_t notifyErrorHandle;
int8_t notifyHistory;
} SCMCreateStreamReq;

typedef struct {
Expand Down
3 changes: 3 additions & 0 deletions include/libs/executor/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);

int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);

int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper,
const char* stbFullName, bool newSubTableRule);

/**
* Set multiple input data blocks for the stream scan.
* @param tinfo
Expand Down
49 changes: 37 additions & 12 deletions include/libs/nodes/cmdnodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -566,19 +566,44 @@ typedef struct SStreamOptions {
int64_t setFlag;
} SStreamOptions;

typedef enum EStreamNotifyOptionSetFlag {
SNOTIFY_OPT_ERROR_HANDLE_SET = BIT_FLAG_MASK(0),
SNOTIFY_OPT_NOTIFY_HISTORY_SET = BIT_FLAG_MASK(1),
} EStreamNotifyOptionSetFlag;

typedef enum EStreamNotifyEventType {
SNOTIFY_EVENT_WINDOW_OPEN = BIT_FLAG_MASK(0),
SNOTIFY_EVENT_WINDOW_CLOSE = BIT_FLAG_MASK(1),
} EStreamNotifyEventType;

typedef enum EStreamNotifyErrorHandleType {
SNOTIFY_ERROR_HANDLE_PAUSE,
SNOTIFY_ERROR_HANDLE_DROP,
} EStreamNotifyErrorHandleType;

typedef struct SStreamNotifyOptions {
ENodeType type;
SNodeList* pAddrUrls;
EStreamNotifyEventType eventTypes;
EStreamNotifyErrorHandleType errorHandle;
bool notifyHistory;
EStreamNotifyOptionSetFlag setFlag;
} SStreamNotifyOptions;

typedef struct SCreateStreamStmt {
ENodeType type;
char streamName[TSDB_TABLE_NAME_LEN];
char targetDbName[TSDB_DB_NAME_LEN];
char targetTabName[TSDB_TABLE_NAME_LEN];
bool ignoreExists;
SStreamOptions* pOptions;
SNode* pQuery;
SNode* pPrevQuery;
SNodeList* pTags;
SNode* pSubtable;
SNodeList* pCols;
SCMCreateStreamReq* pReq;
ENodeType type;
char streamName[TSDB_TABLE_NAME_LEN];
char targetDbName[TSDB_DB_NAME_LEN];
char targetTabName[TSDB_TABLE_NAME_LEN];
bool ignoreExists;
SStreamOptions* pOptions;
SNode* pQuery;
SNode* pPrevQuery;
SNodeList* pTags;
SNode* pSubtable;
SNodeList* pCols;
SStreamNotifyOptions* pNotifyOptions;
SCMCreateStreamReq* pReq;
} SCreateStreamStmt;

typedef struct SDropStreamStmt {
Expand Down
20 changes: 16 additions & 4 deletions include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,14 @@ typedef struct SStreamTaskSM SStreamTaskSM;
typedef struct SStreamQueueItem SStreamQueueItem;
typedef struct SActiveCheckpointInfo SActiveCheckpointInfo;

#define SSTREAM_TASK_VER 4
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
#define SSTREAM_TASK_NEED_CONVERT_VER 2
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
#define SSTREAM_TASK_VER 5
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
#define SSTREAM_TASK_NEED_CONVERT_VER 2
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 // Append subtable name with groupId
#define SSTREAM_TASK_APPEND_STABLE_NAME_VER 4 // Append subtable name with stableName and groupId
#define SSTREAM_TASK_ADD_NOTIFY_VER 5 // Support event notification at window open/close

#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))

extern int32_t streamMetaRefPool;
extern int32_t streamTaskRefPool;
Expand Down Expand Up @@ -427,6 +431,13 @@ typedef struct STaskCheckInfo {
TdThreadMutex checkInfoLock;
} STaskCheckInfo;

typedef struct SNotifyInfo {
SArray* pNotifyAddrUrls;
int32_t notifyEventTypes;
int32_t notifyErrorHandle;
char* streamName;
} SNotifyInfo;

struct SStreamTask {
int64_t ver;
SStreamTaskId id;
Expand All @@ -449,6 +460,7 @@ struct SStreamTask {
SStreamState* pState; // state backend
SUpstreamInfo upstreamInfo;
STaskCheckInfo taskCheckInfo;
SNotifyInfo notifyInfo;

// the followings attributes don't be serialized
SScanhistorySchedInfo schedHistoryInfo;
Expand Down
1 change: 1 addition & 0 deletions include/util/tdef.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ typedef enum ELogicConditionType {
#define TSDB_OFFSET_LEN 64 // it is a null-terminated string
#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string
#define TSDB_STREAM_NOTIFY_URL_LEN 128 // it includes the terminating '\0'
#define TSDB_DB_NAME_LEN 65
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_PRIVILEDGE_CONDITION_LEN 48 * 1024
Expand Down
35 changes: 35 additions & 0 deletions source/common/src/msg/tmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -9959,6 +9959,16 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
}

TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->smaId));

int32_t addrSize = taosArrayGetSize(pReq->pNotifyAddrUrls);
TAOS_CHECK_EXIT(tEncodeI32(&encoder, addrSize));
for (int32_t i = 0; i < addrSize; ++i) {
const char *url = taosArrayGetP(pReq->pNotifyAddrUrls, i);
TAOS_CHECK_EXIT((tEncodeCStr(&encoder, url)));
}
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->notifyEventTypes));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->notifyErrorHandle));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->notifyHistory));
tEndEncode(&encoder);

_exit:
Expand Down Expand Up @@ -10093,6 +10103,30 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->smaId));
}

if (!tDecodeIsEnd(&decoder)) {
int32_t addrSize = 0;
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &addrSize));
pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
if (pReq->pNotifyAddrUrls == NULL) {
TAOS_CHECK_EXIT(terrno);
}
for (int32_t i = 0; i < addrSize; ++i) {
char *url = NULL;
TAOS_CHECK_EXIT(tDecodeCStr(&decoder, &url));
url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
if (url == NULL) {
TAOS_CHECK_EXIT(terrno);
}
if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
taosMemoryFree(url);
TAOS_CHECK_EXIT(terrno);
}
}
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->notifyEventTypes));
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->notifyErrorHandle));
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->notifyHistory));
}

tEndDecode(&decoder);
_exit:
tDecoderClear(&decoder);
Expand Down Expand Up @@ -10155,6 +10189,7 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosArrayDestroy(pReq->fillNullCols);
taosArrayDestroy(pReq->pVgroupVerList);
taosArrayDestroy(pReq->pCols);
taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
}

int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) {
Expand Down
27 changes: 27 additions & 0 deletions source/common/src/tdatablock.c
Original file line number Diff line number Diff line change
Expand Up @@ -3061,6 +3061,33 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
return code;
}

int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
char** dstTableName) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

if (parTbName[0]) {
if (newSubTableRule && !isAutoTableName(parTbName) && !alreadyAddGroupId(parTbName, gid) && gid != 0 &&
stbFullName) {
*dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);

tstrncpy(*dstTableName, parTbName, TSDB_TABLE_NAME_LEN);
code = buildCtbNameAddGroupId(stbFullName, *dstTableName, gid, TSDB_TABLE_NAME_LEN);
TSDB_CHECK_CODE(code, lino, _end);
} else {
*dstTableName = taosStrdup(parTbName);
TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
}
} else {
code = buildCtbNameByGroupId(stbFullName, gid, dstTableName);
TSDB_CHECK_CODE(code, lino, _end);
}

_end:
return code;
}

// return length of encoded data, return -1 if failed
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) {
int32_t code = blockDataCheck(pBlock);
Expand Down
74 changes: 74 additions & 0 deletions source/dnode/mnode/impl/src/mndStream.c
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,72 @@ static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
return TSDB_CODE_SUCCESS;
}

static void *notifyAddrDup(void *p) { return taosStrdup((char *)p); }

static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, SStreamTask *pTask) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);

pTask->notifyInfo.pNotifyAddrUrls = taosArrayDup(createReq->pNotifyAddrUrls, notifyAddrDup);
TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno);
pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes;
pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle;
pTask->notifyInfo.streamName = taosStrdup(createReq->name);
TSDB_CHECK_NULL(pTask->notifyInfo.streamName, code, lino, _end, terrno);

_end:
if (code != TSDB_CODE_SUCCESS) {
mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}

static int32_t addStreamNotifyInfo(SCMCreateStreamReq *createReq, SStreamObj *pStream) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t level = 0;
int32_t nTasks = 0;
SArray *pLevel = NULL;

TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pStream, code, lino, _end, TSDB_CODE_INVALID_PARA);

if (taosArrayGetSize(createReq->pNotifyAddrUrls) == 0) {
goto _end;
}

level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; ++i) {
pLevel = taosArrayGetP(pStream->tasks, i);
nTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < nTasks; ++j) {
code = addStreamTaskNotifyInfo(createReq, taosArrayGetP(pLevel, j));
TSDB_CHECK_CODE(code, lino, _end);
}
}

if (pStream->conf.fillHistory && createReq->notifyHistory) {
level = taosArrayGetSize(pStream->pHTasksList);
for (int32_t i = 0; i < level; ++i) {
pLevel = taosArrayGetP(pStream->pHTasksList, i);
nTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < nTasks; ++j) {
code = addStreamTaskNotifyInfo(createReq, taosArrayGetP(pLevel, j));
TSDB_CHECK_CODE(code, lino, _end);
}
}
}

_end:
if (code != TSDB_CODE_SUCCESS) {
mError("%s for stream %s failed at line %d since %s", __func__, pStream->name, lino, tstrerror(code));
}
return code;
}

static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
Expand Down Expand Up @@ -850,6 +916,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}

// add notify info into all stream tasks
code = addStreamNotifyInfo(&createReq, &streamObj);
if (code != TSDB_CODE_SUCCESS) {
mError("stream:%s failed to add stream notify info since %s", createReq.name, tstrerror(code));
mndTransDrop(pTrans);
goto _OVER;
}

// add stream to trans
code = mndPersistStream(pTrans, &streamObj);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
Expand Down
1 change: 1 addition & 0 deletions source/dnode/vnode/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ set(
"src/tq/tqSnapshot.c"
"src/tq/tqStreamStateSnap.c"
"src/tq/tqStreamTaskSnap.c"
"src/tq/tqStreamNotify.c"
)

aux_source_directory("src/tsdb/" TSDB_SOURCE_FILES)
Expand Down
5 changes: 5 additions & 0 deletions source/dnode/vnode/src/inc/tq.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t n
SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq);
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type);

// tq send notifications
int32_t tqInitNotifyHandleMap(SStreamNotifyHandleMap** ppMap);
void tqDestroyNotifyHandleMap(SStreamNotifyHandleMap** ppMap);
int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* pVnode);

#define TQ_ERR_GO_TO_END(c) \
do { \
code = c; \
Expand Down
5 changes: 5 additions & 0 deletions source/dnode/vnode/src/inc/vnodeInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ typedef struct SCommitInfo SCommitInfo;
typedef struct SCompactInfo SCompactInfo;
typedef struct SQueryNode SQueryNode;

typedef struct SStreamNotifyHandleMap SStreamNotifyHandleMap;

#define VNODE_META_TMP_DIR "meta.tmp"
#define VNODE_META_BACKUP_DIR "meta.backup"

Expand Down Expand Up @@ -496,6 +498,9 @@ struct SVnode {
int64_t blockSeq;
SQHandle* pQuery;
SVMonitorObj monitor;

// Notification Handles
SStreamNotifyHandleMap* pNotifyHandleMap;
};

#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
Expand Down
Loading
Loading