From e37f5ac7c790fc40e7e40c752b887c25591a96c9 Mon Sep 17 00:00:00 2001 From: mccheah Date: Wed, 20 Mar 2019 15:14:04 -0700 Subject: [PATCH] [SPARK-25299] Introduce the new shuffle writer API (#5) (#520) Introduces the new Shuffle Writer API. Ported from https://github.com/bloomberg/apache-spark-on-k8s/pull/5. --- .../spark/api/shuffle/ShuffleDataIO.java | 31 ++++++++++++++ .../shuffle/ShuffleExecutorComponents.java | 33 +++++++++++++++ .../api/shuffle/ShuffleMapOutputWriter.java | 37 ++++++++++++++++ .../api/shuffle/ShufflePartitionWriter.java | 42 +++++++++++++++++++ .../api/shuffle/ShuffleWriteSupport.java | 37 ++++++++++++++++ 5 files changed, 180 insertions(+) create mode 100644 core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java create mode 100644 core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java create mode 100644 core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java create mode 100644 core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java create mode 100644 core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java new file mode 100644 index 0000000000000..4cb40f6dd00b8 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.shuffle; + +import org.apache.spark.annotation.Experimental; + +/** + * :: Experimental :: + * An interface for launching shuffle-related components. + * {@link #executor()} supplies the {@link ShuffleExecutorComponents} for this implementation. + * @since 3.0.0 + */ +@Experimental +public interface ShuffleDataIO { + ShuffleExecutorComponents executor(); +} diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java new file mode 100644 index 0000000000000..1edf044225ccf --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.api.shuffle; + +import org.apache.spark.annotation.Experimental; + +/** + * :: Experimental :: + * An interface for building shuffle support for Executors. + * NOTE(review): {@code intitializeExecutor} is misspelled; renaming it would break implementers. + * @since 3.0.0 + */ +@Experimental +public interface ShuffleExecutorComponents { + void intitializeExecutor(String appId, String execId); + + ShuffleWriteSupport writes(); /* write-side shuffle support */ +} diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java new file mode 100644 index 0000000000000..5119e34803a85 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.api.shuffle; + +import java.io.IOException; + +import org.apache.spark.annotation.Experimental; + +/** + * :: Experimental :: + * An interface for creating and managing shuffle partition writers. + * Offers {@link #getNextPartitionWriter()}, {@link #commitAllPartitions()} and {@link #abort}. + * @since 3.0.0 + */ +@Experimental +public interface ShuffleMapOutputWriter { + ShufflePartitionWriter getNextPartitionWriter() throws IOException; + + void commitAllPartitions() throws IOException; + + void abort(Throwable error) throws IOException; /* error: presumably the abort cause; confirm */ +} diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java b/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java new file mode 100644 index 0000000000000..c043a6b3a4995 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.api.shuffle; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; + +import org.apache.spark.annotation.Experimental; + +/** + * :: Experimental :: + * An interface for giving streams / channels for shuffle writes. + * By default {@link #openChannel()} wraps {@link #openStream()} with Channels.newChannel. + * @since 3.0.0 + */ +@Experimental +public interface ShufflePartitionWriter { + OutputStream openStream() throws IOException; + + long getLength(); /* presumably the number of bytes written; confirm */ + + default WritableByteChannel openChannel() throws IOException { + return Channels.newChannel(openStream()); + } +} diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java new file mode 100644 index 0000000000000..5ba5564bb46d0 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.api.shuffle; + +import java.io.IOException; + +import org.apache.spark.annotation.Experimental; + +/** + * :: Experimental :: + * An interface for deploying a shuffle map output writer. + * The writer is created for a given app id, shuffle id, map id and partition count. + * @since 3.0.0 + */ +@Experimental +public interface ShuffleWriteSupport { + ShuffleMapOutputWriter createMapOutputWriter( + String appId, + int shuffleId, + int mapId, + int numPartitions) throws IOException; +}