From 270afaa78453ac161078f84f29b1010de6776e99 Mon Sep 17 00:00:00 2001 From: maobaolong Date: Sat, 12 Oct 2024 14:17:00 +0800 Subject: [PATCH] [MINOR] improve(server): Add debug log when cacheShuffleData (#2156) ### What changes were proposed in this pull request? Add debug log when cacheShuffleData ### Why are the changes needed? Supply a way to know the cacheShuffleData detail, include the partitionId, blockCount, we can know well from this debug level log. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No need. --- .../common/ShufflePartitionedBlock.java | 38 ++++++++++++------- .../common/ShufflePartitionedData.java | 29 +++++++++----- .../common/ShufflePartitionedBlockTest.java | 6 +-- .../server/ShuffleServerGrpcService.java | 29 +++++++------- .../uniffle/server/ShuffleTaskManager.java | 4 +- .../server/buffer/AbstractShuffleBuffer.java | 8 ++-- .../server/buffer/ShuffleBufferManager.java | 9 ++++- .../buffer/ShuffleBufferWithLinkedList.java | 10 ++--- .../buffer/ShuffleBufferWithSkipList.java | 10 ++--- .../netty/ShuffleServerNettyHandler.java | 31 ++++++++------- .../server/storage/SingleStorageManager.java | 2 +- .../server/ShuffleTaskManagerTest.java | 6 +-- .../ShuffleBufferWithLinkedListTest.java | 10 ++--- .../impl/HadoopShuffleWriteHandler.java | 2 +- .../handler/impl/LocalFileWriteHandler.java | 2 +- .../storage/HadoopShuffleHandlerTestBase.java | 4 +- .../impl/HadoopShuffleReadHandlerTest.java | 2 +- .../impl/LocalFileServerReadHandlerTest.java | 4 +- 18 files changed, 117 insertions(+), 89 deletions(-) diff --git a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java index 1ce68b6b6b..e476e37f36 100644 --- a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java +++ b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java @@ -24,7 +24,7 @@ public class ShufflePartitionedBlock { - private int length; + private int dataLength; private long crc; private long blockId; private int uncompressLength; @@ -32,8 +32,13 @@ public class ShufflePartitionedBlock { private long taskAttemptId; public ShufflePartitionedBlock( - int length, int uncompressLength, long crc, long blockId, long taskAttemptId, byte[] data) { - this.length = length; + int dataLength, + int uncompressLength, + long crc, + long blockId, + long taskAttemptId, + byte[] data) { + this.dataLength = dataLength; this.crc = crc; this.blockId = blockId; this.uncompressLength = uncompressLength; @@ -42,8 +47,13 @@ public ShufflePartitionedBlock( } public ShufflePartitionedBlock( - int length, int uncompressLength, long crc, long blockId, long taskAttemptId, ByteBuf data) { - this.length = length; + int dataLength, + int uncompressLength, + long crc, + long blockId, + long taskAttemptId, + ByteBuf data) { + this.dataLength = dataLength; this.crc = crc; this.blockId = blockId; this.uncompressLength = uncompressLength; @@ -53,8 +63,8 @@ public ShufflePartitionedBlock( // calculate the data size for this block in memory including metadata which are // blockId, crc, taskAttemptId, length, uncompressLength - public long getSize() { - return length + 3 * 8 + 2 * 4; + public long getEncodedLength() { + return dataLength + 3 * 8 + 2 * 4; } @Override @@ -66,7 +76,7 @@ public boolean equals(Object o) { return false; } ShufflePartitionedBlock that = (ShufflePartitionedBlock) o; - return length == that.length + return dataLength == that.dataLength && crc == that.crc && blockId == that.blockId && data.equals(that.data); @@ -74,15 +84,15 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(length, crc, blockId, data); + return Objects.hash(dataLength, crc, blockId, data); } - public int getLength() { - return length; + public int getDataLength() { + return dataLength; } - public void setLength(int length) { - this.length = length; + public void setDataLength(int dataLength) { + this.dataLength = dataLength; } public long getCrc() { @@ -126,7 +136,7 @@ public String toString() { return "ShufflePartitionedBlock{blockId[" + blockId + "], length[" - + length + + dataLength + "], uncompressLength[" + uncompressLength + "], crc[" diff --git a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java index 6793eccd00..fbc3c27973 100644 --- a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java +++ b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java @@ -20,7 +20,6 @@ import java.util.Arrays; import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang3.tuple.Pair; public class ShufflePartitionedData { @@ -28,23 +27,29 @@ public class ShufflePartitionedData { new ShufflePartitionedBlock[] {}; private int partitionId; private final ShufflePartitionedBlock[] blockList; - private final long totalBlockSize; + private final long totalBlockEncodedLength; + private final long totalBlockDataLength; - public ShufflePartitionedData(int partitionId, Pair pair) { + public ShufflePartitionedData( + int partitionId, long encodedLength, long dataLength, ShufflePartitionedBlock[] blockList) { this.partitionId = partitionId; - this.blockList = pair.getRight() == null ? EMPTY_BLOCK_LIST : pair.getRight(); - totalBlockSize = pair.getLeft(); + this.blockList = blockList == null ? EMPTY_BLOCK_LIST : blockList; + totalBlockEncodedLength = encodedLength; + totalBlockDataLength = dataLength; } @VisibleForTesting public ShufflePartitionedData(int partitionId, ShufflePartitionedBlock[] blockList) { this.partitionId = partitionId; this.blockList = blockList == null ? EMPTY_BLOCK_LIST : blockList; - long size = 0L; + long encodedLength = 0L; + long dataLength = 0L; for (ShufflePartitionedBlock block : this.blockList) { - size += block.getSize(); + encodedLength += block.getEncodedLength(); + dataLength += block.getDataLength(); } - totalBlockSize = size; + totalBlockEncodedLength = encodedLength; + totalBlockDataLength = dataLength; } @Override @@ -68,7 +73,11 @@ public ShufflePartitionedBlock[] getBlockList() { return blockList; } - public long getTotalBlockSize() { - return totalBlockSize; + public long getTotalBlockEncodedLength() { + return totalBlockEncodedLength; + } + + public long getTotalBlockDataLength() { + return totalBlockDataLength; } } diff --git a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java index 3f894ab9eb..eb79ede631 100644 --- a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java +++ b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java @@ -37,7 +37,7 @@ public void shufflePartitionedBlockTest() { new Random().nextBytes(buf); ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 1, 2, 3, 1, buf); - assertEquals(1, b1.getLength()); + assertEquals(1, b1.getDataLength()); assertEquals(2, b1.getCrc()); assertEquals(3, b1.getBlockId()); @@ -73,7 +73,7 @@ public void testToString() { "ShufflePartitionedBlock{blockId[" + b1.getBlockId() + "], length[" - + b1.getLength() + + b1.getDataLength() + "], uncompressLength[" + b1.getUncompressLength() + "], crc[" @@ -87,6 +87,6 @@ public void testToString() { @Test public void testSize() { ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 2, 3, 4, 5, new byte[6]); - assertEquals(b1.getSize(), b1.getLength() + 3 * Long.BYTES + 2 * Integer.BYTES); + assertEquals(b1.getEncodedLength(), b1.getDataLength() + 3 * Long.BYTES + 2 * Integer.BYTES); } } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index c9375123f4..d43d1e71f0 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -34,7 +34,6 @@ import io.grpc.stub.StreamObserver; import io.netty.buffer.ByteBuf; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -467,10 +466,10 @@ public void sendShuffleData( return; } final long start = System.currentTimeMillis(); - List shufflePartitionedData = toPartitionedData(req); + List shufflePartitionedDataList = toPartitionedDataList(req); long alreadyReleasedSize = 0; boolean hasFailureOccurred = false; - for (ShufflePartitionedData spd : shufflePartitionedData) { + for (ShufflePartitionedData spd : shufflePartitionedDataList) { String shuffleDataInfo = "appId[" + appId @@ -496,7 +495,7 @@ public void sendShuffleData( // TODO: Use ShuffleBufferWithSkipList to avoid caching block here. shuffleServer.getShuffleMergeManager().cacheBlock(appId, shuffleId, spd); } - long toReleasedSize = spd.getTotalBlockSize(); + long toReleasedSize = spd.getTotalBlockEncodedLength(); // after each cacheShuffleData call, the `preAllocatedSize` is updated timely. manager.releasePreAllocatedSize(toReleasedSize); alreadyReleasedSize += toReleasedSize; @@ -529,7 +528,7 @@ public void sendShuffleData( if (hasFailureOccurred) { shuffleServer .getShuffleBufferManager() - .releaseMemory(spd.getTotalBlockSize(), false, false); + .releaseMemory(spd.getTotalBlockEncodedLength(), false, false); } } } @@ -559,7 +558,7 @@ public void sendShuffleData( + "], cost " + costTime + " ms with " - + shufflePartitionedData.size() + + shufflePartitionedDataList.size() + " blocks and " + requireSize + " bytes"); @@ -1660,24 +1659,23 @@ public void getSortedShuffleData( } } - private List toPartitionedData(SendShuffleDataRequest req) { + private List toPartitionedDataList(SendShuffleDataRequest req) { List ret = Lists.newArrayList(); for (ShuffleData data : req.getShuffleDataList()) { - ret.add( - new ShufflePartitionedData( - data.getPartitionId(), toPartitionedBlock(data.getBlockList()))); + ret.add(toPartitionedData(data.getPartitionId(), data.getBlockList())); } return ret; } - private Pair toPartitionedBlock(List blocks) { + private ShufflePartitionedData toPartitionedData(int partitionId, List blocks) { if (blocks == null || blocks.size() == 0) { - return Pair.of(0L, new ShufflePartitionedBlock[] {}); + return new ShufflePartitionedData(partitionId, 0L, 0L, new ShufflePartitionedBlock[] {}); } ShufflePartitionedBlock[] ret = new ShufflePartitionedBlock[blocks.size()]; - long size = 0L; + long encodedLength = 0L; + long dataLength = 0L; int i = 0; for (ShuffleBlock block : blocks) { ByteBuf data = ByteBufUtils.byteStringToByteBuf(block.getData()); @@ -1689,10 +1687,11 @@ private Pair toPartitionedBlock(List toPartitionBlocksMap(List partitionToBlockIds) { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 8f7cfd4d44..e51ab53525 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -332,7 +332,7 @@ public StatusCode cacheShuffleData( String appId, int shuffleId, boolean isPreAllocated, ShufflePartitionedData spd) { refreshAppId(appId); long partitionSize = getPartitionDataSize(appId, shuffleId, spd.getPartitionId()); - long deltaSize = spd.getTotalBlockSize(); + long deltaSize = spd.getTotalBlockEncodedLength(); partitionSize += deltaSize; // We do not need to check the huge partition size here, after old client upgraded to this // version, @@ -506,7 +506,7 @@ public void updateCachedBlockIds( synchronized (bitmap) { for (ShufflePartitionedBlock spb : spbs) { bitmap.addLong(spb.getBlockId()); - size += spb.getSize(); + size += spb.getEncodedLength(); } } long partitionSize = shuffleTaskInfo.addPartitionDataSize(shuffleId, partitionId, size); diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java index 62e2728bff..f520603da2 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java @@ -126,7 +126,7 @@ private void updateShuffleData(List readBlocks, Composi } catch (Exception e) { LOG.error( "Unexpected exception for System.arraycopy, length[" - + block.getLength() + + block.getDataLength() + "], offset[" + offset + "], dataLength[" @@ -135,7 +135,7 @@ private void updateShuffleData(List readBlocks, Composi e); throw e; } - offset += block.getLength(); + offset += block.getDataLength(); } } @@ -168,13 +168,13 @@ protected void updateSegmentsWithoutBlockId( new BufferSegment( block.getBlockId(), currentOffset, - block.getLength(), + block.getDataLength(), block.getUncompressLength(), block.getCrc(), block.getTaskAttemptId())); readBlocks.add(block); // update offset - currentOffset += block.getLength(); + currentOffset += block.getDataLength(); // check if length >= request buffer size if (currentOffset >= readBufferSize) { break; diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index 18dd94c198..aac9be42e5 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -226,10 +226,17 @@ public StatusCode cacheShuffleData( Arrays.stream(spd.getBlockList()) .forEach( b -> { - int blockSize = b.getLength(); + int blockSize = b.getDataLength(); ShuffleServerMetrics.appHistogramWriteBlockSize.labels(appId).observe(blockSize); }); } + LOG.debug( + "cache shuffle data, size: {}, blockCount: {}, appId: {}, shuffleId: {}, partitionId: {}", + spd.getTotalBlockDataLength(), + spd.getBlockList().length, + appId, + shuffleId, + spd.getPartitionId()); updateShuffleSize(appId, shuffleId, size); synchronized (this) { flushSingleBufferIfNecessary( diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java index 6f94d7a50d..9597cbfe31 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java @@ -58,7 +58,7 @@ public long append(ShufflePartitionedData data) { // If sendShuffleData retried, we may receive duplicate block. The duplicate // block would gc without release. Here we must release the duplicated block. if (blocks.add(block)) { - size += block.getSize(); + size += block.getEncodedLength(); } else { block.getData().release(); } @@ -126,10 +126,10 @@ public long release() { for (ShufflePartitionedBlock spb : blocks) { try { spb.getData().release(); - releasedSize += spb.getSize(); + releasedSize += spb.getEncodedLength(); } catch (Throwable t) { lastException = t; - failedToReleaseSize += spb.getSize(); + failedToReleaseSize += spb.getEncodedLength(); } } if (lastException != null) { @@ -261,13 +261,13 @@ private boolean updateSegmentsWithBlockId( new BufferSegment( block.getBlockId(), currentOffset, - block.getLength(), + block.getDataLength(), block.getUncompressLength(), block.getCrc(), block.getTaskAttemptId())); readBlocks.add(block); // update offset - currentOffset += block.getLength(); + currentOffset += block.getDataLength(); if (currentOffset >= readBufferSize) { break; } diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java index d8ed3b2db1..50e6d686f2 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java @@ -68,7 +68,7 @@ public long append(ShufflePartitionedData data) { if (!blocksMap.containsKey(block.getBlockId())) { blocksMap.put(block.getBlockId(), block); blockCount++; - size += block.getSize(); + size += block.getEncodedLength(); } else { block.getData().release(); } @@ -127,10 +127,10 @@ public long release() { for (ShufflePartitionedBlock spb : blocksMap.values()) { try { spb.getData().release(); - releasedSize += spb.getSize(); + releasedSize += spb.getEncodedLength(); } catch (Throwable t) { lastException = t; - failedToReleaseSize += spb.getSize(); + failedToReleaseSize += spb.getEncodedLength(); } } if (lastException != null) { @@ -249,13 +249,13 @@ private boolean updateSegments( new BufferSegment( block.getBlockId(), currentOffset, - block.getLength(), + block.getDataLength(), block.getUncompressLength(), block.getCrc(), block.getTaskAttemptId())); readBlocks.add(block); // update offset - currentOffset += block.getLength(); + currentOffset += block.getDataLength(); if (currentOffset >= readBufferSize) { break; } diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java index e40bf4d87f..3c7bb736f7 100644 --- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java +++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java @@ -30,7 +30,6 @@ import org.apache.commons.collections.MapUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -167,7 +166,9 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData + ", requireBlocksSize=" + requireBlocksSize + ", stageAttemptNumber=" - + stageAttemptNumber); + + stageAttemptNumber + + ", partitionCount=" + + req.getPartitionToBlocks().size()); ShuffleTaskInfo taskInfo = shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId); if (taskInfo == null) { @@ -274,10 +275,10 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData } final long start = System.currentTimeMillis(); shuffleBufferManager.releaseMemory(req.encodedLength(), false, true); - List shufflePartitionedData = toPartitionedData(req); + List shufflePartitionedDataList = toPartitionedDataList(req); long alreadyReleasedSize = 0; boolean hasFailureOccurred = false; - for (ShufflePartitionedData spd : shufflePartitionedData) { + for (ShufflePartitionedData spd : shufflePartitionedDataList) { String shuffleDataInfo = "appId[" + appId @@ -301,7 +302,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData responseMessage = errorMsg; hasFailureOccurred = true; } else { - long toReleasedSize = spd.getTotalBlockSize(); + long toReleasedSize = spd.getTotalBlockEncodedLength(); // after each cacheShuffleData call, the `preAllocatedSize` is updated timely. shuffleTaskManager.releasePreAllocatedSize(toReleasedSize); alreadyReleasedSize += toReleasedSize; @@ -333,7 +334,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData // Once the cache failure occurs, we should explicitly release data held by byteBuf if (hasFailureOccurred) { Arrays.stream(spd.getBlockList()).forEach(block -> block.getData().release()); - shuffleBufferManager.releaseMemory(spd.getTotalBlockSize(), false, false); + shuffleBufferManager.releaseMemory(spd.getTotalBlockEncodedLength(), false, false); } } } @@ -359,7 +360,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData + "], cost " + costTime + " ms with " - + shufflePartitionedData.size() + + shufflePartitionedDataList.size() + " blocks and " + requireBlocksSize + " bytes"); @@ -750,21 +751,22 @@ public void handleGetLocalShuffleData(TransportClient client, GetLocalShuffleDat } } - private List toPartitionedData(SendShuffleDataRequest req) { + private List toPartitionedDataList(SendShuffleDataRequest req) { List ret = Lists.newArrayList(); for (Map.Entry> entry : req.getPartitionToBlocks().entrySet()) { - ret.add(new ShufflePartitionedData(entry.getKey(), toPartitionedBlock(entry.getValue()))); + ret.add(toPartitionedData(entry.getKey(), entry.getValue())); } return ret; } - private Pair toPartitionedBlock(List blocks) { + private ShufflePartitionedData toPartitionedData(int partitionId, List blocks) { if (blocks == null || blocks.size() == 0) { - return Pair.of(0L, new ShufflePartitionedBlock[] {}); + return new ShufflePartitionedData(partitionId, 0L, 0L, new ShufflePartitionedBlock[] {}); } ShufflePartitionedBlock[] ret = new ShufflePartitionedBlock[blocks.size()]; - long size = 0L; + long encodedLength = 0L; + long dataLength = 0L; int i = 0; for (ShuffleBlockInfo block : blocks) { ret[i] = @@ -775,10 +777,11 @@ private Pair toPartitionedBlock(List partitions = Lists.newArrayList(); diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java index a0c06f541a..2a67c548e3 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java @@ -295,14 +295,14 @@ public void partitionDataSizeSummaryTest() throws Exception { // case1 ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35); - long size1 = partitionedData0.getTotalBlockSize(); + long size1 = partitionedData0.getTotalBlockEncodedLength(); shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0.getBlockList()); assertEquals(size1, shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize()); // case2 partitionedData0 = createPartitionedData(1, 1, 35); - long size2 = partitionedData0.getTotalBlockSize(); + long size2 = partitionedData0.getTotalBlockEncodedLength(); shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0.getBlockList()); assertEquals(size1 + size2, shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize()); assertEquals( @@ -1159,7 +1159,7 @@ private void validate( for (ShufflePartitionedBlock block : blocks) { for (BufferSegment bs : bufferSegments) { if (bs.getBlockId() == block.getBlockId()) { - assertEquals(block.getLength(), bs.getLength()); + assertEquals(block.getDataLength(), bs.getLength()); assertEquals(block.getCrc(), bs.getCrc()); matchNum++; break; diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java index f2037b58f8..151232af2c 100644 --- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java +++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java @@ -599,14 +599,14 @@ public void appendRepeatBlockTest() { private byte[] getExpectedData(ShufflePartitionedData... spds) { int size = 0; for (ShufflePartitionedData spd : spds) { - size += spd.getBlockList()[0].getLength(); + size += spd.getBlockList()[0].getDataLength(); } byte[] expectedData = new byte[size]; int offset = 0; for (ShufflePartitionedData spd : spds) { ShufflePartitionedBlock block = spd.getBlockList()[0]; - ByteBufUtils.readBytes(block.getData(), expectedData, offset, block.getLength()); - offset += block.getLength(); + ByteBufUtils.readBytes(block.getData(), expectedData, offset, block.getDataLength()); + offset += block.getDataLength(); } return expectedData; } @@ -623,10 +623,10 @@ private void compareBufferSegment( ShufflePartitionedBlock spb = blocks.get(i); BufferSegment segment = bufferSegments.get(segmentIndex); assertEquals(spb.getBlockId(), segment.getBlockId()); - assertEquals(spb.getLength(), segment.getLength()); + assertEquals(spb.getDataLength(), segment.getLength()); assertEquals(spb.getCrc(), segment.getCrc()); assertEquals(offset, segment.getOffset()); - offset += spb.getLength(); + offset += spb.getDataLength(); segmentIndex++; } } diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java index eb81c40898..d6339975ad 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java @@ -132,7 +132,7 @@ public void write(List shuffleBlocks) throws Exception new FileBasedShuffleSegment( blockId, startOffset, - block.getLength(), + block.getDataLength(), block.getUncompressLength(), crc, block.getTaskAttemptId()); diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java index 4b06e5aa91..a00e33d2fc 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java @@ -109,7 +109,7 @@ public synchronized void write(List shuffleBlocks) thro new FileBasedShuffleSegment( blockId, startOffset, - block.getLength(), + block.getDataLength(), block.getUncompressLength(), crc, block.getTaskAttemptId()); diff --git a/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java b/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java index ea29a47689..1e936b4042 100644 --- a/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java +++ b/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java @@ -96,11 +96,11 @@ public static void writeTestData( new FileBasedShuffleSegment( spb.getBlockId(), offset, - spb.getLength(), + spb.getDataLength(), spb.getUncompressLength(), spb.getCrc(), 1); - offset += spb.getLength(); + offset += spb.getDataLength(); segments.add(segment); if (doWrite) { writer.writeData(ByteBufUtils.readBytes(spb.getData())); diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java index d1b663f1f3..1b2b7e93b3 100644 --- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java +++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java @@ -212,7 +212,7 @@ public void writeIndex(List shuffleBlocks) new FileBasedShuffleSegment( blockId, startOffset, - block.getLength(), + block.getDataLength(), block.getUncompressLength(), crc, block.getTaskAttemptId()); diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java index 2a55ae4de2..5b2db2daf1 100644 --- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java +++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java @@ -65,11 +65,11 @@ public void testDataInconsistent() throws Exception { new FileBasedShuffleSegment( block.getBlockId(), offset, - block.getLength(), + block.getDataLength(), block.getUncompressLength(), block.getCrc(), block.getTaskAttemptId()); - offset += block.getLength(); + offset += block.getDataLength(); LocalFileHandlerTestBase.writeIndex(byteBuffer, segment); } },