From da35d3daba78732d1e2421f7c990654f32f7208d Mon Sep 17 00:00:00 2001 From: walter Date: Tue, 5 Nov 2024 21:11:44 +0800 Subject: [PATCH] [chore](agent) log the binary message size of agent tasks (#43239) --- .../org/apache/doris/common/ThriftUtils.java | 85 +++++++++++++++++++ .../org/apache/doris/task/AgentBatchTask.java | 9 ++ 2 files changed, 94 insertions(+) create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/ThriftUtils.java diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/ThriftUtils.java b/fe/fe-common/src/main/java/org/apache/doris/common/ThriftUtils.java new file mode 100644 index 00000000000000..92881cec5261bf --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/ThriftUtils.java @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import org.apache.thrift.TBase; +import org.apache.thrift.TConfiguration; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +// Utility functions for thrift +public class ThriftUtils { + // Get the size of the binary message of the thrift object + public static long getBinaryMessageSize(TBase thriftObject) { + TSizeTransport trans = new TSizeTransport(); + TBinaryProtocol protocol = new TBinaryProtocol(trans); + try { + thriftObject.write(protocol); + } catch (TException e) { + return -1; + } + return trans.getSize(); + } + + // A transport class that only records the size of the message + private static class TSizeTransport extends TTransport { + private long size = 0; + + public long getSize() { + return size; + } + + @Override + public void write(byte[] buf, int off, int len) throws TTransportException { + size += len; + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void open() throws TTransportException { + } + + @Override + public void close() { + } + + @Override + public int read(byte[] buf, int off, int len) throws TTransportException { + throw new UnsupportedOperationException("Unimplemented method 'read'"); + } + + @Override + public TConfiguration getConfiguration() { + return new TConfiguration(); + } + + @Override + public void updateKnownMessageSize(long size) throws TTransportException { + } + + @Override + public void checkReadBytesAvailable(long numBytes) throws TTransportException { + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java index be698776cac61a..c4120b9a199809 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java @@ -20,6 +20,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.ClientPool; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.ThriftUtils; import org.apache.doris.system.Backend; import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.TAgentServiceVersion; @@ -208,6 +209,14 @@ public void run() { private static void submitTasks(long backendId, BackendService.Client client, List agentTaskRequests) throws TException { if (!agentTaskRequests.isEmpty()) { + if (LOG.isDebugEnabled()) { + long size = agentTaskRequests.stream() + .map(ThriftUtils::getBinaryMessageSize) + .reduce(0L, Long::sum); + TTaskType firstTaskType = agentTaskRequests.get(0).getTaskType(); + LOG.debug("submit {} tasks to backend[{}], total size: {}, first task type: {}", + agentTaskRequests.size(), backendId, size, firstTaskType); + } client.submitTasks(agentTaskRequests); } if (LOG.isDebugEnabled()) {