diff --git a/inlong-common/src/main/java/org/apache/inlong/common/constant/DataNodeType.java b/inlong-common/src/main/java/org/apache/inlong/common/constant/DataNodeType.java index 2ccae832fa2..b2ea76496be 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/constant/DataNodeType.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/constant/DataNodeType.java @@ -23,4 +23,5 @@ public class DataNodeType { public static final String PULSAR = "PULSAR"; public static final String CLS = "CLS"; public static final String ELASTICSEARCH = "ELASTICSEARCH"; + public static final String HTTP = "HTTP"; } diff --git a/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java index cd91b19464f..c2816baf1e7 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java @@ -24,5 +24,6 @@ public class SinkType { public static final String CLS = "CLS"; public static final String ELASTICSEARCH = "ELASTICSEARCH"; public static final String STARROCKS = "STARROCKS"; + public static final String HTTP = "HTTP"; } diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/HttpSinkConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/HttpSinkConfig.java new file mode 100644 index 00000000000..009843c40c3 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/HttpSinkConfig.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow.sink; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.util.Map; + +@EqualsAndHashCode(callSuper = true) +@Data +public class HttpSinkConfig extends SinkConfig { + + private String path; + private String method; + private Map headers; + private Integer maxRetryTimes; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java index 867d56dad6e..f50a3683b13 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java @@ -34,6 +34,7 @@ @JsonSubTypes.Type(value = EsSinkConfig.class, name = SinkType.ELASTICSEARCH), @JsonSubTypes.Type(value = PulsarSinkConfig.class, name = SinkType.PULSAR), @JsonSubTypes.Type(value = KafkaSinkConfig.class, name = SinkType.KAFKA), + @JsonSubTypes.Type(value = HttpSinkConfig.class, name = SinkType.HTTP), }) public abstract class SinkConfig implements Serializable { diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/HttpNodeConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/HttpNodeConfig.java new file mode 100644 index 00000000000..caf39f29e43 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/HttpNodeConfig.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.node; + +import lombok.Data; + +@Data +public class HttpNodeConfig extends NodeConfig { + + private String baseUrl; + private Boolean enableCredential; + private String username; + private String password; + private Integer maxConnect; +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java index ce0248e56d5..7f9a5167478 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java @@ -33,6 +33,7 @@ @JsonSubTypes.Type(value = EsNodeConfig.class, name = DataNodeType.ELASTICSEARCH), @JsonSubTypes.Type(value = PulsarNodeConfig.class, name = DataNodeType.PULSAR), @JsonSubTypes.Type(value = KafkaNodeConfig.class, name = DataNodeType.KAFKA), + @JsonSubTypes.Type(value = HttpNodeConfig.class, name = DataNodeType.HTTP), }) public abstract class NodeConfig implements Serializable { diff --git a/inlong-dashboard/src/ui/locales/cn.json b/inlong-dashboard/src/ui/locales/cn.json index 2c00ea1dc09..379a5a83e1a 100644 --- a/inlong-dashboard/src/ui/locales/cn.json +++ b/inlong-dashboard/src/ui/locales/cn.json @@ -703,6 +703,9 @@ "pages.GroupDetail.Delay.AverageTitle": "平均传输时延 (ms)", "pages.GroupDetail.Delay.RealTimeTitle": "传输时延 (ms)", "pages.GroupDetail.Stream.Preview": "数据预览", + "pages.GroupDetail.Stream.Original": "原文", + "pages.GroupDetail.Stream.ShowOriginal": "显示原文", + "pages.GroupDetail.Stream.Closed": "关闭", "pages.GroupDetail.Stream.Dt": "数据时间 (dt)", "pages.GroupDetail.Stream.Content": "数据内容", "pages.GroupDetail.Stream.Number": "数据条数", diff --git a/inlong-dashboard/src/ui/locales/en.json b/inlong-dashboard/src/ui/locales/en.json index 7546f2e3993..bc1106fdd6f 100644 --- a/inlong-dashboard/src/ui/locales/en.json +++ b/inlong-dashboard/src/ui/locales/en.json @@ -703,6 +703,9 @@ "pages.GroupDetail.Delay.AverageTitle": "Average transmission delay (ms)", "pages.GroupDetail.Delay.RealTimeTitle": "Transmission delay (ms)", "pages.GroupDetail.Stream.Preview": "Data preview", + "pages.GroupDetail.Stream.Original": "Original", + "pages.GroupDetail.Stream.ShowOriginal": "Show Original", + "pages.GroupDetail.Stream.Closed": "Close", "pages.GroupDetail.Stream.Dt": "Data time(dt)", "pages.GroupDetail.Stream.Content": "Data content", "pages.GroupDetail.Stream.Number": "Records number", diff --git a/inlong-dashboard/src/ui/pages/GroupDetail/DataStream/PreviewModal.tsx b/inlong-dashboard/src/ui/pages/GroupDetail/DataStream/PreviewModal.tsx index 74819922456..a608dbcb7ef 100644 --- a/inlong-dashboard/src/ui/pages/GroupDetail/DataStream/PreviewModal.tsx +++ b/inlong-dashboard/src/ui/pages/GroupDetail/DataStream/PreviewModal.tsx @@ -18,12 +18,12 @@ */ import React, { useState } from 'react'; -import { Modal, Table, Radio, RadioChangeEvent } from 'antd'; +import { Modal, Table, Radio, RadioChangeEvent, Button, Card } from 'antd'; import { ModalProps } from 'antd/es/modal'; import { useRequest, useUpdateEffect } from '@/ui/hooks'; import i18n from '@/i18n'; import { ColumnsType } from 'antd/es/table'; -import { timestampFormat } from '@/core/utils'; +import dayjs from 'dayjs'; export interface Props extends ModalProps { inlongGroupId: string; @@ -33,29 +33,18 @@ export interface Props extends ModalProps { const Comp: React.FC = ({ inlongGroupId, inlongStreamId, ...modalProps }) => { const [position, setPosition] = useState(1); + + const [originalModal, setOriginalModal] = useState({ + open: false, + record: {}, + }); interface DataType { id: React.Key; - dt: string; - body: string; } - const detailColumns: ColumnsType = [ - { - title: 'ID', - dataIndex: 'id', - }, - { - title: i18n.t('pages.GroupDetail.Stream.Dt'), - dataIndex: 'dt', - render: text => text && timestampFormat(text), - }, - { - title: i18n.t('pages.GroupDetail.Stream.Content'), - dataIndex: 'body', - ellipsis: true, - render: text => {text}, - }, - ]; + const onShowOriginModal = record => { + setOriginalModal({ open: true, record: record }); + }; const { data: previewData, run: getPreviewData } = useRequest( { @@ -72,10 +61,80 @@ const Comp: React.FC = ({ inlongGroupId, inlongStreamId, ...modalProps }) }, ); + const getColumn = () => { + let exitsId = false; + const result = previewData?.[0]?.fieldList?.reduce((acc, cur) => { + if (cur['fieldName'] === 'id') { + exitsId = true; + } + const width = + (cur['fieldName'].length > cur['fieldValue'].length + ? cur['fieldName'].length + : cur['fieldValue'].length) * 10; + acc.push({ + title: cur['fieldName'], + key: cur['fieldName'], + dataIndex: cur['fieldName'], + width: width >= 100 ? width : 100, + }); + return acc; + }, []); + if (result) { + if (exitsId) { + return result; + } + return [{ title: 'id', key: 'id', dataIndex: 'id', width: 100 }].concat([...result]); + } + return; + }; + + const detailColumns: ColumnsType = [ + { + title: i18n.t('pages.GroupDetail.Stream.Dt'), + key: 'dt', + width: 200, + dataIndex: 'dt', + }, + ].concat( + (getColumn() ? getColumn() : []).concat([ + { + title: 'operation', + key: 'operation', + fixed: 'right', + width: 100, + render: (text, record: any) => ( + <> + + + ), + }, + ]), + ); + const convertListToMap = () => { + const result = []; + for (let i = 0; i < previewData?.length; i++) { + const temp = previewData?.[i]?.fieldList.reduce((acc, item) => { + acc[item.fieldName] = item.fieldValue; + return acc; + }, {}); + temp['id'] = temp['id'] ? temp['id'] : i; + temp['headers'] = previewData?.[i]?.headers; + temp['body'] = previewData?.[i]?.body; + temp['dt'] = dayjs(previewData?.[i]?.dt).format('YYYY-MM-DD HH:mm:ss'); + result.push(temp); + } + return result; + }; const onChange = ({ target: { value } }: RadioChangeEvent) => { setPosition(value); }; - useUpdateEffect(() => { if (modalProps.open) { if (inlongStreamId) { @@ -102,14 +161,31 @@ const Comp: React.FC = ({ inlongGroupId, inlongStreamId, ...modalProps })

{record.body}

, - rowExpandable: record => record.id !== 'Not Expandable', - expandRowByClick: true, - }} >
+ + setOriginalModal(prev => ({ ...prev, open: false }))} + footer={[ + , + ]} + > +
+ +

{JSON.stringify(originalModal?.record['headers'])}

+
+ +

{originalModal?.record['body']}

+
+
+
); }; diff --git a/inlong-dashboard/src/ui/pages/GroupDetail/ResourceInfo/index.tsx b/inlong-dashboard/src/ui/pages/GroupDetail/ResourceInfo/index.tsx index 83d69dd788b..57fb9a0fc8f 100644 --- a/inlong-dashboard/src/ui/pages/GroupDetail/ResourceInfo/index.tsx +++ b/inlong-dashboard/src/ui/pages/GroupDetail/ResourceInfo/index.tsx @@ -37,6 +37,13 @@ const Comp = ({ inlongGroupId, isCreate }: Props, ref) => { return !!inlongGroupId; }, [inlongGroupId]); + const [sortOption, setSortOption] = useState({ + inlongGroupId: inlongGroupId, + inlongStreamId: '', + pageSize: 5, + pageNum: 1, + }); + const { data, run: getData } = useRequest(`/group/detail/${inlongGroupId}`, { ready: isUpdate, refreshDeps: [inlongGroupId], @@ -45,6 +52,20 @@ const Comp = ({ inlongGroupId, isCreate }: Props, ref) => { }), }); + const { data: sortData, run: getSortData } = useRequest( + { + url: '/sink/listDetail', + method: 'post', + data: sortOption, + }, + { + refreshDeps: [sortOption], + formatResult: data => ({ + ...data, + }), + }, + ); + useEffect(() => { getData(); }, [getData]); @@ -82,7 +103,6 @@ const Comp = ({ inlongGroupId, isCreate }: Props, ref) => { for (const item in data) { if ( data[item] !== null && - item !== 'SortInfo' && item !== 'PULSAR' && item !== 'TUBEMQ' && item !== 'inlongClusterTag' @@ -92,31 +112,24 @@ const Comp = ({ inlongGroupId, isCreate }: Props, ref) => { } return info; }; - - const [current, setCurrent] = useState(1); - const [options, setOptions] = useState({ - streamId: '', - }); + const onChange = ({ current: pageNum, pageSize }) => { + setSortOption(pre => ({ ...pre, pageNum: pageNum, pageSize: pageSize })); + }; const pagination = { pageSize: 5, - current: current, - total: - options.streamId !== '' - ? data?.SortInfo.filter(item => item.inlongStreamId.includes(options.streamId)).length - : data?.SortInfo?.length, - }; - const onChange = ({ current: pageNum, pageSize }) => { - setCurrent(pageNum); + current: sortOption.pageNum, + total: sortData?.total, }; const onFilter = allValues => { - setOptions({ - streamId: allValues.streamId, - }); + setSortOption(pre => ({ + ...pre, + inlongStreamId: allValues.streamId, + })); }; - const content = defaultValues => [ + const content = () => [ { type: 'inputsearch', label: 'Stream Id', @@ -192,35 +205,30 @@ const Comp = ({ inlongGroupId, isCreate }: Props, ref) => { > )} - {data?.hasOwnProperty('SortInfo') && ( - <> - - Sort {t('pages.GroupDetail.Resource.Info')} - - item.inlongStreamId.includes(options.streamId)) - : data?.SortInfo, - pagination, - rowKey: 'name', - onChange, - }} - /> - - )} + <> + + Sort {t('pages.GroupDetail.Resource.Info')} + + + ); }; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java index 17ab8663619..76e63e632af 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java @@ -18,16 +18,20 @@ package org.apache.inlong.manager.service.consume; import org.apache.inlong.common.constant.MQType; +import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.InlongConsumeEntity; +import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo; import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest; import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo; import org.apache.inlong.manager.pojo.consume.tubemq.ConsumeTubeMQDTO; +import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo; import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQTopicInfo; +import org.apache.inlong.manager.service.cluster.InlongClusterService; import org.apache.inlong.manager.service.group.InlongGroupService; import org.apache.commons.lang3.StringUtils; @@ -36,6 +40,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.List; import java.util.Objects; /** @@ -48,6 +53,8 @@ public class ConsumeTubeMQOperator extends AbstractConsumeOperator { @Autowired private InlongGroupService groupService; + @Autowired + private InlongClusterService clusterService; @Override public Boolean accept(String mqType) { @@ -81,7 +88,12 @@ public InlongConsumeInfo getFromEntity(InlongConsumeEntity entity) { ConsumeTubeMQDTO dto = ConsumeTubeMQDTO.getFromJson(entity.getExtParams()); CommonBeanUtils.copyProperties(dto, consumeInfo); } - + InlongGroupInfo groupInfo = groupService.get(entity.getInlongGroupId()); + List clusterInfos = + clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.TUBEMQ); + Preconditions.expectNotEmpty(clusterInfos, + "tubeMQ cluster not exist for groupId=" + groupInfo.getInlongGroupId()); + consumeInfo.setClusterInfos(clusterInfos); return consumeInfo; } diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/include/inlong_api.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/include/inlong_api.h new file mode 100644 index 00000000000..8f79c3b8029 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/include/inlong_api.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef INLONG_SDK_API_H +#define INLONG_SDK_API_H + +#include +#include +#include +#include +#include + +namespace inlong { + +typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t, + const int64_t, const char *); + +class ApiImp; + +class InLongApi { + public: + InLongApi(); + ~InLongApi(); + int32_t InitApi(const char *config_path); + + int32_t AddInLongGroupId(const std::vector &group_ids); + + int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, + UserCallBack call_back = nullptr); + + int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, + int64_t data_time, UserCallBack call_back = nullptr); + + int32_t CloseApi(int32_t max_waitms); + + private: + std::shared_ptr api_impl_; +}; +} // namespace inlong +#endif // INLONG_SDK_API_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc index e27910c4e94..c3b76928097 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc @@ -1,47 +1,55 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ #include "tcp_client.h" -#include "../utils/utils.h" -#include "api_code.h" + #include +#include "../manager/buffer_manager.h" +#include "../manager/metric_manager.h" +#include "../utils/utils.h" + namespace inlong { #define CLIENT_INFO client_info_ << "[" << status_ << "]" TcpClient::TcpClient(IOContext &io_context, std::string ip, uint32_t port) : socket_(std::make_shared(io_context)), wait_timer_(std::make_shared(io_context)), keep_alive_timer_(std::make_shared(io_context)), - ip_(ip), port_(port), endpoint_(asio::ip::address::from_string(ip), port), - status_(kUndefined), recv_buf_(new BlockMemory()), exit_(false), - proxy_loads_(30), wait_heart_beat_(false), reset_client_(false), - heart_beat_index_(0), only_heart_heat_(false) { + ip_(ip), + port_(port), + endpoint_(asio::ip::address::from_string(ip), port), + status_(kUndefined), + recv_buf_(new BlockMemory()), + exit_(false), + proxy_loads_(30), + wait_heart_beat_(false), + reset_client_(false), + heart_beat_index_(0), + only_heart_heat_(false), + need_retry_(false), + retry_times_(0) { client_info_ = " [" + ip_ + ":" + std::to_string(port_) + "]"; tcp_detection_interval_ = SdkConfig::getInstance()->tcp_detection_interval_; tcp_idle_time_ = SdkConfig::getInstance()->tcp_idle_time_; last_update_time_ = Utils::getCurrentMsTime(); - keep_alive_timer_->expires_after( - std::chrono::milliseconds(tcp_detection_interval_)); - keep_alive_timer_->async_wait( - std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1)); + keep_alive_timer_->expires_after(std::chrono::milliseconds(tcp_detection_interval_)); + keep_alive_timer_->async_wait(std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1)); LOG_INFO("TcpClient At remote info .status:" << status_ << client_info_); AsyncConnect(); @@ -70,7 +78,7 @@ TcpClient::~TcpClient() { void TcpClient::DoClose() { status_ = kStopped; exit_ = true; - LOG_INFO("closed client." << CLIENT_INFO); + LOG_INFO("Closed client." << CLIENT_INFO); } void TcpClient::AsyncConnect() { @@ -87,13 +95,12 @@ void TcpClient::AsyncConnect() { } } status_ = kConnecting; - LOG_INFO("began to connect." << CLIENT_INFO); + LOG_INFO("Began to connect." << CLIENT_INFO); } catch (std::exception &e) { LOG_ERROR("AsyncConnect exception." << e.what() << CLIENT_INFO); } - socket_->async_connect(endpoint_, std::bind(&TcpClient::OnConnected, this, - std::placeholders::_1)); + socket_->async_connect(endpoint_, std::bind(&TcpClient::OnConnected, this, std::placeholders::_1)); } void TcpClient::DoAsyncConnect(asio::error_code error) { @@ -116,7 +123,12 @@ void TcpClient::OnConnected(asio::error_code error) { socket_->set_option(asio::ip::tcp::no_delay(true)); asio::socket_base::keep_alive option(true); socket_->set_option(option); - LOG_INFO("client has connected." << CLIENT_INFO); + LOG_INFO("Client has connected." << CLIENT_INFO); + if (need_retry_) { + LOG_WARN("Client has connected retry! times:" << retry_times_ << CLIENT_INFO); + write(sendBuffer_, true); + return; + } status_ = kFree; return; } @@ -124,22 +136,24 @@ void TcpClient::OnConnected(asio::error_code error) { return; } status_ = kConnectFailed; - LOG_ERROR("connect has error:" << error.message() << CLIENT_INFO); + LOG_ERROR("Connect has error:" << error.message() << CLIENT_INFO); + if (need_retry_) { + ResetSendBuffer(); + } wait_timer_->expires_after(std::chrono::milliseconds(kConnectTimeout)); - wait_timer_->async_wait( - std::bind(&TcpClient::DoAsyncConnect, this, std::placeholders::_1)); + wait_timer_->async_wait(std::bind(&TcpClient::DoAsyncConnect, this, std::placeholders::_1)); } -void TcpClient::write(SendBufferPtrT sendBuffer) { +void TcpClient::write(SendBufferPtrT sendBuffer, bool retry) { if (kStopped == status_ || exit_) { LOG_ERROR("Stop.At." << CLIENT_INFO); return; } - if (status_ != kFree) { + if (status_ != kFree && !retry) { LOG_WARN("Not free ." << CLIENT_INFO); return; } - sendBuffer_ = sendBuffer; + sendBuffer_ = std::move(sendBuffer); BeginWrite(); } @@ -150,13 +164,10 @@ void TcpClient::BeginWrite() { } last_update_time_ = Utils::getCurrentMsTime(); status_ = kWriting; - asio::async_write(*socket_, - asio::buffer(sendBuffer_->content(), sendBuffer_->len()), - std::bind(&TcpClient::OnWroten, this, std::placeholders::_1, - std::placeholders::_2)); + asio::async_write(*socket_, asio::buffer(sendBuffer_->GetData(), sendBuffer_->GetDataLen()), + std::bind(&TcpClient::OnWroten, this, std::placeholders::_1, std::placeholders::_2)); } -void TcpClient::OnWroten(const asio::error_code error, - std::size_t bytes_transferred) { +void TcpClient::OnWroten(const asio::error_code error, std::size_t bytes_transferred) { if (kStopped == status_ || exit_) { return; } @@ -164,14 +175,14 @@ void TcpClient::OnWroten(const asio::error_code error, if (asio::error::operation_aborted == error) { return; } - LOG_ERROR("write error:" << error.message() << CLIENT_INFO); + LOG_ERROR("Write error:" << error.message() << CLIENT_INFO); status_ = kWriting; HandleFail(); return; } if (0 == bytes_transferred) { - LOG_ERROR("transferred 0 bytes." << CLIENT_INFO); + LOG_ERROR("Transferred 0 bytes." << CLIENT_INFO); status_ = kWaiting; HandleFail(); return; @@ -179,8 +190,7 @@ void TcpClient::OnWroten(const asio::error_code error, status_ = kClientResponse; asio::async_read(*socket_, asio::buffer(recv_buf_->m_data, sizeof(uint32_t)), - std::bind(&TcpClient::OnReturn, this, std::placeholders::_1, - std::placeholders::_2)); + std::bind(&TcpClient::OnReturn, this, std::placeholders::_1, std::placeholders::_2)); } void TcpClient::OnReturn(asio::error_code error, std::size_t len) { if (kStopped == status_ || exit_) { @@ -201,18 +211,15 @@ void TcpClient::OnReturn(asio::error_code error, std::size_t len) { HandleFail(); return; } - size_t resp_len = - ntohl(*reinterpret_cast(recv_buf_->m_data)); + size_t resp_len = ntohl(*reinterpret_cast(recv_buf_->m_data)); if (resp_len > recv_buf_->m_max_size) { status_ = kWaiting; HandleFail(); return; } - asio::async_read(*socket_, - asio::buffer(recv_buf_->m_data + sizeof(uint32_t), resp_len), - std::bind(&TcpClient::OnBody, this, std::placeholders::_1, - std::placeholders::_2)); + asio::async_read(*socket_, asio::buffer(recv_buf_->m_data + sizeof(uint32_t), resp_len), + std::bind(&TcpClient::OnBody, this, std::placeholders::_1, std::placeholders::_2)); } void TcpClient::OnBody(asio::error_code error, size_t bytesTransferred) { @@ -230,17 +237,19 @@ void TcpClient::OnBody(asio::error_code error, size_t bytesTransferred) { return; } uint32_t parse_index = sizeof(uint32_t); - uint8_t msg_type = - *reinterpret_cast(recv_buf_->m_data + parse_index); + uint8_t msg_type = *reinterpret_cast(recv_buf_->m_data + parse_index); switch (msg_type) { - case 8: - ParseHeartBeat(bytesTransferred); - break; - default: - ParseGenericResponse(); - break; + case 8: + ParseHeartBeat(bytesTransferred); + break; + default: + ParseGenericResponse(); + break; } + + ResetRetry(); + if (wait_heart_beat_) { HeartBeat(); wait_heart_beat_ = false; @@ -261,17 +270,12 @@ void TcpClient::HandleFail() { } status_ = kConnecting; - if (sendBuffer_ != nullptr) { - stat_.AddSendFailMsgNum(sendBuffer_->msgCnt()); - stat_.AddSendFailPackNum(1); + ResetSendBuffer(); - stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_); + int retry_interval = std::min(retry_times_ * constants::kRetryIntervalMs, constants::kMaxRetryIntervalMs); - sendBuffer_->doUserCallBack(); - sendBuffer_->releaseBuf(); - } - - AsyncConnect(); + wait_timer_->expires_after(std::chrono::milliseconds(retry_interval)); + wait_timer_->async_wait(std::bind(&TcpClient::DoAsyncConnect, this, std::placeholders::_1)); } void TcpClient::DetectStatus(const asio::error_code error) { @@ -282,23 +286,28 @@ void TcpClient::DetectStatus(const asio::error_code error) { return; } if (!only_heart_heat_) { - LOG_INFO(stat_.ToString() << CLIENT_INFO); - stat_.ResetStat(); + UpdateMetric(); } - if ((Utils::getCurrentMsTime() - last_update_time_) > tcp_idle_time_ && - status_ != kConnecting) { + if ((Utils::getCurrentMsTime() - last_update_time_) > tcp_idle_time_ && status_ != kConnecting) { std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0); - LOG_INFO("reconnect because it has idle " - << tcp_idle_time_ << " ms." - << "last send time:" << last_update_time_ << CLIENT_INFO); + LOG_INFO("Reconnect because it has idle " << tcp_idle_time_ << " ms." + << "last send time:" << last_update_time_ << CLIENT_INFO); AsyncConnect(); } - keep_alive_timer_->expires_after( - std::chrono::milliseconds(tcp_detection_interval_)); - keep_alive_timer_->async_wait( - std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1)); + keep_alive_timer_->expires_after(std::chrono::milliseconds(tcp_detection_interval_)); + keep_alive_timer_->async_wait(std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1)); +} + +void TcpClient::UpdateMetric() { + Metric stat; + for (auto &it : stat_map_) { + MetricManager::GetInstance()->UpdateMetric(it.first, it.second); + stat.Update(it.second); + it.second.ResetStat(); + } + LOG_INFO(stat.ToString() << CLIENT_INFO); } void TcpClient::HeartBeat(bool only_heart_heat) { @@ -308,12 +317,10 @@ void TcpClient::HeartBeat(bool only_heart_heat) { only_heart_heat_ = only_heart_heat; status_ = kHeartBeat; last_update_time_ = Utils::getCurrentMsTime(); - // status_ = kWriting; bin_hb_.total_len = htonl(sizeof(BinaryHB) - 4); bin_hb_.msg_type = 8; - bin_hb_.data_time = - htonl(static_cast(Utils::getCurrentMsTime() / 1000)); + bin_hb_.data_time = htonl(static_cast(Utils::getCurrentMsTime() / 1000)); bin_hb_.body_ver = 1; bin_hb_.body_len = 0; bin_hb_.attr_len = 0; @@ -322,58 +329,52 @@ void TcpClient::HeartBeat(bool only_heart_heat) { uint32_t hb_len = sizeof(bin_hb_); asio::async_write(*socket_, asio::buffer(hb, hb_len), - std::bind(&TcpClient::OnWroten, this, std::placeholders::_1, - std::placeholders::_2)); + std::bind(&TcpClient::OnWroten, this, std::placeholders::_1, std::placeholders::_2)); } void TcpClient::ParseHeartBeat(size_t total_length) { - // | total length(4) | msg type(1) | data time(4) | body version(1) | body - // length (4) | body | attr length(2) | attr | magic (2) | skip total length + // | total length(4) | msg type(1) | data time(4) | body version(1) | body length (4) | body | attr length(2) | attr + // | magic (2) | + // skip total length uint32_t parse_index = sizeof(uint32_t); // skip msg type parse_index += sizeof(uint8_t); // skip data time - // uint32_t data_time = ntohl(*reinterpret_cast(recv_buf_->m_data + parse_index)); + // uint32_t data_time = ntohl(*reinterpret_cast(recv_buf_->m_data + parse_index)); parse_index += sizeof(uint32_t); // 3、parse body version - uint32_t body_version = - *reinterpret_cast(recv_buf_->m_data + parse_index); + uint32_t body_version = *reinterpret_cast(recv_buf_->m_data + parse_index); parse_index += sizeof(uint8_t); // 4、parse body length - uint32_t body_length = ntohl( - *reinterpret_cast(recv_buf_->m_data + parse_index)); + uint32_t body_length = ntohl(*reinterpret_cast(recv_buf_->m_data + parse_index)); parse_index += sizeof(uint32_t); // 5 parse load - int16_t load = ntohs( - *reinterpret_cast(recv_buf_->m_data + parse_index)); + int16_t load = ntohs(*reinterpret_cast(recv_buf_->m_data + parse_index)); parse_index += sizeof(int16_t); // 7 parse attr length - uint16_t attr_length = ntohs( - *reinterpret_cast(recv_buf_->m_data + parse_index)); + uint16_t attr_length = ntohs(*reinterpret_cast(recv_buf_->m_data + parse_index)); parse_index += sizeof(uint16_t); // 8 skip attr parse_index += attr_length; // 9 parse magic - uint16_t magic = ntohs( - *reinterpret_cast(recv_buf_->m_data + parse_index)); + uint16_t magic = ntohs(*reinterpret_cast(recv_buf_->m_data + parse_index)); parse_index += sizeof(uint16_t); if (magic != constants::kBinaryMagic) { - LOG_ERROR("failed to parse heartbeat ack! error magic " - << magic << " !=" << constants::kBinaryMagic << CLIENT_INFO); + LOG_ERROR("Failed to ParseMsg heartbeat ack! error magic " << magic << " !=" << constants::kBinaryMagic + << CLIENT_INFO); return; } if (total_length + 4 != parse_index) { - LOG_ERROR("failed to parse heartbeat ack! total_length " - << total_length << " +4 !=" << parse_index << CLIENT_INFO); + LOG_ERROR("Failed to ParseMsg heartbeat ack! total_length " << total_length << " +4 !=" << parse_index + << CLIENT_INFO); return; } if (heart_beat_index_ > constants::MAX_STAT) { @@ -389,12 +390,12 @@ void TcpClient::ParseHeartBeat(size_t total_length) { void TcpClient::ParseGenericResponse() { if (sendBuffer_ != nullptr) { - stat_.AddSendSuccessMsgNum(sendBuffer_->msgCnt()); - stat_.AddSendSuccessPackNum(1); - - stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_); + std::string stat_key = sendBuffer_->GetInlongGroupId() + kStatJoiner + sendBuffer_->GetInlongStreamId(); + stat_map_[stat_key].AddSendSuccessMsgNum(sendBuffer_->GetMsgCnt()); + stat_map_[stat_key].AddSendSuccessPackNum(1); + stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() - last_update_time_); - sendBuffer_->releaseBuf(); + BufferManager::GetInstance()->AddSendBuffer(sendBuffer_); } } @@ -419,8 +420,7 @@ int32_t TcpClient::GetAvgLoad() { void TcpClient::SetHeartBeatStatus() { wait_heart_beat_ = true; } void TcpClient::UpdateClient(const std::string ip, const uint32_t port) { - LOG_INFO("UpdateClient[" << only_heart_heat_ << "][" << ip << ":" << port - << "] replace" << CLIENT_INFO); + LOG_INFO("UpdateClient[" << only_heart_heat_ << "][" << ip << ":" << port << "] replace" << CLIENT_INFO); ip_ = ip; port_ = port; reset_client_ = true; @@ -438,5 +438,29 @@ void TcpClient::RestClient() { const std::string &TcpClient::getIp() const { return ip_; } const std::string &TcpClient::getClientInfo() const { return client_info_; } uint32_t TcpClient::getPort() const { return port_; } +void TcpClient::ResetRetry() { + need_retry_ = false; + retry_times_ = 0; +} +void TcpClient::ResetSendBuffer() { + if (sendBuffer_ == nullptr) { + return; + } + retry_times_++; + if (retry_times_ > SdkConfig::getInstance()->retry_times_) { + std::string stat_key = sendBuffer_->GetInlongGroupId() + kStatJoiner + sendBuffer_->GetInlongStreamId(); + stat_map_[stat_key].AddSendFailMsgNum(sendBuffer_->GetMsgCnt()); + stat_map_[stat_key].AddSendFailPackNum(1); + stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() - last_update_time_); + + sendBuffer_->doUserCallBack(); + + BufferManager::GetInstance()->AddSendBuffer(sendBuffer_); -} // namespace inlong + LOG_INFO("resend to proxy fail! retry times:" << retry_times_ << CLIENT_INFO); + ResetRetry(); + } else { + need_retry_ = true; + } +} +} // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h index 6835a7fd264..f2a92fc459a 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h @@ -1,32 +1,32 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -#ifndef INLONG_SDK_TCP_CLIENT_H -#define INLONG_SDK_TCP_CLIENT_H +#ifndef INLONG_TCP_CLIENT_H +#define INLONG_TCP_CLIENT_H +#include +#include + +#include "../metric/metric.h" +#include "../protocol/msg_protocol.h" #include "../utils/block_memory.h" #include "../utils/capi_constant.h" #include "../utils/read_write_mutex.h" #include "../utils/send_buffer.h" -#include "msg_protocol.h" -#include "stat.h" -#include namespace inlong { enum ClientStatus { @@ -46,37 +46,38 @@ enum { }; using IOContext = asio::io_context; using TcpSocketPtr = std::shared_ptr; +using SteadyTimerPtr = std::shared_ptr; class TcpClient { -private: + private: TcpSocketPtr socket_; SteadyTimerPtr wait_timer_; SteadyTimerPtr keep_alive_timer_; ClientStatus status_; std::string ip_; -public: + public: const std::string &getIp() const; -private: + private: uint32_t port_; -public: + public: uint32_t getPort() const; -private: + private: std::string client_info_; -public: + public: const std::string &getClientInfo() const; -private: + private: std::shared_ptr sendBuffer_; asio::ip::tcp::endpoint endpoint_; BlockMemoryPtrT recv_buf_; uint64_t tcp_idle_time_; uint32_t tcp_detection_interval_; uint64_t last_update_time_; - Stat stat_; + Metric stat_; bool exit_; BinaryHB bin_hb_ = {0}; std::vector proxy_loads_; @@ -84,8 +85,11 @@ class TcpClient { bool wait_heart_beat_; bool reset_client_; volatile bool only_heart_heat_; + bool need_retry_; + uint32_t retry_times_; + std::unordered_map stat_map_; -public: + public: TcpClient(IOContext &io_context, std::string ip, uint32_t port); ~TcpClient(); void AsyncConnect(); @@ -98,7 +102,7 @@ class TcpClient { void DoClose(); void HandleFail(); bool isFree() { return (status_ == kFree); } - void write(SendBufferPtrT sendBuffer); + void write(SendBufferPtrT sendBuffer, bool retry = false); void DetectStatus(const asio::error_code error); void HeartBeat(bool only_heart_heat = false); void SetHeartBeatStatus(); @@ -107,10 +111,13 @@ class TcpClient { void UpdateClient(const std::string ip, const uint32_t port); void RestClient(); int32_t GetAvgLoad(); + void ResetRetry(); + void ResetSendBuffer(); + void UpdateMetric(); }; typedef std::shared_ptr TcpClientTPtrT; typedef std::vector TcpClientTPtrVecT; typedef TcpClientTPtrVecT::iterator TcpClientTPtrVecItT; -} // namespace inlong +} // namespace inlong -#endif // INLONG_SDK_TCP_CLIENT_H +#endif // INLONG_TCP_CLIENT_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc index edbc075a67f..5f067528af5 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc @@ -141,6 +141,7 @@ void SdkConfig::defaultInit() { need_auth_ = constants::kNeedAuth; max_instance_ = constants::kMaxInstance; instance_num_ = 1; + enable_share_msg_ = constants::kEnableShareMsg; } void SdkConfig::InitThreadParam(const rapidjson::Value &doc) { @@ -229,6 +230,13 @@ void SdkConfig::InitCacheParam(const rapidjson::Value &doc) { } else { max_cache_num_ = constants::kMaxCacheNum; } + + if (doc.HasMember("enable_share_msg") && doc["enable_share_msg"].IsBool()) { + const rapidjson::Value &obj = doc["enable_share_msg"]; + enable_share_msg_ = obj.GetBool(); + } else { + enable_share_msg_ = constants::kEnableShareMsg; + } } void SdkConfig::InitZipParam(const rapidjson::Value &doc) { @@ -428,6 +436,19 @@ void SdkConfig::InitTcpParam(const rapidjson::Value &doc) { } else { enable_balance_ = constants::kEnableBalance; } + + if (doc.HasMember("retry_times") && doc["retry_times"].IsInt() && doc["retry_times"].GetInt() > 0) { + const rapidjson::Value &obj = doc["retry_times"]; + retry_times_ = obj.GetInt(); + } else { + retry_times_ = constants::kRetryTimes; + } + if (doc.HasMember("proxy_repeat_times") && doc["proxy_repeat_times"].IsInt() && doc["proxy_repeat_times"].GetInt() >= 0) { + const rapidjson::Value &obj = doc["proxy_repeat_times"]; + proxy_repeat_times_ = obj.GetInt(); + } else { + proxy_repeat_times_ = constants::kProxyRepeatTimes; + } } void SdkConfig::InitAuthParm(const rapidjson::Value &doc) { // auth settings diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h index 3f46b1a0f40..f120343f072 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h @@ -53,6 +53,7 @@ class SdkConfig { uint32_t max_cache_num_; uint32_t max_instance_; uint32_t instance_num_; + bool enable_share_msg_; // thread parameters uint32_t per_groupid_thread_nums_; // Sending thread per groupid @@ -94,7 +95,8 @@ class SdkConfig { uint32_t tcp_detection_interval_; // tcp-client detection interval bool enable_balance_; bool enable_local_cache_; - + uint32_t retry_times_; + uint32_t proxy_repeat_times_; // auth settings bool need_auth_; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h index 0228f9c96de..f6413717dd5 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h @@ -41,7 +41,8 @@ enum SdkCode { kSendBeforeInit = 22, kFailMallocBuf = 23, kMsgSizeLargerThanPackSize = 24, - kSendBufferFull = 25 + kSendBufferFull = 25, + kBufferManagerFull = 26 }; } diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc index 13ce7223b52..2109af502e8 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc @@ -16,19 +16,20 @@ */ #include "api_imp.h" +#include +#include #include "../manager/proxy_manager.h" +#include "../manager/metric_manager.h" #include "../utils/capi_constant.h" #include "../utils/logger.h" #include "../utils/utils.h" - -#include "api_code.h" -#include -#include - -#include "metric_manager.h" +#include "../core/api_code.h" namespace inlong { int32_t ApiImp::InitApi(const char *config_file_path) { + if (config_file_path == nullptr) { + return SdkCode::kErrorInit; + } if (!__sync_bool_compare_and_swap(&inited_, false, true)) { return SdkCode::kMultiInit; } @@ -36,56 +37,65 @@ int32_t ApiImp::InitApi(const char *config_file_path) { user_exit_flag_.getAndSet(0); if (!SdkConfig::getInstance()->ParseConfig(config_file_path)) { - LOG_ERROR("ParseConfig error "); return SdkCode::kErrorInit; } - max_msg_length_ = std::min(SdkConfig::getInstance()->max_msg_size_, - SdkConfig::getInstance()->pack_size_) - constants::ATTR_LENGTH; + + max_msg_length_ = SdkConfig::getInstance()->max_msg_size_ - constants::ATTR_LENGTH; local_ip_ = SdkConfig::getInstance()->local_ip_; return DoInit(); } -int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id, - const char *msg, int32_t msg_len, UserCallBack call_back) { +int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, + UserCallBack call_back) { + int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len); + if(code !=SdkCode::kSuccess){ + return code; + } + + return this->SendBase(inlong_group_id, inlong_stream_id, {msg, msg_len}, call_back); +} +int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, + int64_t data_time, UserCallBack call_back) { + int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len); + if(code !=SdkCode::kSuccess){ + return code; + } + + return this->SendBase(inlong_group_id, inlong_stream_id, {msg, msg_len}, call_back, data_time); +} + +int32_t ApiImp::ValidateParams(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len){ if (msg_len > max_msg_length_) { + MetricManager::GetInstance()->AddTooLongMsgCount(inlong_group_id, inlong_stream_id, 1); return SdkCode::kMsgTooLong; } - if (inlong_group_id == nullptr || inlong_stream_id == nullptr || - msg == nullptr || msg_len <= 0) { + if (inlong_group_id == nullptr || inlong_stream_id == nullptr || msg == nullptr || msg_len <= 0) { return SdkCode::kInvalidInput; } if (inited_ == false) { return SdkCode::kSendBeforeInit; } - - int64_t msg_time = Utils::getCurrentMsTime(); - return this->SendBase(inlong_group_id, inlong_stream_id, local_ip_, msg_time, - {msg, msg_len}, call_back); + return SdkCode::kSuccess; } -int32_t ApiImp::SendBase(const std::string inlong_group_id, - const std::string inlong_stream_id, - const std::string client_ip, int64_t report_time, - const std::string msg, UserCallBack call_back) { +int32_t ApiImp::SendBase(const std::string& inlong_group_id, const std::string& inlong_stream_id, const std::string& msg, + UserCallBack call_back, int64_t report_time) { int32_t check_ret = CheckData(inlong_group_id, inlong_stream_id, msg); if (check_ret != SdkCode::kSuccess) { return check_ret; } - ProxyManager::GetInstance()->CheckBidConf(inlong_group_id, true); + ProxyManager::GetInstance()->CheckGroupIdConf(inlong_group_id, true); - auto recv_group = - recv_manager_->GetRecvGroup(inlong_group_id); + auto recv_group = recv_manager_->GetRecvGroup(inlong_group_id); if (recv_group == nullptr) { - LOG_ERROR("fail to get recv group, inlong_group_id:" - << inlong_group_id << " inlong_stream_id:" << inlong_stream_id); + LOG_ERROR("fail to get pack queue, group id:" << inlong_group_id << ",getStreamId:" << inlong_stream_id); return SdkCode::kFailGetRevGroup; } - return recv_group->SendData(msg, inlong_group_id, inlong_stream_id, client_ip, - report_time, call_back); + return recv_group->SendData(msg, inlong_group_id, inlong_stream_id, report_time, call_back); } int32_t ApiImp::CloseApi(int32_t max_waitms) { @@ -98,44 +108,38 @@ int32_t ApiImp::CloseApi(int32_t max_waitms) { } int32_t ApiImp::DoInit() { - LOG_INFO( - "inlong dataproxy sdk cpp start Init, version:" << constants::kVersion); + LOG_INFO("tdbus sdk cpp start Init, version:" << constants::kVersion); signal(SIGPIPE, SIG_IGN); - LOG_INFO("inlong dataproxy cpp sdk Init complete!"); + LOG_INFO("tdbus_sdk_cpp Init complete!"); ProxyManager::GetInstance()->Init(); MetricManager::GetInstance()->Init(); for (int i = 0; i < SdkConfig::getInstance()->inlong_group_ids_.size(); i++) { - LOG_INFO("DoInit CheckConf inlong_group_id:" - << SdkConfig::getInstance()->inlong_group_ids_[i]); - ProxyManager::GetInstance()->CheckBidConf( - SdkConfig::getInstance()->inlong_group_ids_[i], false); + LOG_INFO("Do init:" << SdkConfig::getInstance()->inlong_group_ids_[i]); + ProxyManager::GetInstance()->CheckGroupIdConf(SdkConfig::getInstance()->inlong_group_ids_[i], false); } return InitManager(); } -int32_t ApiImp::CheckData(const std::string inlong_group_id, - const std::string inlong_stream_id, - const std::string msg) { +int32_t ApiImp::CheckData(const std::string& inlong_group_id, const std::string& inlong_stream_id, const std::string& msg) { if (init_succeed_ == 0 || user_exit_flag_.get() == 1) { LOG_ERROR("capi has been closed, Init first and then send"); return SdkCode::kSendAfterClose; } if (msg.empty() || inlong_group_id.empty() || inlong_stream_id.empty()) { - LOG_ERROR("invalid input, inlong_group_id" - << inlong_group_id << " inlong_stream_id" << inlong_stream_id); + LOG_ERROR("invalid input, group id:" << inlong_group_id << " stream id:" << inlong_stream_id << "msg" << msg); return SdkCode::kInvalidInput; } if (msg.size() > SdkConfig::getInstance()->max_msg_size_) { - LOG_ERROR("msg len is too long, cur msg_len" - << msg.size() << " ext_pack_size" - << SdkConfig::getInstance()->max_msg_size_); + MetricManager::GetInstance()->AddTooLongMsgCount(inlong_group_id, inlong_stream_id, 1); + LOG_ERROR("msg DataLen is too long, cur msg_len" << msg.size() << " ext_pack_size" + << SdkConfig::getInstance()->max_msg_size_); return SdkCode::kMsgTooLong; } @@ -156,15 +160,15 @@ int32_t ApiImp::InitManager() { init_succeed_ = true; return SdkCode::kSuccess; } -int32_t -ApiImp::AddInLongGroupId(const std::vector &inlong_group_ids) { +int32_t ApiImp::AddInLongGroupId(const std::vector &group_ids) { if (inited_ == false) { return SdkCode::kSendBeforeInit; } - for (auto inlong_group_id : inlong_group_ids) { - ProxyManager::GetInstance()->CheckBidConf(inlong_group_id, false); + for (auto group_id : group_ids) { + ProxyManager::GetInstance()->CheckGroupIdConf(group_id, false); } + return SdkCode::kSuccess; } ApiImp::ApiImp() = default; ApiImp::~ApiImp() = default; -} // namespace inlong \ No newline at end of file +} // namespace inlong \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h index c6e6ecd0e43..cd6a9757a41 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h @@ -15,45 +15,43 @@ * limitations under the License. */ -#ifndef INLONG_SDK_API_IMP_H -#define INLONG_SDK_API_IMP_H +#ifndef INLONG_API_IMP_H +#define INLONG_API_IMP_H +#include +#include #include "../config/sdk_conf.h" #include "../manager/recv_manager.h" #include "../manager/send_manager.h" #include "../utils/atomic.h" -#include -#include namespace inlong { - -typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t, - const int64_t, const char *); +typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t, const int64_t, const char *); class ApiImp { -public: + public: ApiImp(); ~ApiImp(); int32_t InitApi(const char *config_file_path); - int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, - const char *msg, int32_t msg_len, + int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, UserCallBack call_back = nullptr); + int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, + int64_t report_time, UserCallBack call_back = nullptr); int32_t CloseApi(int32_t max_waitms); - int32_t AddInLongGroupId(const std::vector &inlong_group_ids); + int32_t AddInLongGroupId(const std::vector &group_ids); -private: + private: int32_t DoInit(); int32_t InitManager(); - int32_t SendBase(const std::string inlong_group_id, - const std::string inlong_stream_id, - const std::string client_ip, int64_t report_time, - const std::string msg, UserCallBack call_back); + int32_t SendBase(const std::string& inlong_group_id, const std::string& inlong_stream_id, const std::string& msg, UserCallBack call_back, + int64_t report_time = 0); + + int32_t CheckData(const std::string& inlong_group_id, const std::string& inlong_stream_id, const std::string& msg); - int32_t CheckData(const std::string inlong_group_id, - const std::string inlong_stream_id, const std::string msg); + int32_t ValidateParams(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len); AtomicInt user_exit_flag_{0}; volatile bool init_flag_ = false; @@ -67,5 +65,5 @@ class ApiImp { std::shared_ptr send_manager_; }; -} // namespace inlong -#endif // INLONG_SDK_API_IMP_H +} // namespace inlong +#endif // INLONG_API_IMP_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc index 14a5f91207e..aa0674414db 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc @@ -15,28 +15,25 @@ * limitations under the License. */ -#include "inlong_api.h" +#include "../../include/inlong_api.h" #include "../core/api_imp.h" namespace inlong { -InLongApi::InLongApi() { api_impl_ = std::make_shared(); }; +InLongApi::InLongApi() { api_impl_ = std::make_shared(); } InLongApi::~InLongApi() { api_impl_->CloseApi(10); } -int32_t InLongApi::InitApi(const char *config_path) { - return api_impl_->InitApi(config_path); -} +int32_t InLongApi::InitApi(const char *config_path) { return api_impl_->InitApi(config_path); } -int32_t InLongApi::Send(const char *inlong_group_id, - const char *inlong_stream_id, const char *msg, - int32_t msg_len, UserCallBack call_back) { - return api_impl_->Send(inlong_group_id, inlong_stream_id, msg, msg_len, - call_back); +int32_t InLongApi::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, + UserCallBack call_back) { + return api_impl_->Send(inlong_group_id, inlong_stream_id, msg, msg_len, call_back); } -int32_t InLongApi::CloseApi(int32_t max_waitms) { - return api_impl_->CloseApi(max_waitms); -} -int32_t InLongApi::AddBid(const std::vector &groupids) { - return api_impl_->AddInLongGroupId(groupids); +int32_t InLongApi::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, + int64_t data_time, UserCallBack call_back) { + return api_impl_->Send(inlong_group_id, inlong_stream_id, msg, msg_len, data_time, call_back); } -} // namespace inlong \ No newline at end of file + +int32_t InLongApi::CloseApi(int32_t max_waitms) { return api_impl_->CloseApi(max_waitms); } +int32_t InLongApi::AddInLongGroupId(const std::vector &bids) { return api_impl_->AddInLongGroupId(bids); } +} // namespace inlong \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h index 66a1a7ce395..88f0a528ff9 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h @@ -1,20 +1,18 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ #ifndef INLONG_SDK_API_H @@ -34,21 +32,23 @@ typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t, class ApiImp; class InLongApi { -public: + public: InLongApi(); ~InLongApi(); int32_t InitApi(const char *config_path); - int32_t AddBid(const std::vector &groupids); + int32_t AddGroupId(const std::vector &group_ids); - int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, - const char *msg, int32_t msg_len, + int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, UserCallBack call_back = nullptr); + int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, + int64_t data_time, UserCallBack call_back = nullptr); + int32_t CloseApi(int32_t max_waitms); -private: + private: std::shared_ptr api_impl_; }; -} // namespace inlong -#endif // INLONG_SDK_API_H +} // namespace inlong +#endif // INLONG_SDK_API_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h index df7e90f4ace..69bcdddde84 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h @@ -15,8 +15,8 @@ * limitations under the License. */ -#ifndef SDK_USER_MSG_H_ -#define SDK_USER_MSG_H_ +#ifndef SDK_USER_MSG_H +#define SDK_USER_MSG_H #include #include @@ -40,17 +40,44 @@ struct SdkMsg { std::string inlong_group_id_; std::string inlong_stream_id_; - SdkMsg(const std::string &mmsg, const std::string &mclient_ip, - int64_t mreport_time, UserCallBack mcb, const std::string &attr, - const std::string &u_ip, int64_t u_time,const std::string& inlong_group_id,const std::string& inlong_stream_id) + SdkMsg(const std::string &mmsg, + const std::string &mclient_ip, + int64_t mreport_time, + UserCallBack mcb, + const std::string &attr, + const std::string &u_ip, + int64_t u_time, + const std::string &inlong_group_id, + const std::string &inlong_stream_id) : msg_(mmsg), client_ip_(mclient_ip), report_time_(mreport_time), cb_(mcb), user_report_time_(u_time), user_client_ip_(u_ip), data_pack_format_attr_(attr), inlong_group_id_(inlong_group_id), - inlong_stream_id_(inlong_stream_id){} + inlong_stream_id_(inlong_stream_id) {} + SdkMsg() {}; + void setMsg(const std::string &msg) { msg_ = msg; } + void setClientIp(const std::string &clientIp) { client_ip_ = clientIp; } + void setReportTime(uint64_t reportTime) { report_time_ = reportTime; } + void setCb(UserCallBack cb) { cb_ = cb; } + void setUserReportTime(uint64_t userReportTime) { user_report_time_ = userReportTime; } + void setUserClientIp(const std::string &userClientIp) { user_client_ip_ = userClientIp; } + void setDataPackFormatAttr(const std::string &dataPackFormatAttr) { data_pack_format_attr_ = dataPackFormatAttr; } + void setGroupId(const std::string &inlong_group_id) { inlong_group_id_ = inlong_group_id; } + void setStreamId(const std::string &inlong_stream_id) { inlong_stream_id_ = inlong_stream_id; } + + void clear() { + msg_ = ""; + client_ip_ = ""; + report_time_ = 0; + cb_ = nullptr; + user_report_time_ = 0; + user_client_ip_ = ""; + data_pack_format_attr_ = ""; + inlong_group_id_ = ""; + inlong_stream_id_ = ""; + } }; using SdkMsgPtr = std::shared_ptr; } // namespace inlong - -#endif // SDK_USER_MSG_H_ \ No newline at end of file +#endif // SDK_USER_MSG_H \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc index b852095f687..1ffeeef8dc8 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc @@ -1,49 +1,56 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ #include "recv_group.h" -#include "../protocol/msg_protocol.h" -#include "../utils/capi_constant.h" -#include "../utils/utils.h" -#include "api_code.h" -#include #include +#include "../core/api_code.h" +#include "../manager/buffer_manager.h" +#include "../utils/utils.h" +#include "../manager/msg_manager.h" +#include "../manager/metric_manager.h" namespace inlong { const uint32_t DEFAULT_PACK_ATTR = 400; -const uint64_t LOG_SAMPLE=100; -RecvGroup::RecvGroup(const std::string &group_key,std::shared_ptr send_manager) - : cur_len_(0), groupId_num_(0), streamId_num_(0), +const uint64_t LOG_SAMPLE = 100; +RecvGroup::RecvGroup(const std::string &group_key, std::shared_ptr send_manager) + : cur_len_(0), + group_key_(group_key), + groupId_num_(0), + streamId_num_(0), msg_type_(SdkConfig::getInstance()->msg_type_), - data_capacity_(SdkConfig::getInstance()->buf_size_), - send_manager_(send_manager),group_key_(group_key), - log_stat_(0){ - data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_, - SdkConfig::getInstance()->pack_size_); + data_capacity_(SdkConfig::getInstance()->recv_buf_size_), + send_manager_(send_manager), + log_stat_(0), + send_group_(nullptr), + max_msg_size_(0), + uniq_id_(0) { + data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_, SdkConfig::getInstance()->pack_size_); data_capacity_ = data_capacity_ + DEFAULT_PACK_ATTR; pack_buf_ = new char[data_capacity_]; memset(pack_buf_, 0x0, data_capacity_); - data_time_ = 0; + data_time_ = Utils::getCurrentMsTime(); last_pack_time_ = Utils::getCurrentMsTime(); max_recv_size_ = SdkConfig::getInstance()->recv_buf_size_; - - LOG_INFO("RecvGroup:"<local_ip_; + LOG_INFO("RecvGroup:" << group_key_ << ",data_capacity:" << data_capacity_ << ",max_recv_size:" << max_recv_size_); } RecvGroup::~RecvGroup() { @@ -53,136 +60,108 @@ RecvGroup::~RecvGroup() { } } -int32_t RecvGroup::SendData(const std::string &msg, const std::string &groupId, - const std::string &streamId, - const std::string &client_ip, uint64_t report_time, - UserCallBack call_back) { +int32_t RecvGroup::SendData(const std::string &msg, const std::string &inlong_group_id_, const std::string &inlong_stream_id_, + uint64_t report_time, UserCallBack call_back) { std::lock_guard lck(mutex_); - if (msg.size() + cur_len_ > max_recv_size_) { + MetricManager::GetInstance()->AddReceiveBufferFullCount(inlong_group_id_,inlong_stream_id_,1); return SdkCode::kRecvBufferFull; } - AddMsg(msg, client_ip, report_time, call_back,groupId,streamId); - - return SdkCode::kSuccess; -} + uint64_t data_time = (report_time == 0) ? data_time_ : report_time; -int32_t RecvGroup::DoDispatchMsg() { - last_pack_time_ = Utils::getCurrentMsTime(); - std::lock_guard lck(mutex_); - if (group_key_.empty()) { - if (log_stat_++ > LOG_SAMPLE) { - LOG_ERROR("groupId is empty, check!!"); - log_stat_ = 0; - } - return SdkCode::kInvalidInput; - } - if (msgs_.empty()) { - if (log_stat_++ > LOG_SAMPLE) { - LOG_ERROR("no msg in msg_set, check!"); - log_stat_ = 0; - } - return SdkCode::kMsgEmpty; - } - auto send_group = send_manager_->GetSendGroup(group_key_); - if (send_group == nullptr) { - if (log_stat_++ > LOG_SAMPLE) { - LOG_ERROR("failed to get send_buf, something gets wrong, checkout!"); - log_stat_ = 0; - } - return SdkCode::kFailGetSendBuf; - } - if (!send_group->IsAvailable()) { - if (log_stat_++ > LOG_SAMPLE) { - LOG_ERROR("failed to get send group! group_key:" - << group_key_ << " send group is not available!"); - log_stat_ = 0; - } - return SdkCode::kFailGetConn; + std::string data_pack_format_attr = + "__addcol1__reptime=" + Utils::getFormatTime(data_time) + "&__addcol2__ip=" + local_ip_; + max_msg_size_ = std::max(max_msg_size_, msg.size()); + auto it = recv_queue_.find(inlong_group_id_ + inlong_stream_id_); + if (it == recv_queue_.end()) { + std::queue tmp; + it = recv_queue_.insert(recv_queue_.begin(), std::make_pair(inlong_group_id_ + inlong_stream_id_, tmp)); } - if (send_group->IsFull()) { - if (log_stat_++ > LOG_SAMPLE) { - LOG_ERROR("failed to get send group! group_key:" - << group_key_ << " send group is full!"); - log_stat_ = 0; - } - return SdkCode::kSendBufferFull; + SdkMsgPtr msg_ptr = MsgManager::GetInstance()->GetMsg(); + if(nullptr == msg_ptr){ + it->second.emplace(std::make_shared(msg, local_ip_, data_time, call_back, data_pack_format_attr, local_ip_, data_time, inlong_group_id_, inlong_stream_id_)); + }else{ + msg_ptr->setMsg(msg); + msg_ptr->setClientIp(local_ip_); + msg_ptr->setReportTime(data_time); + msg_ptr->setCb(call_back); + msg_ptr->setDataPackFormatAttr(data_pack_format_attr); + msg_ptr->setUserClientIp(local_ip_); + msg_ptr->setUserReportTime(data_time); + msg_ptr->setGroupId(inlong_group_id_); + msg_ptr->setStreamId(inlong_stream_id_); + it->second.emplace(msg_ptr); } - uint32_t total_length = 0; - uint64_t max_tid_size = 0; - std::unordered_map> msgs_to_dispatch; - std::unordered_map tid_stat; - while (!msgs_.empty()) { - SdkMsgPtr msg = msgs_.front(); - if (msg->msg_.size() + max_tid_size + constants::ATTR_LENGTH > SdkConfig::getInstance()->pack_size_) { - if (!msgs_to_dispatch.empty()) { - break; - } - } - std::string msg_key = msg->inlong_group_id_ + msg->inlong_stream_id_; - msgs_to_dispatch[msg_key].push_back(msg); - msgs_.pop(); - - total_length = msg->msg_.size() + total_length + constants::ATTR_LENGTH; + cur_len_ = cur_len_ + msg.size() + constants::ATTR_LENGTH; - if (tid_stat.find(msg_key) == tid_stat.end()) { - tid_stat[msg_key] = 0; - } - tid_stat[msg_key] = tid_stat[msg_key] + msg->msg_.size() + constants::ATTR_LENGTH; + return SdkCode::kSuccess; +} - max_tid_size = std::max(tid_stat[msg_key], max_tid_size); +void RecvGroup::DoDispatchMsg() { + if (!CanDispatch()) { + return; } - - cur_len_ = cur_len_ - total_length; - - for (auto it : msgs_to_dispatch) { - std::shared_ptr send_buffer = BuildSendBuf(it.second); - - ResetPackBuf(); - - if (send_buffer == nullptr) { - CallbalkToUsr(it.second); - continue; - } - - int ret = send_group->PushData(send_buffer); - if (ret != SdkCode::kSuccess) { - CallbalkToUsr(it.second); + last_pack_time_ = Utils::getCurrentMsTime(); + data_time_ = last_pack_time_; + while (!fail_queue_.empty()) { + SendBufferPtrT tmp_ptr = fail_queue_.front(); + if (SdkCode::kSuccess != send_group_->PushData(tmp_ptr)) { + return; } + fail_queue_.pop(); } - return SdkCode::kSuccess; -} - -void RecvGroup::AddMsg(const std::string &msg, std::string client_ip, - int64_t report_time, UserCallBack call_back,const std::string &groupId, - const std::string &streamId) { - if (Utils::isLegalTime(report_time)) - data_time_ = report_time; - else { - data_time_ = Utils::getCurrentMsTime(); + { + std::lock_guard lck(mutex_); + recv_queue_.swap(dispatch_queue_); } - std::string user_client_ip = client_ip; - int64_t user_report_time = report_time; + for (auto &it : dispatch_queue_) { + std::vector msg_vec; + uint64_t msg_size = 0; + while (!it.second.empty()) { + SdkMsgPtr msg = it.second.front(); + msg_vec.push_back(msg); + msg_size = msg_size + msg->msg_.size() + constants::ATTR_LENGTH; + it.second.pop(); + + if ((msg_size + max_msg_size_) >= SdkConfig::getInstance()->pack_size_) { + uint32_t ret = ParseMsg(msg_vec); + if (SdkCode::kBufferManagerFull == ret) { + for (const auto &it_msg : msg_vec) { + it.second.emplace(it_msg); + } + return; + } + UpdateCurrentMsgLen(msg_size); + msg_size = 0; + + if (SdkCode::kSuccess != ret) { + return; + } + std::vector().swap(msg_vec); + } + } + if (!msg_vec.empty()) { + uint32_t ret = ParseMsg(msg_vec); + if (SdkCode::kBufferManagerFull == ret) { + for (const auto &it_msg : msg_vec) { + it.second.emplace(it_msg); + } + return; + } + UpdateCurrentMsgLen(msg_size); - if (client_ip.empty()) { - client_ip = "127.0.0.1"; + if (SdkCode::kSuccess != ret) { + return; + } + } } - std::string data_pack_format_attr = - "__addcol1__reptime=" + Utils::getFormatTime(data_time_) + - "&__addcol2__ip=" + client_ip; - msgs_.push(std::make_shared(msg, client_ip, data_time_, call_back, - data_pack_format_attr, user_client_ip, - user_report_time,groupId,streamId)); - - cur_len_ += msg.size() + constants::ATTR_LENGTH; } -bool RecvGroup::PackMsg(std::vector &msgs, char *pack_data, - uint32_t &out_len, uint32_t uniq_id) { +bool RecvGroup::PackMsg(std::vector &msgs, char *pack_data,uint32_t &out_len) { if (pack_data == nullptr) { LOG_ERROR("nullptr, failed to allocate memory for buf"); return false; @@ -196,7 +175,6 @@ bool RecvGroup::PackMsg(std::vector &msgs, char *pack_data, memcpy(&pack_buf_[idx], it->msg_.data(), it->msg_.size()); idx += static_cast(it->msg_.size()); - // add attrlen|attr if (SdkConfig::getInstance()->isAttrDataPackFormat()) { *(uint32_t *)(&pack_buf_[idx]) = htonl(it->data_pack_format_attr_.size()); idx += sizeof(uint32_t); @@ -247,7 +225,7 @@ bool RecvGroup::PackMsg(std::vector &msgs, char *pack_data, groupId_num = 0; streamId_num = 0; groupId_streamId_char = "groupId=" + msgs[0]->inlong_group_id_ + - "&streamId=" + msgs[0]->inlong_stream_id_; + "&streamId=" + msgs[0]->inlong_stream_id_; char_groupId_flag = 0x4; } else { groupId_num = groupId_num_; @@ -261,14 +239,14 @@ bool RecvGroup::PackMsg(std::vector &msgs, char *pack_data, if (SdkConfig::getInstance()->enableTraceIP()) { if (groupId_streamId_char.empty()) attr = "node1ip=" + SdkConfig::getInstance()->local_ip_ + - "&rtime1=" + std::to_string(Utils::getCurrentMsTime()); + "&rtime1=" + std::to_string(Utils::getCurrentMsTime()); else attr = groupId_streamId_char + - "&node1ip=" + SdkConfig::getInstance()->local_ip_ + - "&rtime1=" + std::to_string(Utils::getCurrentMsTime()); + "&node1ip=" + SdkConfig::getInstance()->local_ip_ + + "&rtime1=" + std::to_string(Utils::getCurrentMsTime()); } else { attr = "groupId=" + msgs[0]->inlong_group_id_ + - "&streamId=" + msgs[0]->inlong_stream_id_; + "&streamId=" + msgs[0]->inlong_stream_id_; } *(uint16_t *)bodyBegin = htons(attr.size()); bodyBegin += sizeof(uint16_t); @@ -294,7 +272,7 @@ bool RecvGroup::PackMsg(std::vector &msgs, char *pack_data, p += 4; *(uint16_t *)p = htons(cnt); p += 2; - *(uint32_t *)p = htonl(uniq_id); + *(uint32_t *)p = htonl(uniq_id_); out_len = total_len + 4; } else { @@ -319,10 +297,10 @@ bool RecvGroup::PackMsg(std::vector &msgs, char *pack_data, // attr std::string attr; attr = "groupId=" + msgs[0]->inlong_group_id_ + - "&streamId=" + msgs[0]->inlong_stream_id_; + "&streamId=" + msgs[0]->inlong_stream_id_; attr += "&dt=" + std::to_string(data_time_); - attr += "&mid=" + std::to_string(uniq_id); + attr += "&mid=" + std::to_string(uniq_id_); if (isSnappy) attr += "&cp=snappy"; attr += "&cnt=" + std::to_string(cnt); @@ -352,56 +330,97 @@ bool RecvGroup::IsZipAndOperate(std::string &res, uint32_t real_cur_len) { } void RecvGroup::DispatchMsg(bool exit) { - if (cur_len_ <= constants::ATTR_LENGTH || msgs_.empty()) - return; + if (cur_len_ <= constants::ATTR_LENGTH) return; bool len_enough = cur_len_ > SdkConfig::getInstance()->pack_size_; - bool time_enough = (Utils::getCurrentMsTime() - last_pack_time_) > - SdkConfig::getInstance()->pack_timeout_; + bool time_enough = (Utils::getCurrentMsTime() - last_pack_time_) > SdkConfig::getInstance()->pack_timeout_; if (len_enough || time_enough) { DoDispatchMsg(); } } -std::shared_ptr -RecvGroup::BuildSendBuf(std::vector &msgs) { +std::shared_ptr RecvGroup::BuildSendBuf(std::vector &msgs) { if (msgs.empty()) { - LOG_ERROR("pack msgs is empty."); return nullptr; } - std::shared_ptr send_buffer = - std::make_shared(data_capacity_); + std::shared_ptr send_buffer = BufferManager::GetInstance()->GetSendBuffer(); if (send_buffer == nullptr) { - LOG_ERROR("make send buffer failed."); return nullptr; } + uint32_t len = 0; - int32_t msg_cnt = msgs.size(); - uint32_t uniq_id = g_send_msgid.incrementAndGet(); + uint32_t msg_cnt = msgs.size(); + if (++uniq_id_ >= constants::kMaxSnowFlake) { + uniq_id_ = 0; + } - if (!PackMsg(msgs, send_buffer->content(), len, uniq_id) || len == 0) { - LOG_ERROR("failed to write data to send buf from pack queue, sendQueue " - "id:%d, buf id:%d"); + if (!PackMsg(msgs, send_buffer->GetData(), len) || len == 0) { + LOG_ERROR("failed to write data to send buf from pack queue, sendQueue"); return nullptr; } - send_buffer->setLen(len); - send_buffer->setMsgCnt(msg_cnt); - send_buffer->setInlongGroupId(msgs[0]->inlong_group_id_); - send_buffer->setStreamId(msgs[0]->inlong_stream_id_); - send_buffer->setUniqId(uniq_id); - send_buffer->setIsPacked(true); - for (auto it : msgs) { - send_buffer->addUserMsg(it); + + send_buffer->SetDataLen(len); + send_buffer->SetMsgCnt(msg_cnt); + send_buffer->SetInlongGroupId(msgs[0]->inlong_group_id_); + send_buffer->SetInlongStreamId(msgs[0]->inlong_stream_id_); + for (const auto &it : msgs) { + if(it->cb_){ + send_buffer->addUserMsg(it); + } } return send_buffer; } -void RecvGroup::CallbalkToUsr(std::vector &msgs) { - for (auto &it : msgs) { - if (it->cb_) { - it->cb_(it->inlong_group_id_.data(), it->inlong_stream_id_.data(), - it->msg_.data(), it->msg_.size(), it->user_report_time_, - it->user_client_ip_.data()); +uint32_t RecvGroup::ParseMsg(std::vector &msg_vec) { + if (msg_vec.empty()) { + return SdkCode::kSuccess; + } + + std::shared_ptr send_buffer = BuildSendBuf(msg_vec); + + if (send_buffer == nullptr) { + return SdkCode::kBufferManagerFull; + } + + uint32_t ret = send_group_->PushData(send_buffer); + if (ret != SdkCode::kSuccess) { + fail_queue_.push(send_buffer); + } + return ret; +} +bool RecvGroup::CanDispatch() { + if (group_key_.empty()) { + if (log_stat_++ > LOG_SAMPLE) { + LOG_ERROR("Group key is empty!"); + log_stat_ = 0; + } + return false; + } + if (nullptr == send_group_) { + send_group_ = send_manager_->GetSendGroup(group_key_); + if (log_stat_++ > LOG_SAMPLE) { + LOG_ERROR("failed to get send group! group_key:" << group_key_); + log_stat_ = 0; + } + return false; + } + if (!send_group_->IsAvailable()) { + if (log_stat_++ > LOG_SAMPLE) { + LOG_ERROR("failed to get send group! group_key:" << group_key_ << " is not available!"); + log_stat_ = 0; } + return false; } + if (send_group_->IsFull()) { + if (log_stat_++ > LOG_SAMPLE) { + LOG_ERROR("failed to get send group! group_key:" << group_key_ << " is full!"); + log_stat_ = 0; + } + return false; + } + return true; +} +void RecvGroup::UpdateCurrentMsgLen(uint64_t msg_size) { + std::lock_guard lck(mutex_); + cur_len_ = cur_len_ - msg_size; } -} // namespace inlong \ No newline at end of file +} // namespace inlong \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h index adac9a7f108..839c49e14d2 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h @@ -1,18 +1,20 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ #ifndef INLONG_SDK_RECV_GROUP_H @@ -25,17 +27,19 @@ #include #include "../config/sdk_conf.h" +#include "../core/sdk_msg.h" #include "../manager/send_manager.h" #include "../utils/atomic.h" #include "../utils/noncopyable.h" namespace inlong { class RecvGroup { -private: - char *pack_buf_; + private: + char* pack_buf_; std::queue msgs_; + std::queue fail_msgs_; uint32_t data_capacity_; - uint32_t cur_len_; + uint64_t cur_len_; AtomicInt pack_err_; uint64_t data_time_; uint16_t groupId_num_; @@ -47,33 +51,36 @@ class RecvGroup { uint64_t last_pack_time_; uint64_t max_recv_size_; + std::string group_key_; uint64_t log_stat_; + SendGroupPtr send_group_; + std::string local_ip_; + uint64_t max_msg_size_; + uint64_t uniq_id_; + + std::unordered_map> recv_queue_; + std::unordered_map> dispatch_queue_; + std::queue fail_queue_; - int32_t DoDispatchMsg(); - void AddMsg(const std::string &msg, std::string client_ip, - int64_t report_time, UserCallBack call_back,const std::string &groupId, - const std::string &streamId); - bool IsZipAndOperate(std::string &res, uint32_t real_cur_len); + void DoDispatchMsg(); + bool IsZipAndOperate(std::string& res, uint32_t real_cur_len); inline void ResetPackBuf() { memset(pack_buf_, 0x0, data_capacity_); } + uint32_t ParseMsg(std::vector& msg_vec); -public: - RecvGroup(const std::string &group_key,std::shared_ptr send_manager); + public: + RecvGroup(const std::string& group_key, std::shared_ptr send_manager); ~RecvGroup(); - int32_t SendData(const std::string &msg, const std::string &groupId, - const std::string &streamId, const std::string &client_ip, - uint64_t report_time, UserCallBack call_back); - bool PackMsg(std::vector &msgs, char *pack_data, uint32_t &out_len, - uint32_t uniq_id); + int32_t SendData(const std::string& msg, const std::string& inlong_group_id_, const std::string& inlong_stream_id_, uint64_t report_time, + UserCallBack call_back); + bool PackMsg(std::vector& msgs, char* pack_data, uint32_t& out_len); void DispatchMsg(bool exit); - - char *data() const { return pack_buf_; } - - std::shared_ptr BuildSendBuf(std::vector &msgs); - void CallbalkToUsr(std::vector &msgs); + std::shared_ptr BuildSendBuf(std::vector& msgs); + bool CanDispatch(); + void UpdateCurrentMsgLen(uint64_t msg_size); }; using RecvGroupPtr = std::shared_ptr; -} // namespace inlong +} // namespace inlong -#endif // INLONG_SDK_RECV_GROUP_H \ No newline at end of file +#endif // INLONG_SDK_RECV_GROUP_H \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc index f3171637998..0bbedcd0fb3 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc @@ -1,45 +1,45 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ #include "send_group.h" -#include "api_code.h" -#include "proxy_manager.h" +#include #include #include +#include "../core/api_code.h" +#include "../manager/proxy_manager.h" namespace inlong { -const int kDefaultQueueSize = 20; +const uint32_t kDefaultQueueSize = 200; SendGroup::SendGroup(std::string send_group_key) : work_(asio::make_work_guard(io_context_)), - send_group_key_(send_group_key), send_idx_(0), dispatch_stat_(0), - load_threshold_(0), max_proxy_num_(0) { - max_send_queue_num_ = SdkConfig::getInstance()->send_buf_size_ / - SdkConfig::getInstance()->pack_size_; - if (max_send_queue_num_ <= 0) { + send_group_key_(send_group_key), + send_idx_(0), + dispatch_stat_(0), + load_threshold_(0), + max_proxy_num_(0) { + max_send_queue_num_ = SdkConfig::getInstance()->send_buf_size_ / SdkConfig::getInstance()->pack_size_; + if (max_send_queue_num_ <= kDefaultQueueSize) { max_send_queue_num_ = kDefaultQueueSize; } - LOG_INFO("SendGroup:" << send_group_key_ - << ",max send queue num:" << max_send_queue_num_); + max_send_queue_num_ = std::max(max_send_queue_num_, kDefaultQueueSize); + LOG_INFO("SendGroup:" << send_group_key_ << ",max send queue num:" << max_send_queue_num_); dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_send_; load_balance_interval_ = SdkConfig::getInstance()->load_balance_interval_; - heart_beat_interval_ = - SdkConfig::getInstance()->heart_beat_interval_ / dispatch_interval_; + heart_beat_interval_ = SdkConfig::getInstance()->heart_beat_interval_ / dispatch_interval_; need_balance_ = SdkConfig::getInstance()->enable_balance_; work_clients_old_ = nullptr; @@ -48,23 +48,19 @@ SendGroup::SendGroup(std::string send_group_key) send_timer_ = std::make_shared(io_context_); send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_)); - send_timer_->async_wait( - std::bind(&SendGroup::PreDispatchData, this, std::placeholders::_1)); + send_timer_->async_wait(std::bind(&SendGroup::PreDispatchData, this, std::placeholders::_1)); update_conf_timer_ = std::make_shared(io_context_); update_conf_timer_->expires_after(std::chrono::milliseconds(1)); - update_conf_timer_->async_wait( - std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); + update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); if (SdkConfig::getInstance()->enable_balance_) { load_balance_timer_ = std::make_shared(io_context_); - load_balance_timer_->expires_after( - std::chrono::milliseconds(load_balance_interval_)); - load_balance_timer_->async_wait( - std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1)); + load_balance_timer_->expires_after(std::chrono::milliseconds(load_balance_interval_)); + load_balance_timer_->async_wait(std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1)); } - current_bus_vec_.reserve(SdkConfig::getInstance()->max_proxy_num_); + current_proxy_vec_.reserve(SdkConfig::getInstance()->max_proxy_num_); thread_ = std::thread(&SendGroup::Run, this); } SendGroup::~SendGroup() { @@ -84,14 +80,16 @@ SendGroup::~SendGroup() { thread_.join(); } } -void SendGroup::Run() { io_context_.run(); } +void SendGroup::Run() { + prctl(PR_SET_NAME, "send-group"); + io_context_.run(); +} void SendGroup::PreDispatchData(std::error_code error) { if (error) { return; } send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_)); - send_timer_->async_wait( - std::bind(&SendGroup::DispatchData, this, std::placeholders::_1)); + send_timer_->async_wait(std::bind(&SendGroup::DispatchData, this, std::placeholders::_1)); } void SendGroup::DispatchData(std::error_code error) { @@ -125,8 +123,7 @@ void SendGroup::DispatchData(std::error_code error) { } send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_)); - send_timer_->async_wait( - std::bind(&SendGroup::DispatchData, this, std::placeholders::_1)); + send_timer_->async_wait(std::bind(&SendGroup::DispatchData, this, std::placeholders::_1)); } void SendGroup::HeartBeat() { unique_read_lock rdlck(work_clients_mutex_); @@ -148,12 +145,12 @@ void SendGroup::HeartBeat() { bool SendGroup::IsFull() { return GetQueueSize() > max_send_queue_num_; } -uint32_t SendGroup::PushData(SendBufferPtrT send_buffer_ptr) { +uint32_t SendGroup::PushData(const SendBufferPtrT &send_buffer_ptr) { if (IsFull()) { return SdkCode::kSendBufferFull; } std::lock_guard lock(mutex_); - send_buf_list_.push(send_buffer_ptr); + send_proxy_list_.push(send_buffer_ptr); return SdkCode::kSuccess; } @@ -165,60 +162,53 @@ void SendGroup::UpdateConf(std::error_code error) { ClearOldTcpClients(); - ProxyInfoVec new_bus_info; - if (ProxyManager::GetInstance()->GetProxy(send_group_key_, new_bus_info) != - kSuccess || - new_bus_info.empty()) { + ProxyInfoVec new_proxy_info; + if (ProxyManager::GetInstance()->GetProxy(send_group_key_, new_proxy_info) != kSuccess || new_proxy_info.empty()) { update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute)); - update_conf_timer_->async_wait( - std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); + update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); return; } - if (new_bus_info.empty()) { - LOG_INFO("UpdateConf new_bus_info is empty!"); + if (new_proxy_info.empty()) { + LOG_INFO("New proxy is empty when update config!"); return; } - load_threshold_ = new_bus_info[0].GetLoad() > constants::kDefaultLoadThreshold - ? constants::kDefaultLoadThreshold - : std::max((new_bus_info[0].GetLoad()), 0); + load_threshold_ = new_proxy_info[0].GetLoad() > constants::kDefaultLoadThreshold + ? constants::kDefaultLoadThreshold + : std::max((new_proxy_info[0].GetLoad()), 0); - if (!IsConfChanged(current_bus_vec_, new_bus_info)) { - LOG_INFO("Don`t need UpdateConf. current bus size(" - << current_bus_vec_.size() << ")=bus size(" << new_bus_info.size() - << ")"); + if (!IsConfChanged(current_proxy_vec_, new_proxy_info)) { + LOG_INFO("Don`t need UpdateConf. current proxy size(" << current_proxy_vec_.size() << ")=proxy size(" + << new_proxy_info.size() << ")"); update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute)); - update_conf_timer_->async_wait( - std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); + update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); return; } - max_proxy_num_ = - std::min(SdkConfig::getInstance()->max_proxy_num_, new_bus_info.size()); + max_proxy_num_ = std::min(SdkConfig::getInstance()->max_proxy_num_, new_proxy_info.size()); - std::shared_ptr> tcp_clients_tmp = - std::make_shared>(); + std::shared_ptr> tcp_clients_tmp = std::make_shared>(); if (tcp_clients_tmp == nullptr) { - LOG_INFO("tcp_clients_tmp is nullptr"); + LOG_INFO("Tcp clients tmp is nullptr"); update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute)); - update_conf_timer_->async_wait( - std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); + update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); return; } - std::random_shuffle(new_bus_info.begin(), new_bus_info.end()); + std::random_shuffle(new_proxy_info.begin(), new_proxy_info.end()); tcp_clients_tmp->reserve(max_proxy_num_); for (int i = 0; i < max_proxy_num_; i++) { - ProxyInfo bus_tmp = new_bus_info[i]; - TcpClientTPtrT tcpClientTPtrT = - std::make_shared(io_context_, bus_tmp.ip(), bus_tmp.port()); - tcp_clients_tmp->push_back(tcpClientTPtrT); - LOG_INFO("new bus info.[" << bus_tmp.ip() << ":" << bus_tmp.port() << "]"); + ProxyInfo tmp_proxy = new_proxy_info[i]; + for (int repeat_time = 0; repeat_time < SdkConfig::getInstance()->proxy_repeat_times_; repeat_time++) { + TcpClientTPtrT tcpClientTPtrT = std::make_shared(io_context_, tmp_proxy.ip(), tmp_proxy.port()); + tcp_clients_tmp->push_back(tcpClientTPtrT); + LOG_INFO("New proxy info.[" << tmp_proxy.ip() << ":" << tmp_proxy.port() << "]"); + } } { - LOG_INFO("do change tcp clients."); + LOG_INFO("Do change tcp clients."); unique_write_lock wtlck(work_clients_mutex_); work_clients_old_ = work_clients_; work_clients_ = tcp_clients_tmp; @@ -230,51 +220,43 @@ void SendGroup::UpdateConf(std::error_code error) { } } - current_bus_vec_ = new_bus_info; + current_proxy_vec_ = new_proxy_info; InitReserveClient(); update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute)); - update_conf_timer_->async_wait( - std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); + update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); - LOG_INFO("Finished UpdateConf."); + LOG_INFO("Finished update send group config."); } SendBufferPtrT SendGroup::PopData() { std::lock_guard lock(mutex_); - if (send_buf_list_.empty()) { + if (send_proxy_list_.empty()) { return nullptr; } - SendBufferPtrT send_buf = send_buf_list_.front(); - send_buf_list_.pop(); + SendBufferPtrT send_buf = send_proxy_list_.front(); + send_proxy_list_.pop(); return send_buf; } uint32_t SendGroup::GetQueueSize() { std::lock_guard lock(mutex_); - return send_buf_list_.size(); + return send_proxy_list_.size(); } -bool SendGroup::IsConfChanged(ProxyInfoVec ¤t_bus_vec, - ProxyInfoVec &new_bus_vec) { - if (new_bus_vec.empty()) - return false; - if (current_bus_vec.size() != new_bus_vec.size()) { +bool SendGroup::IsConfChanged(ProxyInfoVec ¤t_proxy_vec, ProxyInfoVec &new_proxy_vec) { + if (new_proxy_vec.empty()) return false; + if (current_proxy_vec.size() != new_proxy_vec.size()) { return true; } - for (auto ¤t_bu : current_bus_vec) { - for (int i = 0; i < new_bus_vec.size(); i++) { - if ((current_bu.ip() == new_bus_vec[i].ip()) && - (current_bu.port() == new_bus_vec[i].port())) - break; - if (i == (new_bus_vec.size() - 1)) { - if ((current_bu.ip() != new_bus_vec[i].ip() || - current_bu.port() == new_bus_vec[i].port())) { - LOG_INFO("current bus ip." << current_bu.ip() << ":" - << current_bu.port() - << " can`t find in bus."); + for (auto ¤t_bu : current_proxy_vec) { + for (int i = 0; i < new_proxy_vec.size(); i++) { + if ((current_bu.ip() == new_proxy_vec[i].ip()) && (current_bu.port() == new_proxy_vec[i].port())) break; + if (i == (new_proxy_vec.size() - 1)) { + if ((current_bu.ip() != new_proxy_vec[i].ip() || current_bu.port() == new_proxy_vec[i].port())) { + LOG_INFO("current proxy ip." << current_bu.ip() << ":" << current_bu.port() << " can`t find in proxy."); return true; } } @@ -312,8 +294,7 @@ void SendGroup::LoadBalance(std::error_code error) { uint64_t interval = load_balance_interval_ + rand() % 120 * 1000; LOG_INFO("LoadBalance interval:" << interval); load_balance_timer_->expires_after(std::chrono::milliseconds(interval)); - load_balance_timer_->async_wait( - std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1)); + load_balance_timer_->async_wait(std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1)); } void SendGroup::DoLoadBalance() { @@ -328,17 +309,13 @@ void SendGroup::DoLoadBalance() { return; } - if ((work_client->GetAvgLoad() - reserve_client->GetAvgLoad()) > - load_threshold_) { - LOG_INFO("DoLoadBalance " << reserve_client->getClientInfo() << "replace" - << work_client->getClientInfo() << ",load[work " - << work_client->GetAvgLoad() << "][reserve " - << reserve_client->GetAvgLoad() << "][threshold " - << load_threshold_ << "]"); + if ((work_client->GetAvgLoad() - reserve_client->GetAvgLoad()) > load_threshold_) { + LOG_INFO("DoLoadBalance " << reserve_client->getClientInfo() << "replace" << work_client->getClientInfo() + << ",load[work " << work_client->GetAvgLoad() << "][reserve " + << reserve_client->GetAvgLoad() << "][threshold " << load_threshold_ << "]"); std::string ip = work_client->getIp(); uint32_t port = work_client->getPort(); - work_client->UpdateClient(reserve_client->getIp(), - reserve_client->getPort()); + work_client->UpdateClient(reserve_client->getIp(), reserve_client->getPort()); ProxyInfo proxy = GetRandomProxy(ip, port); if (!proxy.ip().empty()) { @@ -350,12 +327,10 @@ void SendGroup::DoLoadBalance() { } bool SendGroup::NeedDoLoadBalance() { unique_read_lock rdlck(work_clients_mutex_); - if (load_threshold_ <= 0 || - work_clients_->size() == current_bus_vec_.size()) { - LOG_INFO("Don`t need DoLoadBalance [load_threshold]:" - << load_threshold_ - << ",[tcp_client size]:" << work_clients_->size() - << ",[current_bus_vec size]:" << current_bus_vec_.size()); + if (load_threshold_ <= 0 || work_clients_->size() == current_proxy_vec_.size()) { + LOG_INFO("Don`t need DoLoadBalance [load_threshold]:" << load_threshold_ + << ",[tcp_client size]:" << work_clients_->size() + << ",[current_proxy_vec size]:" << current_proxy_vec_.size()); need_balance_ = false; return false; } @@ -363,12 +338,11 @@ bool SendGroup::NeedDoLoadBalance() { return true; } void SendGroup::InitReserveClient() { - if (max_proxy_num_ >= current_bus_vec_.size()) { + if (max_proxy_num_ >= current_proxy_vec_.size()) { return; } - uint64_t max_reserve_num = current_bus_vec_.size() - max_proxy_num_; - uint64_t reserve_num = - std::min(SdkConfig::getInstance()->reserve_proxy_num_, max_reserve_num); + uint64_t max_reserve_num = current_proxy_vec_.size() - max_proxy_num_; + uint64_t reserve_num = std::min(SdkConfig::getInstance()->reserve_proxy_num_, max_reserve_num); if (reserve_num <= 0) { return; } @@ -376,15 +350,12 @@ void SendGroup::InitReserveClient() { unique_write_lock wtlck(reserve_clients_mutex_); reserve_clients_.clear(); - for (uint64_t i = current_bus_vec_.size() - reserve_num; - i < current_bus_vec_.size(); i++) { - ProxyInfo bus_tmp = current_bus_vec_[i]; - TcpClientTPtrT tcpClientTPtrT = - std::make_shared(io_context_, bus_tmp.ip(), bus_tmp.port()); + for (uint64_t i = current_proxy_vec_.size() - reserve_num; i < current_proxy_vec_.size(); i++) { + ProxyInfo tmp_proxy = current_proxy_vec_[i]; + TcpClientTPtrT tcpClientTPtrT = std::make_shared(io_context_, tmp_proxy.ip(), tmp_proxy.port()); reserve_clients_.push_back(tcpClientTPtrT); } - LOG_INFO( - "InitReserveClient reserve_clients size:" << reserve_clients_.size()); + LOG_INFO("InitReserveClient reserve_clients size:" << reserve_clients_.size()); } bool SendGroup::UpSort(const TcpClientTPtrT &begin, const TcpClientTPtrT &end) { if (begin && end) { @@ -409,14 +380,13 @@ TcpClientTPtrT SendGroup::GetMaxLoadClient() { ProxyInfo SendGroup::GetRandomProxy(const std::string &ip, uint32_t port) { ProxyInfo proxy_info; - for (auto &it : current_bus_vec_) { + for (auto &it : current_proxy_vec_) { if (it.ip() == ip && it.port() == port) { continue; } bool exist = false; for (int index = 0; index < reserve_clients_.size(); index++) { - if (it.ip() == reserve_clients_[index]->getIp() && - it.port() == reserve_clients_[index]->getPort()) { + if (it.ip() == reserve_clients_[index]->getIp() && it.port() == reserve_clients_[index]->getPort()) { exist = true; break; } @@ -444,8 +414,7 @@ TcpClientTPtrT SendGroup::GetReserveClient() { } bool exist = false; for (int index = 0; index < work_clients_->size(); index++) { - if (it->getIp() == (*work_clients_)[index]->getIp() && - it->getPort() == (*work_clients_)[index]->getPort()) { + if (it->getIp() == (*work_clients_)[index]->getIp() && it->getPort() == (*work_clients_)[index]->getPort()) { exist = true; break; } @@ -461,11 +430,10 @@ TcpClientTPtrT SendGroup::GetReserveClient() { bool SendGroup::ExistInWorkClient(const std::string &ip, uint32_t port) { unique_read_lock rdlck(work_clients_mutex_); for (int index = 0; index < work_clients_->size(); index++) { - if (ip == (*work_clients_)[index]->getIp() && - port == (*work_clients_)[index]->getPort()) { + if (ip == (*work_clients_)[index]->getIp() && port == (*work_clients_)[index]->getPort()) { return true; } } return false; } -} // namespace inlong +} // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h index a52d63dcd3e..c54f858b90b 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h @@ -1,53 +1,53 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ #ifndef INLONG_SDK_SEND_GROUP_H #define INLONG_SDK_SEND_GROUP_H -#include "../client/tcp_client.h" +#include +#include + #include "../config/proxy_info.h" #include "../utils/send_buffer.h" -#include +#include "../client/tcp_client.h" + namespace inlong { const int kTimerMiSeconds = 10; const int kTimerMinute = 60000; using SteadyTimerPtr = std::shared_ptr; using IOContext = asio::io_context; -using io_context_work = - asio::executor_work_guard; +using io_context_work = asio::executor_work_guard; class SendGroup : noncopyable { -private: + private: IOContext io_context_; - io_context_work work_; + io_context_work work_; // 保持io_context.run()在无任何任务时不退出 std::thread thread_; void Run(); uint64_t max_proxy_num_; -public: + public: std::shared_ptr> work_clients_; std::shared_ptr> work_clients_old_; std::vector reserve_clients_; - ProxyInfoVec current_bus_vec_; - std::queue send_buf_list_; + ProxyInfoVec current_proxy_vec_; + std::queue send_proxy_list_; SteadyTimerPtr send_timer_; SteadyTimerPtr update_conf_timer_; @@ -75,11 +75,11 @@ class SendGroup : noncopyable { void PreDispatchData(std::error_code error); void DispatchData(std::error_code error); bool IsFull(); - uint32_t PushData(SendBufferPtrT send_buffer_ptr); + uint32_t PushData(const SendBufferPtrT &send_buffer_ptr); SendBufferPtrT PopData(); uint32_t GetQueueSize(); void UpdateConf(std::error_code error); - bool IsConfChanged(ProxyInfoVec ¤t_bus_vec, ProxyInfoVec &new_bus_vec); + bool IsConfChanged(ProxyInfoVec ¤t_proxy_vec, ProxyInfoVec &new_proxy_vec); bool IsAvailable(); void ClearOldTcpClients(); @@ -96,7 +96,6 @@ class SendGroup : noncopyable { bool ExistInWorkClient(const std::string &ip, uint32_t port); }; using SendGroupPtr = std::shared_ptr; +} // namespace inlong -} // namespace inlong - -#endif // INLONG_SDK_SEND_GROUP_H +#endif // INLONG_SDK_SEND_GROUP_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/msg_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/msg_manager.h new file mode 100644 index 00000000000..26aa3870832 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/msg_manager.h @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include "../config/sdk_conf.h" +#include "../utils/send_buffer.h" +#include "../utils/logger.h" +#include "../core/sdk_msg.h" + +#ifndef INLONG_MSG_MANAGER_H +#define INLONG_MSG_MANAGER_H +namespace inlong { +class MsgManager { + private: + std::queue msg_queue_; + mutable std::mutex mutex_; + uint32_t queue_limit_; + bool enable_share_msg_; + MsgManager() { + uint32_t data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_, SdkConfig::getInstance()->pack_size_); + uint32_t buffer_num = SdkConfig::getInstance()->recv_buf_size_ / data_capacity_; + queue_limit_ = std::min(SdkConfig::getInstance()->max_cache_num_, buffer_num); + enable_share_msg_ = SdkConfig::getInstance()->enable_share_msg_; + LOG_INFO("Data capacity:" << data_capacity_ << ", buffer_num: " << buffer_num << ", limit: " << queue_limit_ + << ", enable share msg: " << enable_share_msg_); + for (int i = 0; i < queue_limit_; i++) { + std::shared_ptr msg_ptr = std::make_shared(); + if (msg_ptr == nullptr) { + LOG_INFO("Msg ptr is null"); + continue; + } + AddMsg(msg_ptr); + } + } + + public: + static MsgManager *GetInstance() { + static MsgManager instance; + return &instance; + } + SdkMsgPtr GetMsg() { + if (!enable_share_msg_) { + return nullptr; + } + std::lock_guard lck(mutex_); + if (msg_queue_.empty()) { + return nullptr; + } + SdkMsgPtr buf = msg_queue_.front(); + msg_queue_.pop(); + return buf; + } + void AddMsg(const SdkMsgPtr &msg_ptr) { + if (nullptr == msg_ptr || !enable_share_msg_) { + return; + } + std::lock_guard lck(mutex_); + if (msg_queue_.size() > queue_limit_) { + return; + } + msg_queue_.emplace(msg_ptr); + } + + void AddMsg(const std::vector &user_msg_vector) { + if (!enable_share_msg_) { + return; + } + std::lock_guard lck(mutex_); + for (auto it : user_msg_vector) { + if (nullptr == it) { + continue; + } + if (msg_queue_.size() > queue_limit_) { + return; + } + msg_queue_.emplace(std::move(it)); + } + } +}; +} // namespace inlong +#endif // INLONG_MSG_MANAGER_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc index 09014d728d1..1653d68de0d 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc @@ -217,8 +217,8 @@ int32_t ProxyManager::GetProxy(const std::string &key, return GetProxyByClusterId(key, proxy_info_vec); } -int32_t ProxyManager::CheckBidConf(const std::string &inlong_group_id, - bool is_inited) { +int32_t ProxyManager::CheckGroupIdConf(const std::string &inlong_group_id, + bool is_inited) { { unique_read_lock rdlck(groupid_2_cluster_id_rwmutex_); auto it = groupid_2_cluster_id_map_.find(inlong_group_id); @@ -379,7 +379,7 @@ void ProxyManager::WriteLocalCache() { } catch (...) { LOG_ERROR("WriteLocalCache error!"); } - LOG_INFO("WriteLocalCache bid number:" << groupid_count); + LOG_INFO("WriteLocalCache getGroupId number:" << groupid_count); } std::string ProxyManager::RecoverFromLocalCache(const std::string &groupid) { diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h index 3cd3e5491f6..419bc607987 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h @@ -62,7 +62,7 @@ class ProxyManager { static ProxyManager instance; return &instance; } - int32_t CheckBidConf(const std::string &inlong_group_id, bool is_inited); + int32_t CheckGroupIdConf(const std::string &inlong_group_id, bool is_inited); void Update(); void DoUpdate(); void Init(); diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc index 28bfed473c7..8754d60ab2a 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc @@ -29,9 +29,7 @@ char APIEncode::recvBuf[APIEncode::kRecvLen] = {0}; void APIEncode::decodeProtocoMsg(SendBuffer *buf) { memset(recvBuf, 0x0, kRecvLen); - // //LOG_DEBUG("print buf content, %s", buf->content()); - memcpy(recvBuf, buf->content(), buf->len()); - // memcpy(recvBuf, buf->content(), buf->len()); + memcpy(recvBuf, buf->GetData(), buf->GetDataLen()); char *p = recvBuf; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h index dbac24f6a61..a1753b82553 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h @@ -22,6 +22,7 @@ #include "string.h" #include +#include namespace inlong { namespace constants { @@ -50,6 +51,7 @@ static const int32_t kLoadBalanceInterval = 300000; static const int32_t kHeartBeatInterval = 60000; static const bool kEnableBalance = true; static const bool kEnableLocalCache = true; +static const bool kEnableShareMsg = true; static const bool kEnablePack = true; static const uint32_t kPackSize = 409600; @@ -78,6 +80,10 @@ static const uint32_t kReserveProxyNum = 2; static const bool kEnableTCPNagle = true; static const uint32_t kTcpIdleTime = 600000; static const uint32_t kTcpDetectionInterval = 60000; +static const uint32_t kMaxRetryIntervalMs= 3000; +static const uint32_t kRetryIntervalMs= 200; +static const int32_t kRetryTimes = 1; +static const uint32_t kProxyRepeatTimes = 1; static const char kSerIP[] = "127.0.0.1"; static const uint32_t kSerPort = 46801; @@ -86,6 +92,7 @@ static const uint32_t kMsgType = 7; static const bool kEnableSetAffinity = false; static const uint32_t kMaskCPUAffinity = 0xff; static const uint16_t kExtendField = 0; +static const uint64_t kMaxSnowFlake = std::numeric_limits::max(); // http basic auth static const char kBasicAuthHeader[] = "Authorization:"; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h index 1f2970b17d3..4b2ca778ba0 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h @@ -1,123 +1,120 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -#ifndef INLONG_SDK_SEND_BUFFER_H -#define INLONG_SDK_SEND_BUFFER_H +#ifndef INLONG_SEND_BUFFER_H +#define INLONG_SEND_BUFFER_H #include #include +#include +#include +#include "asio.hpp" #include "atomic.h" #include "logger.h" #include "noncopyable.h" -#include "sdk_msg.h" -#include -#include -#include + +#include "../core/sdk_msg.h" +#include "../manager/msg_manager.h" namespace inlong { -class Connection; -using SteadyTimerPtr = std::shared_ptr; -using ConnectionPtr = std::shared_ptr; class SendBuffer : noncopyable { -private: - uint32_t uniq_id_; - std::atomic is_used_; - std::atomic is_packed_; - char *content_; - uint32_t size_; - int32_t msg_cnt_; - uint32_t len_; + private: + char *data_; + uint32_t data_len_; + uint32_t msg_cnt_; std::string inlong_group_id_; std::string inlong_stream_id_; - AtomicInt already_send_; - uint64_t first_send_time_; - uint64_t latest_send_time_; std::vector user_msg_vector_; -public: - SendBuffer(uint32_t size) - : uniq_id_(0), is_used_(false), is_packed_(false), size_(size), - msg_cnt_(0), len_(0), inlong_group_id_(), inlong_stream_id_(), - first_send_time_(0), latest_send_time_(0) { - content_ = new char[size]; - if (content_) { - memset(content_, 0x0, size); + public: + SendBuffer(uint32_t size) : msg_cnt_(0), data_len_(0), inlong_group_id_(), inlong_stream_id_() { + data_ = new char[size]; + if (data_) { + memset(data_, 0x0, size); } } ~SendBuffer() { - if (content_) { - delete[] content_; + if (data_) { + delete[] data_; } } - - char *content() { return content_; } - int32_t msgCnt() const { return msg_cnt_; } - void setMsgCnt(const int32_t &msg_cnt) { msg_cnt_ = msg_cnt; } - uint32_t len() { return len_; } - void setLen(const uint32_t len) { len_ = len; } - std::string bid() { return inlong_group_id_; } - std::string tid() { return inlong_stream_id_; } - void setInlongGroupId(const std::string &inlong_group_id) { + char *GetData() const { + return data_; + } + void SetData(char *data) { + data_ = data; + } + uint32_t GetDataLen() const { + return data_len_; + } + void SetDataLen(uint32_t data_len) { + data_len_ = data_len; + } + uint32_t GetMsgCnt() const { + return msg_cnt_; + } + void SetMsgCnt(uint32_t msg_cnt) { + msg_cnt_ = msg_cnt; + } + const std::string &GetInlongGroupId() const { + return inlong_group_id_; + } + void SetInlongGroupId(const std::string &inlong_group_id) { inlong_group_id_ = inlong_group_id; } - void setStreamId(const std::string &inlong_stream_id) { + const std::string &GetInlongStreamId() const { + return inlong_stream_id_; + } + void SetInlongStreamId(const std::string &inlong_stream_id) { inlong_stream_id_ = inlong_stream_id; } - void setUniqId(const uint32_t &uniq_id) { uniq_id_ = uniq_id; } + void addUserMsg(const SdkMsgPtr &msg) { user_msg_vector_.push_back(msg); } - void addUserMsg(SdkMsgPtr msg) { user_msg_vector_.push_back(msg); } void doUserCallBack() { for (auto it : user_msg_vector_) { if (it->cb_) { - it->cb_(it->inlong_group_id_.data(), it->inlong_stream_id_.data(), - it->msg_.data(), it->msg_.size(), it->user_report_time_, - it->user_client_ip_.data()); + it->cb_(it->inlong_group_id_.data(), + it->inlong_stream_id_.data(), + it->msg_.data(), + it->msg_.size(), + it->report_time_, + it->client_ip_.data()); } } } void releaseBuf() { - if (!is_used_) { - return; - } - uniq_id_ = 0; - is_used_ = false; - is_packed_ = false; - memset(content_, 0x0, size_); msg_cnt_ = 0; - len_ = 0; + data_len_ = 0; inlong_group_id_ = ""; inlong_stream_id_ = ""; - already_send_.getAndSet(0); - first_send_time_ = 0; - latest_send_time_ = 0; + for (const auto &it : user_msg_vector_) { + if (it->cb_) { + it->clear(); + } + } + MsgManager::GetInstance()->AddMsg(user_msg_vector_); user_msg_vector_.clear(); - AtomicInt fail_create_conn_; - fail_create_conn_.getAndSet(0); + user_msg_vector_.shrink_to_fit(); } - - void setIsPacked(bool is_packed) { is_packed_ = is_packed; } }; typedef std::shared_ptr SendBufferPtrT; -typedef std::queue SendBufferPtrDeque; -} // namespace inlong +} // namespace inlong -#endif // INLONG_SDK_SEND_BUFFER_H \ No newline at end of file +#endif // INLONG_SEND_BUFFER_H \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/CMakeLists.txt b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/CMakeLists.txt index 80791b1c020..5f84a35b19a 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/CMakeLists.txt +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/CMakeLists.txt @@ -22,17 +22,18 @@ project(dataproxy-sdk-python) set(CMAKE_CXX_STANDARD 11) -include_directories("./dataproxy-sdk-cpp/src/core") +include_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/src/core") -include_directories("./dataproxy-sdk-cpp/third_party/lib") -include_directories("./dataproxy-sdk-cpp/third_party/lib64") +include_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/third_party/lib") +include_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/third_party/lib64") +include_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/release/lib") add_subdirectory(pybind11) -add_subdirectory(dataproxy-sdk-cpp) -link_directories("./dataproxy-sdk-cpp/third_party/lib") -link_directories("./dataproxy-sdk-cpp/third_party/lib64") +link_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/third_party/lib") +link_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/third_party/lib64") +link_directories("${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/release/lib") pybind11_add_module(inlong_dataproxy inlong_dataproxy.cpp) -target_link_libraries(inlong_dataproxy PRIVATE pybind11::module dataproxy_sdk) +target_link_libraries(inlong_dataproxy PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/dataproxy-sdk-cpp/release/lib/dataproxy_sdk.a" liblog4cplusS.a libsnappy.a libcurl.a libssl.a libcrypto.a) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/send_demo.py b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/send_demo.py index e8c33bc23ae..75974baa951 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/send_demo.py +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/send_demo.py @@ -40,7 +40,7 @@ def main(): # step1. init api init_status = inlong_api.init_api(sys.argv[1]) if init_status: - print("init error, error code is: " + init_status) + print("init error, error code is: " + str(init_status)) return print("---->start sdk successfully") @@ -62,12 +62,12 @@ def main(): for i in range(count): send_status = inlong_api.send(inlong_group_id, inlong_stream_id, msg, len(msg), callback_func) if send_status: - print("tc_api_send error, error code is: " + send_status) + print("tc_api_send error, error code is: " + str(send_status)) # step3. close api close_status = inlong_api.close_api(10000) if close_status: - print("close sdk error, error code is: " + close_status) + print("close sdk error, error code is: " + str(close_status)) else: print("---->close sdk successfully") diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp index 26c260f3f9b..2fc5eeee83d 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp @@ -18,44 +18,69 @@ */ #include +#include #include #include +#include +#include +#include namespace py = pybind11; -using namespace inlong; -class PyInLongApi : public InLongApi { -public: - int32_t PySend(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, py::function callback_func) { - py_callback = callback_func; - return Send(inlong_group_id, inlong_stream_id, msg, msg_len, &PyInLongApi::CallbackFunc); - } - -private: - static py::function py_callback; +std::map g_py_callbacks; +std::atomic stop_callbacks(false); +std::mutex callback_mutex; - static int CallbackFunc(const char *a, const char *b, const char *c, int32_t d, const int64_t e, const char *f) { - if (py_callback) { - try { - return py_callback(a, b, c, d, e, f).cast(); - } catch (const py::error_already_set &e) { - // Handle Python exception - return -1; - } +int UserCallBackBridge(const char *a, const char *b, const char *c, int32_t d, const int64_t e, const char *f) { + std::unique_lock lock(callback_mutex); + if (stop_callbacks) { + return -1; + } + auto it = g_py_callbacks.find(UserCallBackBridge); + if (it != g_py_callbacks.end()) { + if (stop_callbacks) { + return -1; + } + py::gil_scoped_acquire acquire; + if (stop_callbacks) { + return -1; } - return 0; + int result = it->second(a, b, c, d, e, f).cast(); + py::gil_scoped_release release; + return result; } -}; - -py::function PyInLongApi::py_callback; + return -1; +} PYBIND11_MODULE(inlong_dataproxy, m) { - m.doc() = "This module provides InLong dataproxy api to send message to InLong dataproxy."; - - py::class_(m, "InLongApi") + m.doc() = "Python bindings for InLong SDK API"; + py::class_(m, "InLongApi") .def(py::init<>()) - .def("init_api", &PyInLongApi::InitApi, py::arg("config_path")) - .def("add_bid", &PyInLongApi::AddBid, py::arg("group_ids")) - .def("send", &PyInLongApi::PySend, py::arg("inlong_group_id"), py::arg("inlong_stream_id"), py::arg("msg"), py::arg("msg_len"), py::arg("callback_func") = nullptr) - .def("close_api", &PyInLongApi::CloseApi, py::arg("max_waitms")); -} + .def("init_api", [](inlong::InLongApi& self, const char* config_path) { + stop_callbacks = false; + g_py_callbacks.clear(); + py::gil_scoped_release release; + int result = self.InitApi(config_path); + return result; + }) + .def("add_bid", &inlong::InLongApi::AddBid) + .def("send", [](inlong::InLongApi& self, const char* groupId, const char* streamId, const char* msg, int32_t msgLen, py::object pyCallback = py::none()) { + if (!pyCallback.is(py::none())) { + g_py_callbacks[UserCallBackBridge] = pyCallback.cast(); + py::gil_scoped_release release; + int result = self.Send(groupId, streamId, msg, msgLen, UserCallBackBridge); + return result; + } else { + int result = self.Send(groupId, streamId, msg, msgLen, nullptr); + return result; + } + }) + .def("close_api", [](inlong::InLongApi& self, int32_t timeout_ms) { + py::gil_scoped_release release; + int result = self.CloseApi(timeout_ms); + stop_callbacks = true; + std::unique_lock lock(callback_mutex); + py::gil_scoped_acquire acquire; + return result; + }); +} \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ReplicateFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ReplicateFunction.java new file mode 100644 index 00000000000..5270720d443 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ReplicateFunction.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.List; +/** + * ReplicateFunction + * description: replicate(string, numeric)--Repeat the string numeric times and return a new string + */ +public class ReplicateFunction implements ValueParser { + + private ValueParser stringParser; + + private ValueParser countParser; + + public ReplicateFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + stringParser = OperatorTools.buildParser(expressions.get(0)); + countParser = OperatorTools.buildParser(expressions.get(1)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object stringObj = stringParser.parse(sourceData, rowIndex, context); + Object countObj = countParser.parse(sourceData, rowIndex, context); + String str = OperatorTools.parseString(stringObj); + double count = OperatorTools.parseBigDecimal(countObj).doubleValue(); + return repeat(str, count); + } + private String repeat(String str, double count) { + if (count == 0) { + return ""; + } + if (count == 1) { + return str; + } + StringBuilder repeatedStr = new StringBuilder(); + StringBuilder originStr = new StringBuilder(str); + while (count > 0) { + if (count % 2 != 0) { + repeatedStr.append(originStr); + } + count = Math.floor(count / 2); + originStr.append(originStr); + } + return repeatedStr.toString(); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TrimFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TrimFunction.java new file mode 100644 index 00000000000..b3fbaf26dca --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TrimFunction.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +/** + * TrimFunction + * description: trim(string)--Remove Spaces before and after the string. + */ +public class TrimFunction implements ValueParser { + + private ValueParser stringParser; + + public TrimFunction(Function expr) { + stringParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object stringObj = stringParser.parse(sourceData, rowIndex, context); + return OperatorTools.parseString(stringObj).trim(); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index 4fd25189dcf..31cf1c77498 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -35,6 +35,7 @@ import org.apache.inlong.sdk.transform.process.function.NowFunction; import org.apache.inlong.sdk.transform.process.function.PowerFunction; import org.apache.inlong.sdk.transform.process.function.ReplaceFunction; +import org.apache.inlong.sdk.transform.process.function.ReplicateFunction; import org.apache.inlong.sdk.transform.process.function.RoundFunction; import org.apache.inlong.sdk.transform.process.function.SinFunction; import org.apache.inlong.sdk.transform.process.function.SinhFunction; @@ -43,6 +44,7 @@ import org.apache.inlong.sdk.transform.process.function.TimestampExtractFunction; import org.apache.inlong.sdk.transform.process.function.ToDateFunction; import org.apache.inlong.sdk.transform.process.function.ToTimestampFunction; +import org.apache.inlong.sdk.transform.process.function.TrimFunction; import org.apache.inlong.sdk.transform.process.function.UnixTimestampFunction; import org.apache.inlong.sdk.transform.process.parser.AdditionParser; import org.apache.inlong.sdk.transform.process.parser.ColumnParser; @@ -110,6 +112,8 @@ public class OperatorTools { functionMap.put("exp", ExpFunction::new); functionMap.put("substring", SubstringFunction::new); functionMap.put("replace", ReplaceFunction::new); + functionMap.put("trim", TrimFunction::new); + functionMap.put("replicate", ReplicateFunction::new); functionMap.put("locate", LocateFunction::new); functionMap.put("to_date", ToDateFunction::new); functionMap.put("date_format", DateFormatFunction::new); diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java index 25e984e6295..c544c08594c 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java @@ -119,6 +119,7 @@ public void testLocateFunction() throws Exception { Assert.assertEquals(output5.get(0), "result=null"); } + @Test public void testReplaceFunction() throws Exception { String transformSql = "select replace(string1, string2, string3) from source"; @@ -131,6 +132,72 @@ public void testReplaceFunction() throws Exception { Assert.assertEquals(1, output.size()); Assert.assertEquals(output.get(0), "result=holly"); + + @Test + public void testReplicateFunction() throws Exception { + String transformSql1 = "select replicate(string1, numeric1) from source"; + TransformConfig config1 = new TransformConfig(transformSql1); + TransformProcessor processor1 = TransformProcessor + .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: replicate('apple', 2) + List output1 = processor1.transform("apple|banana|cloud|2|1|3", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=appleapple"); + String transformSql2 = "select replicate(string2, numeric2) from source"; + TransformConfig config2 = new TransformConfig(transformSql2); + TransformProcessor processor2 = TransformProcessor + .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case2: replicate('banana', 3) + List output2 = processor2.transform("apple|banana|cloud|1|3|3", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=bananabananabanana"); + // case3: replicate('banana', 1) + List output3 = processor2.transform("apple|banana|cloud|1|1|3", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output3.get(0), "result=banana"); + // case3: replicate('cloud', 0) + String transformSql3 = "select replicate(string3, numeric3) from source"; + TransformConfig config3 = new TransformConfig(transformSql3); + TransformProcessor processor3 = TransformProcessor + .create(config3, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output4 = processor3.transform("apple|banana|cloud|2|1|0", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "result="); + } + + @Test + public void testTrimFunction() throws Exception { + String transformSql1 = "select trim(string1) from source"; + TransformConfig config1 = new TransformConfig(transformSql1); + TransformProcessor processor1 = TransformProcessor + .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: trim(' in long') + List output1 = processor1.transform(" in long|in long | in long ", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=in long"); + String transformSql2 = "select trim(string2) from source"; + TransformConfig config2 = new TransformConfig(transformSql2); + TransformProcessor processor2 = TransformProcessor + .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case2: trim('in long ') + List output2 = processor2.transform(" in long|in long | in long ", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=in long"); + String transformSql3 = "select trim(string2) from source"; + TransformConfig config3 = new TransformConfig(transformSql2); + TransformProcessor processor3 = TransformProcessor + .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case3: trim(' in long ') + List output3 = processor3.transform(" in long|in long | in long ", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=in long"); + } }