Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce parallelMap native function #863

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2022 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import meta::pure::functions::collection::tests::map::*;
import meta::pure::test::pct::*;
import meta::pure::functions::collection::tests::map::model::*;

function meta::pure::functions::collection::parallelMap<T,V|m>(value:T[m], func:Function<{T[1]->V[1]}>[1]):V[m]
{
parallelMap($value, $func, 4)
}

function meta::pure::functions::collection::parallelMap<T,V>(value:T[*], func:Function<{T[1]->V[*]}>[1]):V[*]
{
parallelMap($value, $func, 4)
}

function meta::pure::functions::collection::parallelMap<T,V>(value:T[0..1], func:Function<{T[1]->V[0..1]}>[1]):V[0..1]
{
parallelMap($value, $func, 4)
}

native function
<<PCT.function>>
{
doc.doc='Similar to map() but will try to do the operation in parallel. Also, takes as input the user\'s desired degree of parallelism. However, note that that actual degree of parallelism depends on the state of the shared threadpool.'
}
meta::pure::functions::collection::parallelMap<T,V|m>(value:T[m], func:Function<{T[1]->V[1]}>[1], parallelism:Integer[1]):V[m];

native function <<PCT.function>> meta::pure::functions::collection::parallelMap<T,V>(value:T[*], func:Function<{T[1]->V[*]}>[1], parallelism:Integer[1]):V[*];

native function <<PCT.function>> meta::pure::functions::collection::parallelMap<T,V>(value:T[0..1], func:Function<{T[1]->V[0..1]}>[1], parallelism:Integer[1]):V[0..1];

function <<PCT.test>> meta::pure::functions::collection::tests::parallelMap::testParallelMapWithEmptyCollection<Z|y>(f:Function<{Function<{->Z[y]}>[1]->Z[y]}>[1]):Boolean[1]
{
assertEquals([], $f->eval(|^M_Person(firstName='a', lastName='a').locations->parallelMap(x|$x.place, 2)));
}

function <<PCT.test>> meta::pure::functions::collection::tests::parallelMap::testParallelMapOneToOne<Z|y>(f:Function<{Function<{->Z[y]}>[1]->Z[y]}>[1]):Boolean[1]
{
assertEquals('a', $f->eval(|^M_Person(firstName='a', lastName='a')->parallelMap(x|$x.firstName, 2)));
}

function <<PCT.test>> meta::pure::functions::collection::tests::parallelMap::testParallelMapOneToMany<Z|y>(f:Function<{Function<{->Z[y]}>[1]->Z[y]}>[1]):Boolean[1]
{
let person = ^M_Person(firstName='b', lastName='b', locations=[^M_Location(place='Bengaluru',type=M_GeographicEntityType.CITY), ^M_Location(place='New York',type=M_GeographicEntityType.CITY)]);
assertEquals(['Bengaluru', 'New York'], $f->eval(|$person->parallelMap(x|$x.locations.place, 2)));
}

function <<PCT.test>> meta::pure::functions::collection::tests::parallelMap::testParallelMapManyToOne<Z|y>(f:Function<{Function<{->Z[y]}>[1]->Z[y]}>[1]):Boolean[1]
{
let people = [
^M_Person(firstName='a', lastName='a', locations=[^M_Location(place='London',type=M_GeographicEntityType.CITY)]),
^M_Person(firstName='b', lastName='b', locations=[^M_Location(place='Bengaluru',type=M_GeographicEntityType.CITY), ^M_Location(place='New York',type=M_GeographicEntityType.CITY)]),
^M_Person(firstName='c', lastName='c', locations=[])
];
assertEquals(['a', 'b', 'c'], $f->eval(|$people->parallelMap(x|$x.firstName, 2)));
}

function <<PCT.test>> meta::pure::functions::collection::tests::parallelMap::testParallelMapManyToMany<Z|y>(f:Function<{Function<{->Z[y]}>[1]->Z[y]}>[1]):Boolean[1]
{
let people = [
^M_Person(firstName='a', lastName='a', locations=[^M_Location(place='London',type=M_GeographicEntityType.CITY)]),
^M_Person(firstName='b', lastName='b', locations=[^M_Location(place='Bengaluru',type=M_GeographicEntityType.CITY), ^M_Location(place='New York',type=M_GeographicEntityType.CITY)]),
^M_Person(firstName='c', lastName='c', locations=[])
];
assertEquals(['London', 'Bengaluru', 'New York'], $f->eval(|$people->parallelMap(x|$x.locations.place, 2)));
}

function <<test.Test>> meta::pure::functions::collection::tests::parallelMap::testParallelMapManyToOneWithEvalOnFunctionPointer():Boolean[1]
{
let result = parallelMap_T_m__Function_1__Integer_1__V_m_->eval([1, 2, 3], x:Integer[1]|$x+1, 2);
assertEquals([2, 3, 4], $result);
}

