Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Test the native sql engine without ColumnarShuffleManager. #1051

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
e82c97a
[NSE-913] Add support for Hadoop 3.3.1 (#966)
Jun 16, 2022
256b637
[NSE-927][NSE-126] BackPort PR#975 and PR#977 to branch-1.4 (#978)
zhixingheyi-tian Jun 20, 2022
11dcf98
Add the strategy to fallback to Vanilla Spark shuffle manager.
lviiii Jul 21, 2022
a43be7f
Merge remote-tracking branch 'origin/branch-1.4' into fallback-shuffle
lviiii Jul 25, 2022
b96bc62
Fix the "wrong data" problem.
lviiii Jul 25, 2022
d4ae565
Merge remote-tracking branch 'gazelle/branch-1.4' into fallback-shuffle
lviiii Jul 25, 2022
d371925
Merge branch 'main' into fallback-shuffle
lviiii Jul 26, 2022
268a009
Test the native sql engine without ColumnarShuffleManager.
lviiii Jul 27, 2022
7c5686d
Test the native sql engine without ColumnarShuffleManager.
lviiii Jul 27, 2022
167931f
Test the native sql engine without ColumnarShuffleManager.
lviiii Jul 27, 2022
b7b106c
Test the native sql engine without ColumnarShuffleManager.
lviiii Jul 29, 2022
3f3732e
Test the native sql engine without ColumnarShuffleManager.
lviiii Jul 30, 2022
446db4e
Test the native sql engine without ColumnarShuffleManager.
lviiii Jul 30, 2022
80b41b3
Test the native sql engine without ColumnarShuffleManager.
lviiii Aug 1, 2022
d4914a7
Test the native sql engine without ColumnarShuffleManager.
lviiii Aug 1, 2022
0145a6f
Merge branch 'main' into config-test
lviiii Aug 2, 2022
e6d1748
Test the native sql engine without ColumnarShuffleManager.
lviiii Aug 2, 2022
658f35e
Test the native sql engine without ColumnarShuffleManager.
lviiii Aug 2, 2022
bc440a5
Test the native sql engine without ColumnarShuffleManager.
lviiii Aug 2, 2022
499bb63
Test the native sql engine without ColumnarShuffleManager.
lviiii Aug 3, 2022
38bd179
benchmark test.
lviiii Aug 5, 2022
bdc7c1f
test
lviiii Aug 5, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.vectorized;


import scala.collection.convert.Wrappers;

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;

/**
 * Thin adapter exposing an {@code Iterator<List<Long>>} with plain
 * {@code hasNext}/{@code next} methods so it can be driven from native (JNI)
 * code without dealing with generic Java interfaces.
 *
 * Improvement over the original: the class now also implements
 * {@link Iterator} itself (backward-compatible — the two existing methods are
 * unchanged), and the wrapped iterator is held in a {@code final} field.
 */
public class IteratorWrapper implements Iterator<List<Long>> {

    /** The wrapped upstream iterator; all calls delegate to it. */
    private final Iterator<List<Long>> in;

    /**
     * @param in the iterator to wrap; must not be null
     */
    public IteratorWrapper(Iterator<List<Long>> in) {
        this.in = in;
    }

    /** @return true if the underlying iterator has more elements */
    @Override
    public boolean hasNext() {
        return in.hasNext();
    }

    /** @return the next element from the underlying iterator */
    @Override
    public List<Long> next() {
        return in.next();
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,28 @@ public native long nativeMake(
long memoryPoolId,
boolean writeSchema);

/**
 * Create a native splitter instance for the given partitioning.
 *
 * Convenience overload that unpacks the {@link NativePartitioning} and
 * delegates to {@link #initSplit}.
 *
 * @param part           partitioning spec (short name, partition count,
 *                       serialized schema and expression list)
 * @param offheapPerTask off-heap memory budget per task, in bytes
 * @param bufferSize     split buffer size
 * @return native splitter instance id (handle used by the other JNI calls)
 */
public long make(
NativePartitioning part,
long offheapPerTask,
int bufferSize) {
return initSplit(
part.getShortName(),
part.getNumPartitions(),
part.getSchema(),
part.getExprList(),
offheapPerTask,
bufferSize
);
}

/**
 * Native constructor of a splitter instance.
 *
 * @param shortName      partitioning short name (e.g. hash / round-robin)
 * @param numPartitions  number of output partitions
 * @param schema         Arrow schema, serialized to bytes
 * @param exprList       partitioning expressions, serialized to bytes
 * @param offheapPerTask off-heap memory budget per task, in bytes
 * @param bufferSize     split buffer size
 * @return native splitter instance id
 */
public native long initSplit(
String shortName,
int numPartitions,
byte[] schema,
byte[] exprList,
long offheapPerTask,
int bufferSize);

/**
*
* Spill partition data to disk.
Expand Down Expand Up @@ -113,6 +135,11 @@ public native long split(
long splitterId, int numRows, long[] bufAddrs, long[] bufSizes, boolean firstRecordBatch)
throws IOException;

/**
 * Collect the record batch after splitting.
 *
 * @param splitterId splitter instance id returned by make/initSplit
 * @param numRows    number of rows in the batch that was just split
 * @throws IOException on native-side failure
 */
public native void collect(long splitterId, int numRows) throws IOException;

/**
* Update the compress type.
*/
Expand All @@ -127,6 +154,16 @@ public native long split(
*/
/**
 * Stop the designated splitter and flush any remaining data.
 *
 * @param splitterId splitter instance id
 * @return SplitResult with the final split statistics
 * @throws IOException on native-side failure
 */
public native SplitResult stop(long splitterId) throws IOException;


/**
 * Clear the splitter's buffers and stop processing the current split.
 *
 * @param splitterId splitter instance id
 * @return SplitResult with statistics for the cleared split
 * @throws IOException on native-side failure
 */
public native SplitResult clear(long splitterId) throws IOException;


/**
* Release resources associated with designated splitter instance.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.vectorized;


import com.intel.oap.expression.ConverterUtils;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.ipc.message.ArrowBuffer;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;

/**
 * Iterator that feeds upstream {@link ColumnarBatch}es into the native
 * splitter and yields the split (re-partitioned) batches it produces.
 *
 * Bug fixes vs. the original:
 * <ul>
 *   <li>{@code nativeCreateInstance()} called {@code jniWrapper.make(...)} a
 *       second time unconditionally after the guarded creation, overwriting
 *       {@code nativeSplitter} and leaking one native splitter instance per
 *       input batch. The splitter is now created exactly once and reused
 *       (the cleanup in {@code hasRecordBatch()}/{@code finalize()} only ever
 *       releases a single instance, which matches this model).</li>
 *   <li>{@code finalize()} now closes the native instance even when
 *       {@code clear()} throws.</li>
 *   <li>{@code hasNext()} keeps pulling input batches until the native side
 *       reports output or input is exhausted, instead of retrying once.</li>
 * </ul>
 */
public class SplitIterator implements Iterator<ColumnarBatch> {

    /**
     * Serializable bag of settings needed to create the native splitter on
     * the executor side.
     */
    public static class IteratorOptions implements Serializable {
        private static final long serialVersionUID = -1L;

        // Number of output partitions.
        private int partitionNum;
        // Partitioning short name.
        private String name;
        // Off-heap memory budget per task, in bytes.
        private long offheapPerTask;
        // Split buffer size.
        private int bufferSize;
        // Partitioning expression (string form).
        private String expr;
        // Full partitioning spec passed to the native splitter.
        NativePartitioning nativePartitioning;

        public NativePartitioning getNativePartitioning() {
            return nativePartitioning;
        }

        public void setNativePartitioning(NativePartitioning nativePartitioning) {
            this.nativePartitioning = nativePartitioning;
        }

        public int getPartitionNum() {
            return partitionNum;
        }

        public void setPartitionNum(int partitionNum) {
            this.partitionNum = partitionNum;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public long getOffheapPerTask() {
            return offheapPerTask;
        }

        public void setOffheapPerTask(long offheapPerTask) {
            this.offheapPerTask = offheapPerTask;
        }

        public int getBufferSize() {
            return bufferSize;
        }

        public void setBufferSize(int bufferSize) {
            this.bufferSize = bufferSize;
        }

        public String getExpr() {
            return expr;
        }

        public void setExpr(String expr) {
            this.expr = expr;
        }

    }

    ShuffleSplitterJniWrapper jniWrapper;

    // Handle of the native splitter instance; 0 means "not created yet".
    private long nativeSplitter = 0;
    // Upstream source of input batches.
    private final Iterator<ColumnarBatch> iterator;
    private final IteratorOptions options;

    // Input batch currently being split; set by hasRecordBatch().
    private ColumnarBatch cb = null;

    public SplitIterator(ShuffleSplitterJniWrapper jniWrapper,
                         Iterator<ColumnarBatch> iterator, IteratorOptions options) {
        this.jniWrapper = jniWrapper;
        this.iterator = iterator;
        this.options = options;
    }

    /**
     * Feed the current input batch ({@code cb}) into the native splitter,
     * creating the splitter lazily on first use.
     *
     * The input batch and its Arrow representation are always released,
     * even on failure.
     */
    private void nativeCreateInstance() {
        ArrowRecordBatch recordBatch = ConverterUtils.createArrowRecordBatch(cb);
        try {
            // Create the native splitter exactly once and reuse it for every
            // batch. (The original code unconditionally created a second
            // instance here, leaking the first one on each call.)
            if (nativeSplitter == 0) {
                nativeSplitter = jniWrapper.make(
                    options.getNativePartitioning(),
                    options.getOffheapPerTask(),
                    options.getBufferSize());
            }
            // Collect buffer addresses/sizes of the Arrow batch for the
            // native split call.
            int len = recordBatch.getBuffers().size();
            long[] bufAddrs = new long[len];
            long[] bufSizes = new long[len];
            int i = 0;
            for (ArrowBuf buffer : recordBatch.getBuffers()) {
                bufAddrs[i++] = buffer.memoryAddress();
            }
            int j = 0;
            for (ArrowBuffer buffer : recordBatch.getBuffersLayout()) {
                bufSizes[j++] = buffer.getSize();
            }
            jniWrapper.split(nativeSplitter, cb.numRows(), bufAddrs, bufSizes, false);
            jniWrapper.collect(nativeSplitter, cb.numRows());
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            ConverterUtils.releaseArrowRecordBatch(recordBatch);
            cb.close();
        }
    }

    private native boolean nativeHasNext(long instance);

    /**
     * Advance {@code cb} to the next non-empty upstream batch.
     *
     * @return true if a non-empty batch was found; false when the input is
     *         exhausted (in which case the native splitter's buffers are
     *         cleared)
     */
    public boolean hasRecordBatch() {
        while (iterator.hasNext()) {
            cb = iterator.next();
            // Skip batches with no rows or no columns.
            if (cb.numRows() != 0 && cb.numCols() != 0) {
                return true;
            }
        }
        // Input exhausted: release the native-side buffers.
        if (nativeSplitter != 0) {
            try {
                jniWrapper.clear(nativeSplitter);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return false;
    }

    @Override
    public boolean hasNext() {
        // Keep feeding input batches into the native splitter until it
        // reports pending output, or the input is exhausted. (The original
        // retried only once, so a batch producing no output ended iteration
        // early.)
        while (nativeSplitter == 0 || !nativeHasNext(nativeSplitter)) {
            if (!hasRecordBatch()) {
                return false;
            }
            nativeCreateInstance();
        }
        return true;
    }

    private native byte[] nativeNext(long instance);

    /**
     * @return the next split batch, deserialized against the partitioning
     *         schema
     */
    @Override
    public ColumnarBatch next() {
        byte[] serializedRecordBatch = nativeNext(nativeSplitter);
        return ConverterUtils.createRecordBatch(serializedRecordBatch,
            options.getNativePartitioning().getSchema());
    }

    private native int nativeNextPartitionId(long nativeSplitter);

    /**
     * @return the partition id of the batch that {@link #next()} will return
     */
    public int nextPartitionId() {
        return nativeNextPartitionId(nativeSplitter);
    }

    // NOTE(review): finalize() is deprecated; kept to preserve the existing
    // cleanup contract. Consider migrating to java.lang.ref.Cleaner.
    @Override
    protected void finalize() throws Throwable {
        if (nativeSplitter != 0) {
            try {
                jniWrapper.clear(nativeSplitter);
            } catch (IOException e) {
                throw new RuntimeException(e);
            } finally {
                // Always release the native instance, even if clear() failed
                // (the original skipped close() on failure).
                jniWrapper.close(nativeSplitter);
                nativeSplitter = 0;
            }
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
// Enable or disable the columnar shuffled hash join; effective only on CPU.
val enableColumnarShuffledHashJoin: Boolean =
conf.getConfString("spark.oap.sql.columnar.shuffledhashjoin", "true").toBoolean && enableCpu

// Enable or disable falling back to the vanilla Spark shuffle manager;
// effective only on CPU. Parsed with toBoolean for consistency with the
// sibling config flags (the original used .equals("true"), which is
// case-sensitive and silently treats malformed values as false).
val enableFallbackShuffle: Boolean = conf
  .getConfString("spark.oap.sql.columnar.enableFallbackShuffle", "true")
  .toBoolean && enableCpu

// Enable or disable the Arrow-based columnar-to-row conversion; effective
// only on CPU.
val enableArrowColumnarToRow: Boolean =
conf.getConfString("spark.oap.sql.columnar.columnartorow", "true").toBoolean && enableCpu

Expand Down
Loading