Skip to content

Commit

Permalink
[MINOR] improve(server): Add debug log when cacheShuffleData (#2156)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Add a debug log when cacheShuffleData is called.

### Why are the changes needed?

Provide a way to see the details of each cacheShuffleData call, including the partitionId and blockCount, through a debug-level log.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No need.
  • Loading branch information
maobaolong authored Oct 12, 2024
1 parent d9ed675 commit 270afaa
Show file tree
Hide file tree
Showing 18 changed files with 117 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,21 @@

public class ShufflePartitionedBlock {

private int length;
private int dataLength;
private long crc;
private long blockId;
private int uncompressLength;
private ByteBuf data;
private long taskAttemptId;

public ShufflePartitionedBlock(
int length, int uncompressLength, long crc, long blockId, long taskAttemptId, byte[] data) {
this.length = length;
int dataLength,
int uncompressLength,
long crc,
long blockId,
long taskAttemptId,
byte[] data) {
this.dataLength = dataLength;
this.crc = crc;
this.blockId = blockId;
this.uncompressLength = uncompressLength;
Expand All @@ -42,8 +47,13 @@ public ShufflePartitionedBlock(
}

public ShufflePartitionedBlock(
int length, int uncompressLength, long crc, long blockId, long taskAttemptId, ByteBuf data) {
this.length = length;
int dataLength,
int uncompressLength,
long crc,
long blockId,
long taskAttemptId,
ByteBuf data) {
this.dataLength = dataLength;
this.crc = crc;
this.blockId = blockId;
this.uncompressLength = uncompressLength;
Expand All @@ -53,8 +63,8 @@ public ShufflePartitionedBlock(

// calculate the data size for this block in memory including metadata which are
// blockId, crc, taskAttemptId, length, uncompressLength
public long getSize() {
return length + 3 * 8 + 2 * 4;
public long getEncodedLength() {
return dataLength + 3 * 8 + 2 * 4;
}

@Override
Expand All @@ -66,23 +76,23 @@ public boolean equals(Object o) {
return false;
}
ShufflePartitionedBlock that = (ShufflePartitionedBlock) o;
return length == that.length
return dataLength == that.dataLength
&& crc == that.crc
&& blockId == that.blockId
&& data.equals(that.data);
}

@Override
public int hashCode() {
return Objects.hash(length, crc, blockId, data);
return Objects.hash(dataLength, crc, blockId, data);
}

public int getLength() {
return length;
public int getDataLength() {
return dataLength;
}

public void setLength(int length) {
this.length = length;
public void setDataLength(int dataLength) {
this.dataLength = dataLength;
}

public long getCrc() {
Expand Down Expand Up @@ -126,7 +136,7 @@ public String toString() {
return "ShufflePartitionedBlock{blockId["
+ blockId
+ "], length["
+ length
+ dataLength
+ "], uncompressLength["
+ uncompressLength
+ "], crc["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,36 @@
import java.util.Arrays;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.tuple.Pair;

public class ShufflePartitionedData {

private static final ShufflePartitionedBlock[] EMPTY_BLOCK_LIST =
new ShufflePartitionedBlock[] {};
private int partitionId;
private final ShufflePartitionedBlock[] blockList;
private final long totalBlockSize;
private final long totalBlockEncodedLength;
private final long totalBlockDataLength;

public ShufflePartitionedData(int partitionId, Pair<Long, ShufflePartitionedBlock[]> pair) {
public ShufflePartitionedData(
int partitionId, long encodedLength, long dataLength, ShufflePartitionedBlock[] blockList) {
this.partitionId = partitionId;
this.blockList = pair.getRight() == null ? EMPTY_BLOCK_LIST : pair.getRight();
totalBlockSize = pair.getLeft();
this.blockList = blockList == null ? EMPTY_BLOCK_LIST : blockList;
totalBlockEncodedLength = encodedLength;
totalBlockDataLength = dataLength;
}

@VisibleForTesting
public ShufflePartitionedData(int partitionId, ShufflePartitionedBlock[] blockList) {
this.partitionId = partitionId;
this.blockList = blockList == null ? EMPTY_BLOCK_LIST : blockList;
long size = 0L;
long encodedLength = 0L;
long dataLength = 0L;
for (ShufflePartitionedBlock block : this.blockList) {
size += block.getSize();
encodedLength += block.getEncodedLength();
dataLength += block.getDataLength();
}
totalBlockSize = size;
totalBlockEncodedLength = encodedLength;
totalBlockDataLength = dataLength;
}

@Override
Expand All @@ -68,7 +73,11 @@ public ShufflePartitionedBlock[] getBlockList() {
return blockList;
}

public long getTotalBlockSize() {
return totalBlockSize;
public long getTotalBlockEncodedLength() {
return totalBlockEncodedLength;
}

public long getTotalBlockDataLength() {
return totalBlockDataLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void shufflePartitionedBlockTest() {
new Random().nextBytes(buf);

ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 1, 2, 3, 1, buf);
assertEquals(1, b1.getLength());
assertEquals(1, b1.getDataLength());
assertEquals(2, b1.getCrc());
assertEquals(3, b1.getBlockId());

Expand Down Expand Up @@ -73,7 +73,7 @@ public void testToString() {
"ShufflePartitionedBlock{blockId["
+ b1.getBlockId()
+ "], length["
+ b1.getLength()
+ b1.getDataLength()
+ "], uncompressLength["
+ b1.getUncompressLength()
+ "], crc["
Expand All @@ -87,6 +87,6 @@ public void testToString() {
@Test
public void testSize() {
ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 2, 3, 4, 5, new byte[6]);
assertEquals(b1.getSize(), b1.getLength() + 3 * Long.BYTES + 2 * Integer.BYTES);
assertEquals(b1.getEncodedLength(), b1.getDataLength() + 3 * Long.BYTES + 2 * Integer.BYTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.grpc.stub.StreamObserver;
import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -467,10 +466,10 @@ public void sendShuffleData(
return;
}
final long start = System.currentTimeMillis();
List<ShufflePartitionedData> shufflePartitionedData = toPartitionedData(req);
List<ShufflePartitionedData> shufflePartitionedDataList = toPartitionedDataList(req);
long alreadyReleasedSize = 0;
boolean hasFailureOccurred = false;
for (ShufflePartitionedData spd : shufflePartitionedData) {
for (ShufflePartitionedData spd : shufflePartitionedDataList) {
String shuffleDataInfo =
"appId["
+ appId
Expand All @@ -496,7 +495,7 @@ public void sendShuffleData(
// TODO: Use ShuffleBufferWithSkipList to avoid caching block here.
shuffleServer.getShuffleMergeManager().cacheBlock(appId, shuffleId, spd);
}
long toReleasedSize = spd.getTotalBlockSize();
long toReleasedSize = spd.getTotalBlockEncodedLength();
// after each cacheShuffleData call, the `preAllocatedSize` is updated timely.
manager.releasePreAllocatedSize(toReleasedSize);
alreadyReleasedSize += toReleasedSize;
Expand Down Expand Up @@ -529,7 +528,7 @@ public void sendShuffleData(
if (hasFailureOccurred) {
shuffleServer
.getShuffleBufferManager()
.releaseMemory(spd.getTotalBlockSize(), false, false);
.releaseMemory(spd.getTotalBlockEncodedLength(), false, false);
}
}
}
Expand Down Expand Up @@ -559,7 +558,7 @@ public void sendShuffleData(
+ "], cost "
+ costTime
+ " ms with "
+ shufflePartitionedData.size()
+ shufflePartitionedDataList.size()
+ " blocks and "
+ requireSize
+ " bytes");
Expand Down Expand Up @@ -1660,24 +1659,23 @@ public void getSortedShuffleData(
}
}

private List<ShufflePartitionedData> toPartitionedData(SendShuffleDataRequest req) {
private List<ShufflePartitionedData> toPartitionedDataList(SendShuffleDataRequest req) {
List<ShufflePartitionedData> ret = Lists.newArrayList();

for (ShuffleData data : req.getShuffleDataList()) {
ret.add(
new ShufflePartitionedData(
data.getPartitionId(), toPartitionedBlock(data.getBlockList())));
ret.add(toPartitionedData(data.getPartitionId(), data.getBlockList()));
}

return ret;
}

private Pair<Long, ShufflePartitionedBlock[]> toPartitionedBlock(List<ShuffleBlock> blocks) {
private ShufflePartitionedData toPartitionedData(int partitionId, List<ShuffleBlock> blocks) {
if (blocks == null || blocks.size() == 0) {
return Pair.of(0L, new ShufflePartitionedBlock[] {});
return new ShufflePartitionedData(partitionId, 0L, 0L, new ShufflePartitionedBlock[] {});
}
ShufflePartitionedBlock[] ret = new ShufflePartitionedBlock[blocks.size()];
long size = 0L;
long encodedLength = 0L;
long dataLength = 0L;
int i = 0;
for (ShuffleBlock block : blocks) {
ByteBuf data = ByteBufUtils.byteStringToByteBuf(block.getData());
Expand All @@ -1689,10 +1687,11 @@ private Pair<Long, ShufflePartitionedBlock[]> toPartitionedBlock(List<ShuffleBlo
block.getBlockId(),
block.getTaskAttemptId(),
data);
size += ret[i].getSize();
encodedLength += ret[i].getEncodedLength();
dataLength += ret[i].getDataLength();
i++;
}
return Pair.of(size, ret);
return new ShufflePartitionedData(partitionId, encodedLength, dataLength, ret);
}

private Map<Integer, long[]> toPartitionBlocksMap(List<PartitionToBlockIds> partitionToBlockIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public StatusCode cacheShuffleData(
String appId, int shuffleId, boolean isPreAllocated, ShufflePartitionedData spd) {
refreshAppId(appId);
long partitionSize = getPartitionDataSize(appId, shuffleId, spd.getPartitionId());
long deltaSize = spd.getTotalBlockSize();
long deltaSize = spd.getTotalBlockEncodedLength();
partitionSize += deltaSize;
// We do not need to check the huge partition size here, after old client upgraded to this
// version,
Expand Down Expand Up @@ -506,7 +506,7 @@ public void updateCachedBlockIds(
synchronized (bitmap) {
for (ShufflePartitionedBlock spb : spbs) {
bitmap.addLong(spb.getBlockId());
size += spb.getSize();
size += spb.getEncodedLength();
}
}
long partitionSize = shuffleTaskInfo.addPartitionDataSize(shuffleId, partitionId, size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void updateShuffleData(List<ShufflePartitionedBlock> readBlocks, Composi
} catch (Exception e) {
LOG.error(
"Unexpected exception for System.arraycopy, length["
+ block.getLength()
+ block.getDataLength()
+ "], offset["
+ offset
+ "], dataLength["
Expand All @@ -135,7 +135,7 @@ private void updateShuffleData(List<ShufflePartitionedBlock> readBlocks, Composi
e);
throw e;
}
offset += block.getLength();
offset += block.getDataLength();
}
}

Expand Down Expand Up @@ -168,13 +168,13 @@ protected void updateSegmentsWithoutBlockId(
new BufferSegment(
block.getBlockId(),
currentOffset,
block.getLength(),
block.getDataLength(),
block.getUncompressLength(),
block.getCrc(),
block.getTaskAttemptId()));
readBlocks.add(block);
// update offset
currentOffset += block.getLength();
currentOffset += block.getDataLength();
// check if length >= request buffer size
if (currentOffset >= readBufferSize) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,17 @@ public StatusCode cacheShuffleData(
Arrays.stream(spd.getBlockList())
.forEach(
b -> {
int blockSize = b.getLength();
int blockSize = b.getDataLength();
ShuffleServerMetrics.appHistogramWriteBlockSize.labels(appId).observe(blockSize);
});
}
LOG.debug(
"cache shuffle data, size: {}, blockCount: {}, appId: {}, shuffleId: {}, partitionId: {}",
spd.getTotalBlockDataLength(),
spd.getBlockList().length,
appId,
shuffleId,
spd.getPartitionId());
updateShuffleSize(appId, shuffleId, size);
synchronized (this) {
flushSingleBufferIfNecessary(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public long append(ShufflePartitionedData data) {
// If sendShuffleData retried, we may receive duplicate block. The duplicate
// block would gc without release. Here we must release the duplicated block.
if (blocks.add(block)) {
size += block.getSize();
size += block.getEncodedLength();
} else {
block.getData().release();
}
Expand Down Expand Up @@ -126,10 +126,10 @@ public long release() {
for (ShufflePartitionedBlock spb : blocks) {
try {
spb.getData().release();
releasedSize += spb.getSize();
releasedSize += spb.getEncodedLength();
} catch (Throwable t) {
lastException = t;
failedToReleaseSize += spb.getSize();
failedToReleaseSize += spb.getEncodedLength();
}
}
if (lastException != null) {
Expand Down Expand Up @@ -261,13 +261,13 @@ private boolean updateSegmentsWithBlockId(
new BufferSegment(
block.getBlockId(),
currentOffset,
block.getLength(),
block.getDataLength(),
block.getUncompressLength(),
block.getCrc(),
block.getTaskAttemptId()));
readBlocks.add(block);
// update offset
currentOffset += block.getLength();
currentOffset += block.getDataLength();
if (currentOffset >= readBufferSize) {
break;
}
Expand Down
Loading

0 comments on commit 270afaa

Please sign in to comment.