function <<test.Test>> meta::pure::functions::collection::tests::parallelMap::testParallelMapManyToManyWithEvalOnFunctionPointer():Boolean[1]
{
let result = parallelMap_T_MANY__Function_1__Integer_1__V_MANY_->eval([1, 2, 3], {x:Integer[1]|[$x, $x]}, 2);
assertEquals([1, 1, 2, 2, 3, 3], $result);
}

// Tests copied from map()
function <<test.Test>> {test.excludePlatform = 'Java compiled'} meta::pure::functions::collection::tests::parallelMap::testParallelMapWithPropertyAsVariable():Boolean[1]
{
let vals = [^M_Person(firstName='a', lastName='a'), ^M_Person(firstName='b', lastName='b'), ^M_Person(firstName='c', lastName='c')];
let property = M_Person.properties->filter(p | $p.name == 'lastName')->toOne();
assertEquals(['a','b','c'], $vals->parallelMap($property, 2));
}

function <<test.Test>> meta::pure::functions::collection::tests::parallelMap::testParallelMapWithFunctionAsVariable():Boolean[1]
{
let vals = [^M_Person(firstName='a', lastName='a'), ^M_Person(firstName='b', lastName='b'), ^M_Person(firstName='c', lastName='c')];
let lambda = {p:M_Person[1]|$p.lastName};
assertEquals(['a','b','c'], $vals->parallelMap($lambda, 2));
}

