Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MINOR] improve(server): Add debug log when cacheShuffleData #2156

Merged
merged 4 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,21 @@

public class ShufflePartitionedBlock {

private int length;
private int dataLength;
private long crc;
private long blockId;
private int uncompressLength;
private ByteBuf data;
private long taskAttemptId;

public ShufflePartitionedBlock(
int length, int uncompressLength, long crc, long blockId, long taskAttemptId, byte[] data) {
this.length = length;
int dataLength,
int uncompressLength,
long crc,
long blockId,
long taskAttemptId,
byte[] data) {
this.dataLength = dataLength;
this.crc = crc;
this.blockId = blockId;
this.uncompressLength = uncompressLength;
Expand All @@ -42,8 +47,13 @@ public ShufflePartitionedBlock(
}

public ShufflePartitionedBlock(
int length, int uncompressLength, long crc, long blockId, long taskAttemptId, ByteBuf data) {
this.length = length;
int dataLength,
int uncompressLength,
long crc,
long blockId,
long taskAttemptId,
ByteBuf data) {
this.dataLength = dataLength;
this.crc = crc;
this.blockId = blockId;
this.uncompressLength = uncompressLength;
Expand All @@ -53,8 +63,8 @@ public ShufflePartitionedBlock(

// calculate the data size for this block in memory including metadata which are
// blockId, crc, taskAttemptId, length, uncompressLength
public long getSize() {
return length + 3 * 8 + 2 * 4;
public long getEncodedLength() {
return dataLength + 3 * 8 + 2 * 4;
}

@Override
Expand All @@ -66,23 +76,23 @@ public boolean equals(Object o) {
return false;
}
ShufflePartitionedBlock that = (ShufflePartitionedBlock) o;
return length == that.length
return dataLength == that.dataLength
&& crc == that.crc
&& blockId == that.blockId
&& data.equals(that.data);
}

@Override
public int hashCode() {
return Objects.hash(length, crc, blockId, data);
return Objects.hash(dataLength, crc, blockId, data);
}

public int getLength() {
return length;
public int getDataLength() {
return dataLength;
}

public void setLength(int length) {
this.length = length;
public void setDataLength(int dataLength) {
this.dataLength = dataLength;
}

public long getCrc() {
Expand Down Expand Up @@ -126,7 +136,7 @@ public String toString() {
return "ShufflePartitionedBlock{blockId["
+ blockId
+ "], length["
+ length
+ dataLength
+ "], uncompressLength["
+ uncompressLength
+ "], crc["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,36 @@
import java.util.Arrays;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.tuple.Pair;

public class ShufflePartitionedData {

private static final ShufflePartitionedBlock[] EMPTY_BLOCK_LIST =
new ShufflePartitionedBlock[] {};
private int partitionId;
private final ShufflePartitionedBlock[] blockList;
private final long totalBlockSize;
private final long totalBlockEncodedLength;
private final long totalBlockDataLength;

public ShufflePartitionedData(int partitionId, Pair<Long, ShufflePartitionedBlock[]> pair) {
public ShufflePartitionedData(
int partitionId, long encodedLength, long dataLength, ShufflePartitionedBlock[] blockList) {
this.partitionId = partitionId;
this.blockList = pair.getRight() == null ? EMPTY_BLOCK_LIST : pair.getRight();
totalBlockSize = pair.getLeft();
this.blockList = blockList == null ? EMPTY_BLOCK_LIST : blockList;
totalBlockEncodedLength = encodedLength;
totalBlockDataLength = dataLength;
}

@VisibleForTesting
public ShufflePartitionedData(int partitionId, ShufflePartitionedBlock[] blockList) {
this.partitionId = partitionId;
this.blockList = blockList == null ? EMPTY_BLOCK_LIST : blockList;
long size = 0L;
long encodedLength = 0L;
long dataLength = 0L;
for (ShufflePartitionedBlock block : this.blockList) {
size += block.getSize();
encodedLength += block.getEncodedLength();
dataLength += block.getDataLength();
}
totalBlockSize = size;
totalBlockEncodedLength = encodedLength;
totalBlockDataLength = dataLength;
}

@Override
Expand All @@ -68,7 +73,11 @@ public ShufflePartitionedBlock[] getBlockList() {
return blockList;
}

public long getTotalBlockSize() {
return totalBlockSize;
public long getTotalBlockEncodedLength() {
return totalBlockEncodedLength;
}

public long getTotalBlockDataLength() {
return totalBlockDataLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void shufflePartitionedBlockTest() {
new Random().nextBytes(buf);

ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 1, 2, 3, 1, buf);
assertEquals(1, b1.getLength());
assertEquals(1, b1.getDataLength());
assertEquals(2, b1.getCrc());
assertEquals(3, b1.getBlockId());

Expand Down Expand Up @@ -73,7 +73,7 @@ public void testToString() {
"ShufflePartitionedBlock{blockId["
+ b1.getBlockId()
+ "], length["
+ b1.getLength()
+ b1.getDataLength()
+ "], uncompressLength["
+ b1.getUncompressLength()
+ "], crc["
Expand All @@ -87,6 +87,6 @@ public void testToString() {
@Test
public void testSize() {
ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 2, 3, 4, 5, new byte[6]);
assertEquals(b1.getSize(), b1.getLength() + 3 * Long.BYTES + 2 * Integer.BYTES);
assertEquals(b1.getEncodedLength(), b1.getDataLength() + 3 * Long.BYTES + 2 * Integer.BYTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.grpc.stub.StreamObserver;
import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -467,10 +466,10 @@ public void sendShuffleData(
return;
}
final long start = System.currentTimeMillis();
List<ShufflePartitionedData> shufflePartitionedData = toPartitionedData(req);
List<ShufflePartitionedData> shufflePartitionedDataList = toPartitionedDataList(req);
long alreadyReleasedSize = 0;
boolean hasFailureOccurred = false;
for (ShufflePartitionedData spd : shufflePartitionedData) {
for (ShufflePartitionedData spd : shufflePartitionedDataList) {
String shuffleDataInfo =
"appId["
+ appId
Expand All @@ -496,7 +495,7 @@ public void sendShuffleData(
// TODO: Use ShuffleBufferWithSkipList to avoid caching block here.
shuffleServer.getShuffleMergeManager().cacheBlock(appId, shuffleId, spd);
}
long toReleasedSize = spd.getTotalBlockSize();
long toReleasedSize = spd.getTotalBlockEncodedLength();
// after each cacheShuffleData call, the `preAllocatedSize` is updated timely.
manager.releasePreAllocatedSize(toReleasedSize);
alreadyReleasedSize += toReleasedSize;
Expand Down Expand Up @@ -529,7 +528,7 @@ public void sendShuffleData(
if (hasFailureOccurred) {
shuffleServer
.getShuffleBufferManager()
.releaseMemory(spd.getTotalBlockSize(), false, false);
.releaseMemory(spd.getTotalBlockEncodedLength(), false, false);
}
}
}
Expand Down Expand Up @@ -559,7 +558,7 @@ public void sendShuffleData(
+ "], cost "
+ costTime
+ " ms with "
+ shufflePartitionedData.size()
+ shufflePartitionedDataList.size()
+ " blocks and "
+ requireSize
+ " bytes");
Expand Down Expand Up @@ -1649,24 +1648,23 @@ public void getSortedShuffleData(
}
}

private List<ShufflePartitionedData> toPartitionedData(SendShuffleDataRequest req) {
private List<ShufflePartitionedData> toPartitionedDataList(SendShuffleDataRequest req) {
List<ShufflePartitionedData> ret = Lists.newArrayList();

for (ShuffleData data : req.getShuffleDataList()) {
ret.add(
new ShufflePartitionedData(
data.getPartitionId(), toPartitionedBlock(data.getBlockList())));
ret.add(toPartitionedData(data.getPartitionId(), data.getBlockList()));
}

return ret;
}

private Pair<Long, ShufflePartitionedBlock[]> toPartitionedBlock(List<ShuffleBlock> blocks) {
private ShufflePartitionedData toPartitionedData(int partitionId, List<ShuffleBlock> blocks) {
if (blocks == null || blocks.size() == 0) {
return Pair.of(0L, new ShufflePartitionedBlock[] {});
return new ShufflePartitionedData(partitionId, 0L, 0L, new ShufflePartitionedBlock[] {});
}
ShufflePartitionedBlock[] ret = new ShufflePartitionedBlock[blocks.size()];
long size = 0L;
long encodedLength = 0L;
long dataLength = 0L;
int i = 0;
for (ShuffleBlock block : blocks) {
ByteBuf data = ByteBufUtils.byteStringToByteBuf(block.getData());
Expand All @@ -1678,10 +1676,11 @@ private Pair<Long, ShufflePartitionedBlock[]> toPartitionedBlock(List<ShuffleBlo
block.getBlockId(),
block.getTaskAttemptId(),
data);
size += ret[i].getSize();
encodedLength += ret[i].getEncodedLength();
dataLength += ret[i].getDataLength();
i++;
}
return Pair.of(size, ret);
return new ShufflePartitionedData(partitionId, encodedLength, dataLength, ret);
}

private Map<Integer, long[]> toPartitionBlocksMap(List<PartitionToBlockIds> partitionToBlockIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public StatusCode cacheShuffleData(
String appId, int shuffleId, boolean isPreAllocated, ShufflePartitionedData spd) {
refreshAppId(appId);
long partitionSize = getPartitionDataSize(appId, shuffleId, spd.getPartitionId());
long deltaSize = spd.getTotalBlockSize();
long deltaSize = spd.getTotalBlockEncodedLength();
partitionSize += deltaSize;
// We do not need to check the huge partition size here, after old client upgraded to this
// version,
Expand Down Expand Up @@ -506,7 +506,7 @@ public void updateCachedBlockIds(
synchronized (bitmap) {
for (ShufflePartitionedBlock spb : spbs) {
bitmap.addLong(spb.getBlockId());
size += spb.getSize();
size += spb.getEncodedLength();
}
}
long partitionSize = shuffleTaskInfo.addPartitionDataSize(shuffleId, partitionId, size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void updateShuffleData(List<ShufflePartitionedBlock> readBlocks, Composi
} catch (Exception e) {
LOG.error(
"Unexpected exception for System.arraycopy, length["
+ block.getLength()
+ block.getDataLength()
+ "], offset["
+ offset
+ "], dataLength["
Expand All @@ -135,7 +135,7 @@ private void updateShuffleData(List<ShufflePartitionedBlock> readBlocks, Composi
e);
throw e;
}
offset += block.getLength();
offset += block.getDataLength();
}
}

Expand Down Expand Up @@ -168,13 +168,13 @@ protected void updateSegmentsWithoutBlockId(
new BufferSegment(
block.getBlockId(),
currentOffset,
block.getLength(),
block.getDataLength(),
block.getUncompressLength(),
block.getCrc(),
block.getTaskAttemptId()));
readBlocks.add(block);
// update offset
currentOffset += block.getLength();
currentOffset += block.getDataLength();
// check if length >= request buffer size
if (currentOffset >= readBufferSize) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,17 @@ public StatusCode cacheShuffleData(
Arrays.stream(spd.getBlockList())
.forEach(
b -> {
int blockSize = b.getLength();
int blockSize = b.getDataLength();
ShuffleServerMetrics.appHistogramWriteBlockSize.labels(appId).observe(blockSize);
});
}
LOG.debug(
"cache shuffle data, size: {}, blockCount: {}, appId: {}, shuffleId: {}, partitionId: {}",
spd.getTotalBlockDataLength(),
spd.getBlockList().length,
appId,
shuffleId,
spd.getPartitionId());
updateShuffleSize(appId, shuffleId, size);
synchronized (this) {
flushSingleBufferIfNecessary(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public long append(ShufflePartitionedData data) {
// If sendShuffleData retried, we may receive duplicate block. The duplicate
// block would gc without release. Here we must release the duplicated block.
if (blocks.add(block)) {
size += block.getSize();
size += block.getEncodedLength();
} else {
block.getData().release();
}
Expand Down Expand Up @@ -126,10 +126,10 @@ public long release() {
for (ShufflePartitionedBlock spb : blocks) {
try {
spb.getData().release();
releasedSize += spb.getSize();
releasedSize += spb.getEncodedLength();
} catch (Throwable t) {
lastException = t;
failedToReleaseSize += spb.getSize();
failedToReleaseSize += spb.getEncodedLength();
}
}
if (lastException != null) {
Expand Down Expand Up @@ -261,13 +261,13 @@ private boolean updateSegmentsWithBlockId(
new BufferSegment(
block.getBlockId(),
currentOffset,
block.getLength(),
block.getDataLength(),
block.getUncompressLength(),
block.getCrc(),
block.getTaskAttemptId()));
readBlocks.add(block);
// update offset
currentOffset += block.getLength();
currentOffset += block.getDataLength();
if (currentOffset >= readBufferSize) {
break;
}
Expand Down
Loading
Loading