Skip to content

Commit

Permalink
[chore](agent) log the binary message size of agent tasks (apache#43239)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Nov 5, 2024
1 parent 74c9497 commit da35d3d
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common;

import org.apache.thrift.TBase;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

// Utility functions for thrift
public class ThriftUtils {
// Get the size of the binary message of the thrift object
public static long getBinaryMessageSize(TBase<?, ?> thriftObject) {
TSizeTransport trans = new TSizeTransport();
TBinaryProtocol protocol = new TBinaryProtocol(trans);
try {
thriftObject.write(protocol);
} catch (TException e) {
return -1;
}
return trans.getSize();
}

// A transport class that only records the size of the message
private static class TSizeTransport extends TTransport {
private long size = 0;

public long getSize() {
return size;
}

@Override
public void write(byte[] buf, int off, int len) throws TTransportException {
size += len;
}

@Override
public boolean isOpen() {
return true;
}

@Override
public void open() throws TTransportException {
}

@Override
public void close() {
}

@Override
public int read(byte[] buf, int off, int len) throws TTransportException {
throw new UnsupportedOperationException("Unimplemented method 'read'");
}

@Override
public TConfiguration getConfiguration() {
return new TConfiguration();
}

@Override
public void updateKnownMessageSize(long size) throws TTransportException {
}

@Override
public void checkReadBytesAvailable(long numBytes) throws TTransportException {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.ThriftUtils;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TAgentServiceVersion;
Expand Down Expand Up @@ -208,6 +209,14 @@ public void run() {
private static void submitTasks(long backendId,
BackendService.Client client, List<TAgentTaskRequest> agentTaskRequests) throws TException {
if (!agentTaskRequests.isEmpty()) {
if (LOG.isDebugEnabled()) {
long size = agentTaskRequests.stream()
.map(ThriftUtils::getBinaryMessageSize)
.reduce(0L, Long::sum);
TTaskType firstTaskType = agentTaskRequests.get(0).getTaskType();
LOG.debug("submit {} tasks to backend[{}], total size: {}, first task type: {}",
agentTaskRequests.size(), backendId, size, firstTaskType);
}
client.submitTasks(agentTaskRequests);
}
if (LOG.isDebugEnabled()) {
Expand Down

0 comments on commit da35d3d

Please sign in to comment.