function <<test.Test>> meta::pure::functions::collection::tests::parallelMap::testParallelMapWithDynamicFunctionFromZeroOneToZeroOne():Boolean[1]
{
let fn = {|^OptionalString(value='1')->parallelMap(x|$x.value, 2)};
let lambda = ^LambdaFunction<{->String[0..1]}>(expressionSequence = $fn.expressionSequence);
assertEquals('1', $lambda->evaluate([]));
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testGetUserFiles()
Verify.assertSetsEqual(
Sets.mutable.with("/test/codestorage/fake.pure", "/test/org/finos/legend/pure/m3/serialization/filesystem/test/level1/level1.pure", "/test/org/finos/legend/pure/m3/serialization/filesystem/test/level1/level2/level2.pure"),
this.testCodeStorage.getUserFiles().toSet());
Verify.assertEquals(230, this.combinedCodeStorage.getUserFiles().toSet().size());
Verify.assertEquals(231, this.combinedCodeStorage.getUserFiles().toSet().size());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2020 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.pure.m3.tests.function.base.collection;

import org.finos.legend.pure.m3.tests.AbstractPureTestWithCoreCompiled;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public abstract class AbstractTestParallelMap extends AbstractPureTestWithCoreCompiled
{
@After
public void cleanRuntime()
{
runtime.delete("fromString.pure");
runtime.delete("classes.pure");
runtime.compile();
}

@Test
public void testParallelMapWithMultiplicityInferencePropertyOwner()
{
compileTestSource(
"fromString.pure",
"Class Employee<|m>\n" +
"{\n" +
" prop:String[m];\n" +
"}\n" +
"\n" +
"function test():Nil[0]\n" +
"{\n" +
" let f = [^Employee<|*>(prop=['a','b']), ^Employee<|1>(prop='b')];\n" +
" print($f->parallelMap(e|$e.prop, 3), 1);\n" +
"}\n");
execute("test():Nil[0]");
Assert.assertEquals(
"[\n" +
" 'a'\n" +
" 'b'\n" +
" 'b'\n" +
"]",
functionExecution.getConsole().getLine(0));
}

@Test
public void testParallelMapWithMultiplicityInferenceFunctionWhichIsNotAProperty()
{
compileTestSource(
"fromString.pure",
"function f<|m>(s:String[m]):String[m]\n" +
"{\n" +
" $s\n" +
"}\n" +
"\n" +
"function test():Nil[0]\n" +
"{\n" +
" print([^List<String>(values='a'), ^List<String>(values=['b','c']), ^List<String>(values='c')]->parallelMap(i|f($i.values), 2), 1);\n" +
"}\n");
execute("test():Nil[0]");
Assert.assertEquals(
"[\n" +
" 'a'\n" +
" 'b'\n" +
" 'c'\n" +
" 'c'\n" +
"]",
functionExecution.getConsole().getLine(0));
}

@Test
public void testParallelMapWithVariableThisAsParameter()
{
compileTestSource(
"fromString.pure",
"Class A\n" +
"{\n" +
" func(valueFunc:Function<{A[1]->Float[1]}>[1])\n" +
" {\n" +
" if(true, |$this->parallelMap($valueFunc, 4), |1.0);\n" +
" }:Float[1];\n" +
"}\n" +
"\n" +
"function test():Nil[0]\n" +
"{\n" +
" print(^A().func(a | 2.0), 1);\n" +
"}\n");
execute("test():Nil[0]");
Assert.assertEquals("2.0", functionExecution.getConsole().getLine(0));
}

@Test
public void testParallelMapWithEvalOnFunctionPointer()
{
compileTestSource(
"fromString.pure",
"function test():Nil[0]\n" +
"{\n" +
" print(parallelMap_T_m__Function_1__Integer_1__V_m_->eval([1, 2, 3], x:Integer[1]|$x+1, 2), 1)" +
"}\n");
execute("test():Nil[0]");
Assert.assertEquals(
"[\n" +
" 2\n" +
" 3\n" +
" 4\n" +
"]",
functionExecution.getConsole().getLine(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.finos.legend.pure.runtime.java.shared.listeners.ExecutionListeners;
import org.finos.legend.pure.runtime.java.shared.listeners.IdentifiableExecutionEndListener;

import java.util.concurrent.ExecutorService;

public class CompiledExecutionSupport implements ExecutionSupport
{
private final JavaCompilerState javaCompilerState;
Expand Down Expand Up @@ -68,8 +70,15 @@ public class CompiledExecutionSupport implements ExecutionSupport
private final MutableSet<String> extraSupportedTypes;

private final MutableList<CompiledExtension> compiledExtensions;

private final ExecutorService executorService;

public CompiledExecutionSupport(JavaCompilerState javaCompilerState, CompiledProcessorSupport processorSupport, SourceRegistry sourceRegistry, RepositoryCodeStorage codeStorage, IncrementalCompiler incrementalCompiler, ExecutionActivityListener executionActivityListener, ConsoleCompiled console, FunctionCache functionCache, ClassCache classCache, MetadataProvider metadataProvider, MutableSet<String> extraSupportedTypes, MutableList<CompiledExtension> compiledExtensions, RuntimeOptions options)
{
this(javaCompilerState, processorSupport, sourceRegistry, codeStorage, incrementalCompiler, executionActivityListener, console, functionCache, classCache, metadataProvider, extraSupportedTypes, compiledExtensions, null, null);
}

public CompiledExecutionSupport(JavaCompilerState javaCompilerState, CompiledProcessorSupport processorSupport, SourceRegistry sourceRegistry, RepositoryCodeStorage codeStorage, IncrementalCompiler incrementalCompiler, ExecutionActivityListener executionActivityListener, ConsoleCompiled console, FunctionCache functionCache, ClassCache classCache, MetadataProvider metadataProvider, MutableSet<String> extraSupportedTypes, MutableList<CompiledExtension> compiledExtensions, RuntimeOptions options, ExecutorService executorService)
{
this.javaCompilerState = javaCompilerState;
this.sourceRegistry = sourceRegistry;
Expand All @@ -85,6 +94,7 @@ public CompiledExecutionSupport(JavaCompilerState javaCompilerState, CompiledPro
this.extraSupportedTypes = extraSupportedTypes;
this.options = (options == null) ? name -> false : options;
this.compiledExtensions = compiledExtensions;
this.executorService = executorService;
}

public CompiledExecutionSupport(JavaCompilerState javaCompilerState, CompiledProcessorSupport processorSupport, SourceRegistry sourceRegistry, RepositoryCodeStorage codeStorage, IncrementalCompiler incrementalCompiler, ExecutionActivityListener executionActivityListener, ConsoleCompiled console, FunctionCache functionCache, ClassCache classCache, MetadataProvider metadataProvider, MutableSet<String> extraSupportedTypes, MutableList<CompiledExtension> compiledExtensions)
Expand Down Expand Up @@ -236,4 +246,9 @@ public MutableList<CompiledExtension> getCompiledExtensions()
{
return compiledExtensions;
}

public ExecutorService getExecutorService()
{
return this.executorService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.concurrent.ExecutorService;

@ExecutionPlatform(name = "Java compiled")
public class FunctionExecutionCompiled implements FunctionExecution, PureRuntimeEventHandler
Expand All @@ -98,15 +99,23 @@ public class FunctionExecutionCompiled implements FunctionExecution, PureRuntime
private PureRuntime runtime;

private final boolean includePureStackTrace;

private final ExecutorService executorService;

private Metadata providedMetadata = null;

private FunctionExecutionCompiled(ExecutionActivityListener executionActivityListener, JavaCompilerEventObserver javaCompilerEventObserver, boolean includePureStackTrace, MutableList<CompiledExtension> extensions)
{
this(executionActivityListener, javaCompilerEventObserver, includePureStackTrace, extensions, null);
}

private FunctionExecutionCompiled(ExecutionActivityListener executionActivityListener, JavaCompilerEventObserver javaCompilerEventObserver, boolean includePureStackTrace, MutableList<CompiledExtension> extensions, ExecutorService executorService)
{
this.executionActivityListener = executionActivityListener;
this.javaCompilerEventObserver = (javaCompilerEventObserver == null) ? VoidJavaCompilerEventObserver.VOID_JAVA_COMPILER_EVENT_OBSERVER : javaCompilerEventObserver;
this.includePureStackTrace = includePureStackTrace;
this.extensions = extensions;
this.executorService = executorService;
}

@Override
Expand Down Expand Up @@ -174,7 +183,8 @@ public CompiledExecutionSupport getExecutionSupport()
this.metadataCompilerEventHandler,
this.extraSupportedTypes,
this.extensions,
this.runtime.getOptions()
this.runtime.getOptions(),
executorService
);
}

Expand Down Expand Up @@ -538,8 +548,8 @@ private MetadataEventObserver getMetadataEventObserver()
return (this.javaCompilerEventHandler instanceof MetadataEventObserver) ? (MetadataEventObserver) this.javaCompilerEventObserver : VoidJavaCompilerEventObserver.VOID_JAVA_COMPILER_EVENT_OBSERVER;
}

static FunctionExecutionCompiled createFunctionExecutionCompiled(ExecutionActivityListener executionActivityListener, boolean includePureStackTrace, JavaCompilerEventObserver javaCompilerEventObserver)
static FunctionExecutionCompiled createFunctionExecutionCompiled(ExecutionActivityListener executionActivityListener, boolean includePureStackTrace, JavaCompilerEventObserver javaCompilerEventObserver, ExecutorService executorService)
{
return new FunctionExecutionCompiled(executionActivityListener, javaCompilerEventObserver, includePureStackTrace, CompiledExtensionLoader.extensions());
return new FunctionExecutionCompiled(executionActivityListener, javaCompilerEventObserver, includePureStackTrace, CompiledExtensionLoader.extensions(), executorService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import org.finos.legend.pure.runtime.java.compiled.statelistener.JavaCompilerEventObserver;
import org.finos.legend.pure.runtime.java.compiled.statelistener.VoidJavaCompilerEventObserver;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

/**
* Configure the compiled execution environment
*/
Expand All @@ -28,6 +31,7 @@ public class FunctionExecutionCompiledBuilder

private ExecutionActivityListener executionActivityListener = VoidExecutionActivityListener.VOID_EXECUTION_ACTIVITY_LISTENER;
private JavaCompilerEventObserver javaCompilerEventObserver = VoidJavaCompilerEventObserver.VOID_JAVA_COMPILER_EVENT_OBSERVER;
private ExecutorService executorService = null;

public FunctionExecutionCompiledBuilder shouldIncludePureStackTrace()
{
Expand All @@ -53,8 +57,14 @@ public FunctionExecutionCompiledBuilder withJavaCompilerEventObserver(JavaCompil
return this;
}

public FunctionExecutionCompiledBuilder withExecutorService(ExecutorService executorService)
{
this.executorService = executorService;
return this;
}

public FunctionExecutionCompiled build()
{
return FunctionExecutionCompiled.createFunctionExecutionCompiled(this.executionActivityListener, this.includePureStackTrace, this.javaCompilerEventObserver);
return FunctionExecutionCompiled.createFunctionExecutionCompiled(this.executionActivityListener, this.includePureStackTrace, this.javaCompilerEventObserver, this.executorService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@
import org.finos.legend.pure.runtime.java.compiled.generation.processors.natives.grammar._boolean.inequality.LessThan;
import org.finos.legend.pure.runtime.java.compiled.generation.processors.natives.grammar._boolean.inequality.LessThanEqual;
import org.finos.legend.pure.runtime.java.compiled.generation.processors.natives.grammar.collection.iteration.Filter;
import org.finos.legend.pure.runtime.java.compiled.generation.processors.natives.grammar.collection.iteration.ParallelMap;
import org.finos.legend.pure.runtime.java.compiled.generation.processors.natives.grammar.collection.slice.First;
import org.finos.legend.pure.runtime.java.compiled.generation.processors.natives.grammar.collection.size.IsEmpty;
import org.finos.legend.pure.runtime.java.compiled.generation.processors.natives.grammar.collection.iteration.Map;
Expand Down Expand Up @@ -322,6 +323,7 @@ private static void registerGrammarCoreNatives(MutableMap<String, Native> map)
registerNative(map, new First());
registerNative(map, new IsEmpty());
registerNative(map, new Map());
registerNative(map, new ParallelMap());
registerNative(map, new Range());
registerNative(map, new Size());

Expand Down
Loading
Loading