From 05d3f69bda6935a59d03fe40ae771113d1bd29b8 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Fri, 24 Jun 2022 15:09:02 -0700 Subject: [PATCH 01/35] [DNM] core: preliminary implementation of image partitions --- legate/core/_legion/partition_functor.py | 8 +- legate/core/constraints.py | 24 ++++- legate/core/operation.py | 23 ++++- legate/core/partition.py | 112 ++++++++++++++++++++++- legate/core/runtime.py | 11 ++- legate/core/solver.py | 15 ++- legate/core/store.py | 4 +- 7 files changed, 171 insertions(+), 26 deletions(-) diff --git a/legate/core/_legion/partition_functor.py b/legate/core/_legion/partition_functor.py index 469a12c2c..3383922c6 100644 --- a/legate/core/_legion/partition_functor.py +++ b/legate/core/_legion/partition_functor.py @@ -20,8 +20,10 @@ from .future import FutureMap from .geometry import Point +from . import FieldID + if TYPE_CHECKING: - from . import FieldID, IndexPartition, IndexSpace, Rect, Region, Transform + from . import FieldID, IndexPartition, IndexSpace, Partition, Rect, Region, Transform class PartitionFunctor: @@ -103,7 +105,7 @@ class PartitionByImage(PartitionFunctor): def __init__( self, region: Region, - part: IndexPartition, + part: Partition, field: Union[int, FieldID], mapper: int = 0, tag: int = 0, @@ -147,7 +149,7 @@ class PartitionByImageRange(PartitionFunctor): def __init__( self, region: Region, - part: IndexPartition, + part: Partition, field: Union[int, FieldID], mapper: int = 0, tag: int = 0, diff --git a/legate/core/constraints.py b/legate/core/constraints.py index b8fdc6c49..123a3c3cb 100644 --- a/legate/core/constraints.py +++ b/legate/core/constraints.py @@ -17,7 +17,7 @@ from collections.abc import Iterable from typing import TYPE_CHECKING, Any, Iterator, Optional, Protocol, Union -from .partition import Restriction +from .partition import ImagePartition, Restriction, Replicate if TYPE_CHECKING: from .partition import PartitionBase @@ -217,6 +217,28 @@ def unknowns(self) -> Iterator[PartSym]: yield unknown +class Image(Expr): + def __init__(self, source_store : Store, dst_store : Store, src_part_sym : Expr, range: bool = False): + self._source_store = source_store + self._dst_store = dst_store + self._src_part_sym = src_part_sym + self._range = range + + def subst(self, mapping: dict[PartSym, PartitionBase]) -> Expr: + return Image(self._source_store, self._dst_store, self._src_part_sym.subst(mapping), range=self._range) + + def reduce(self) -> Lit: + expr = self._src_part_sym.reduce() + assert isinstance(expr, Lit) + part = expr._part + if isinstance(part, Replicate): + return Lit(part) + return Lit(ImagePartition(part.runtime, self._source_store, part, range=self._range)) + + def unknowns(self) -> Iterator[PartSym]: + for unknown in self._src_part_sym.unknowns(): + yield unknown + class Constraint: pass diff --git a/legate/core/operation.py b/legate/core/operation.py index e27df60af..026e64dd7 100644 --- a/legate/core/operation.py +++ b/legate/core/operation.py @@ -19,9 +19,9 @@ import legate.core.types as ty from . import Future, FutureMap, Rect -from .constraints import PartSym +from .constraints import Image, PartSym from .launcher import CopyLauncher, TaskLauncher -from .partition import REPLICATE, Weighted +from .partition import Replicate, Weighted from .shape import Shape from .store import Store, StorePartition from .utils import OrderedSet, capture_traceback @@ -160,6 +160,19 @@ def add_broadcast( part = self._get_unique_partition(store) self.add_constraint(part.broadcast(axes=axes)) + # add_image_constraint adds a constraint that the image of store1 is + # contained within the partition of store2. + def add_image_constraint(self, store1: Store, store2: Store, range : bool = False): + self._check_store(store1) + self._check_store(store2) + # TODO (rohany): We only support point (and rect types if range) here. It seems + # like rects should be added to legate.core's type system rather than an external + # type system to understand this then. + part1 = self._get_unique_partition(store1) + part2 = self._get_unique_partition(store2) + image = Image(store1, store2, part1, range=range) + self.add_constraint(image <= part2) + def add_constraint(self, constraint: Constraint) -> None: self._constraints.append(constraint) @@ -591,7 +604,7 @@ def add_input( ) -> None: self._check_arg(arg) if isinstance(arg, Store): - self._input_parts.append(arg.partition(REPLICATE)) + self._input_parts.append(arg.partition(Replicate(self.context.runtime))) else: self._input_parts.append(arg) self._input_projs.append(proj) @@ -610,7 +623,7 @@ def add_output( ) if arg.kind is Future: self._scalar_outputs.append(len(self._outputs)) - self._output_parts.append(arg.partition(REPLICATE)) + self._output_parts.append(arg.partition(Replicate(self.context.runtime))) else: self._output_parts.append(arg) self._output_projs.append(proj) @@ -625,7 +638,7 @@ def add_reduction( if isinstance(arg, Store): if arg.kind is Future: self._scalar_reductions.append(len(self._reductions)) - self._reduction_parts.append((arg.partition(REPLICATE), redop)) + self._reduction_parts.append((arg.partition(Replicate(self.context.runtime)), redop)) else: self._reduction_parts.append((arg, redop)) self._reduction_projs.append(proj) diff --git a/legate/core/partition.py b/legate/core/partition.py index ba13b5351..45fbdaab3 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -19,6 +19,8 @@ from . import ( IndexPartition, + PartitionByImage, + PartitionByImageRange, PartitionByRestriction, PartitionByWeights, Rect, @@ -74,8 +76,19 @@ def needs_delinearization(self, launch_ndim: int) -> bool: def requirement(self) -> RequirementType: ... + @abstractproperty + def runtime(self) -> Runtime: + ... + class Replicate(PartitionBase): + def __init__(self, runtime: Runtime): + self._runtime = runtime + + @property + def runtime(self): + return self._runtime + @property def color_shape(self) -> Optional[Shape]: return None @@ -129,9 +142,6 @@ def construct( return None -REPLICATE = Replicate() - - class Interval: def __init__(self, lo: int, extent: int) -> None: self._lo = lo @@ -254,7 +264,7 @@ def translate(self, offset: Shape) -> Tiling: self._offset + offset, ) - # This function promotes the translated partition to REPLICATE if it + # This function promotes the translated partition to Replicate if it # doesn't overlap with the original partition. def translate_range(self, offset: Shape) -> Union[Replicate, Tiling]: promote = False @@ -269,7 +279,7 @@ def translate_range(self, offset: Shape) -> Union[Replicate, Tiling]: # TODO: We can actually bloat the tile so that all stencils within # the range are contained, but here we simply replicate # the region, as this usually happens for small inputs. - return REPLICATE + return Replicate(self.runtime) else: return Tiling( self._tile_shape, @@ -420,3 +430,95 @@ def construct( ) runtime.record_partition(index_space, self, index_partition) return region.get_child(index_partition) + + +# TODO (rohany): Do we need to have a difference between image and preimage? +class ImagePartition(PartitionBase): + # TODO (rohany): What's the right type to pass through for the partitions and regions here? + # store is of type legate.Store. However, we can't import it directly due to an import cycle. + def __init__(self, runtime: Runtime, store: Any, part: PartitionBase, range : bool = False) -> None: + self._runtime = runtime + self._store = store + self._part = part + # Whether this is an image or image_range operation. + self._range = range + + @property + def color_shape(self) -> Optional[Shape]: + return self._part.color_shape + + @property + def even(self) -> bool: + ... + + def construct( + self, region: Region, complete: bool = False + ) -> Optional[LegionPartition]: + # TODO (rohany): We can't import RegionField due to an import cycle. + # assert(isinstance(self._store.storage, RegionField)) + source_region = self._store.storage.region + source_field = self._store.storage.field + + # TODO (rohany): What should the value of complete be? + source_part = self._part.construct(source_region) + if self._range: + functor = PartitionByImageRange(source_region, source_part, source_field.field_id) + else: + functor = PartitionByImage(source_region, source_part, source_field.field_id) + # TODO (rohany): Use some information about the partition to figure out whats going on... + # Maybe there should be hints that the user can pass in through the constraints. + kind = legion.LEGION_DISJOINT_INCOMPLETE_KIND + # TODO (rohany): Let's just create a new partition each time. + index_partition = IndexPartition( + self._runtime.legion_context, + self._runtime.legion_runtime, + region.index_space, + source_part.color_space, + functor=functor, + kind=kind, + keep=True, + ) + self._runtime.record_partition(region.index_space, self, index_partition) + return region.get_child(index_partition) + + # TODO (rohany): IDK how we're supposed to know this about an image / image range. + def is_complete_for(self, extents: Shape, offsets: Shape) -> bool: + return False + + # TODO (rohany): IDK how we're supposed to know this about an image / image range. + def is_disjoint_for(self, launch_domain: Optional[Rect]) -> bool: + return False + + # TODO (rohany): IDK how we're supposed to know this about an image / image range. + def satisfies_restriction( + self, restrictions: Sequence[Restriction] + ) -> bool: + raise NotImplementedError + + # TODO (rohany): IDK what this means... + def needs_delinearization(self, launch_ndim: int) -> bool: + return False + + @property + def requirement(self) -> RequirementType: + return Partition + + @property + def runtime(self) -> Runtime: + return self._runtime + + # TODO (rohany): Implement... + def __hash__(self) -> int: + # TODO (rohany): A problem with this (and then using this as a key in the future) is that + # the result of the image partition depends on the values in the region. Once the region + # has been updated, the partition is different. + return hash(self.__class__) + + def __eq__(self, other: object) -> bool: + raise NotImplementedError + + def __str__(self) -> str: + return f"image({self._store}, {self._part}, range={self._range})" + + def __repr__(self) -> str: + return str(self) diff --git a/legate/core/runtime.py b/legate/core/runtime.py index 1c253d80d..4caf3b1b7 100644 --- a/legate/core/runtime.py +++ b/legate/core/runtime.py @@ -485,10 +485,11 @@ def prune_detachments(self) -> None: class PartitionManager: def __init__(self, runtime: Runtime) -> None: self._runtime = runtime - self._num_pieces = runtime.core_context.get_tunable( - runtime.core_library.LEGATE_CORE_TUNABLE_NUM_PIECES, - ty.int32, - ) + # self._num_pieces = runtime.core_context.get_tunable( + # runtime.core_library.LEGATE_CORE_TUNABLE_NUM_PIECES, + # ty.int32, + # ) + self._num_pieces = 4 self._min_shard_volume = runtime.core_context.get_tunable( runtime.core_library.LEGATE_CORE_TUNABLE_MIN_SHARD_VOLUME, ty.int64, @@ -1027,7 +1028,7 @@ def _schedule(self, ops: List[Operation]) -> None: must_be_single = len(op.scalar_outputs) > 0 partitioner = Partitioner([op], must_be_single=must_be_single) strategies.append(partitioner.partition_stores()) - + print(strategies[-1]) for op, strategy in zip(ops, strategies): op.launch(strategy) diff --git a/legate/core/solver.py b/legate/core/solver.py index 592341054..f2bc3d793 100644 --- a/legate/core/solver.py +++ b/legate/core/solver.py @@ -213,11 +213,11 @@ def _solve_constraints_for_futures( continue if store.kind is Future: - partitions[unknown] = REPLICATE + partitions[unknown] = Replicate(self._runtime) else: cls = constraints.find(unknown) for to_align in cls: - partitions[to_align] = REPLICATE + partitions[to_align] = Replicate(self._runtime) return unknowns.remove_all(to_remove) @@ -244,7 +244,7 @@ def _solve_unbound_constraints( fspace = runtime.create_field_space() for to_align in cls: - partitions[unknown] = REPLICATE + partitions[unknown] = Replicate(self._runtime) fspaces[unknown] = fspace unbound_ndims = set(unknown.store.ndim for unknown in to_remove) @@ -342,12 +342,12 @@ def compute_launch_shape( # to replication, in which case the operation must be performed # sequentially for unknown, part in partitions.items(): - if unknown.store in all_outputs and part is REPLICATE: + if unknown.store in all_outputs and isinstance(part, Replicate): return None # If we're here, this means that replicated stores are safe to access # in parallel, so we filter those out to determine the launch domain - parts = [part for part in partitions.values() if part is not REPLICATE] + parts = [part for part in partitions.values() if not isinstance(part, Replicate)] # If all stores are replicated, we can't parallelize the operation if len(parts) == 0: @@ -426,11 +426,16 @@ def partition_stores(self) -> Strategy: for unknown in c._lhs.unknowns(): must_be_even.add(unknown) dependent[c._rhs] = c._lhs + else: + raise NotImplementedError for op in self._ops: all_outputs.update( store for store in op.outputs if not store.unbound ) + print(op.constraints) + print(constraints, dependent) + if self._must_be_single or len(unknowns) == 0: for unknown in unknowns: c = unknown.broadcast() diff --git a/legate/core/store.py b/legate/core/store.py index dbdc5b216..6eb58b0f7 100644 --- a/legate/core/store.py +++ b/legate/core/store.py @@ -1241,7 +1241,7 @@ def compute_key_partition( # If this is effectively a scalar store, we don't need to partition it if self.kind is Future or self.ndim == 0: - return REPLICATE + return Replicate(self._runtime) # We need the transformations to be convertible so that we can map # the storage partition to this store's coordinate space @@ -1261,7 +1261,7 @@ def compute_key_partition( restrictions, ) if launch_shape is None: - partition = REPLICATE + partition = Replicate(self._runtime) else: tile_shape = partition_manager.compute_tile_shape( self.shape, launch_shape From b6fbe6e8ef8c47402e23265f6e4edbd9782df7fd Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Wed, 29 Jun 2022 22:04:52 -0700 Subject: [PATCH 02/35] more updates to solver, allow for partitions dependent on image constraints --- legate/core/constraints.py | 34 +++++++++++-- legate/core/partition.py | 42 ++++++++++++---- legate/core/runtime.py | 1 - legate/core/solver.py | 98 +++++++++++++++++++++++++++++--------- 4 files changed, 139 insertions(+), 36 deletions(-) diff --git a/legate/core/constraints.py b/legate/core/constraints.py index 123a3c3cb..c9594cc15 100644 --- a/legate/core/constraints.py +++ b/legate/core/constraints.py @@ -17,7 +17,7 @@ from collections.abc import Iterable from typing import TYPE_CHECKING, Any, Iterator, Optional, Protocol, Union -from .partition import ImagePartition, Restriction, Replicate +from .partition import ImagePartition, Replicate, Restriction if TYPE_CHECKING: from .partition import PartitionBase @@ -218,14 +218,25 @@ def unknowns(self) -> Iterator[PartSym]: class Image(Expr): - def __init__(self, source_store : Store, dst_store : Store, src_part_sym : Expr, range: bool = False): + def __init__( + self, + source_store: Store, + dst_store: Store, + src_part_sym: Expr, + range: bool = False, + ): self._source_store = source_store self._dst_store = dst_store self._src_part_sym = src_part_sym self._range = range def subst(self, mapping: dict[PartSym, PartitionBase]) -> Expr: - return Image(self._source_store, self._dst_store, self._src_part_sym.subst(mapping), range=self._range) + return Image( + self._source_store, + self._dst_store, + self._src_part_sym.subst(mapping), + range=self._range, + ) def reduce(self) -> Lit: expr = self._src_part_sym.reduce() @@ -233,12 +244,27 @@ def reduce(self) -> Lit: part = expr._part if isinstance(part, Replicate): return Lit(part) - return Lit(ImagePartition(part.runtime, self._source_store, part, range=self._range)) + return Lit( + ImagePartition( + part.runtime, self._source_store, part, range=self._range + ) + ) def unknowns(self) -> Iterator[PartSym]: for unknown in self._src_part_sym.unknowns(): yield unknown + def equals(self, other: object): + # TODO (rohany): Careful... overloaded equals operator... + return ( + isinstance(other, Image) + and self._source_store == other._source_store + and self._dst_store == other._dst_store + and self._src_part_sym is other._src_part_sym + and self._range == other._range + ) + + class Constraint: pass diff --git a/legate/core/partition.py b/legate/core/partition.py index 45fbdaab3..a513091a5 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -15,7 +15,7 @@ from __future__ import annotations from abc import ABC, abstractmethod, abstractproperty -from typing import TYPE_CHECKING, Optional, Sequence, Type, Union +from typing import TYPE_CHECKING, Any, Optional, Sequence, Type, Union from . import ( IndexPartition, @@ -436,7 +436,13 @@ def construct( class ImagePartition(PartitionBase): # TODO (rohany): What's the right type to pass through for the partitions and regions here? # store is of type legate.Store. However, we can't import it directly due to an import cycle. - def __init__(self, runtime: Runtime, store: Any, part: PartitionBase, range : bool = False) -> None: + def __init__( + self, + runtime: Runtime, + store: Any, + part: PartitionBase, + range: bool = False, + ) -> None: self._runtime = runtime self._store = store self._part = part @@ -449,10 +455,10 @@ def color_shape(self) -> Optional[Shape]: @property def even(self) -> bool: - ... + return False def construct( - self, region: Region, complete: bool = False + self, region: Region, complete: bool = False ) -> Optional[LegionPartition]: # TODO (rohany): We can't import RegionField due to an import cycle. # assert(isinstance(self._store.storage, RegionField)) @@ -462,9 +468,13 @@ def construct( # TODO (rohany): What should the value of complete be? source_part = self._part.construct(source_region) if self._range: - functor = PartitionByImageRange(source_region, source_part, source_field.field_id) + functor = PartitionByImageRange( + source_region, source_part, source_field.field_id + ) else: - functor = PartitionByImage(source_region, source_part, source_field.field_id) + functor = PartitionByImage( + source_region, source_part, source_field.field_id + ) # TODO (rohany): Use some information about the partition to figure out whats going on... # Maybe there should be hints that the user can pass in through the constraints. kind = legion.LEGION_DISJOINT_INCOMPLETE_KIND @@ -478,7 +488,7 @@ def construct( kind=kind, keep=True, ) - self._runtime.record_partition(region.index_space, self, index_partition) + # self._runtime.record_partition(region.index_space, self, index_partition) return region.get_child(index_partition) # TODO (rohany): IDK how we're supposed to know this about an image / image range. @@ -491,7 +501,7 @@ def is_disjoint_for(self, launch_domain: Optional[Rect]) -> bool: # TODO (rohany): IDK how we're supposed to know this about an image / image range. def satisfies_restriction( - self, restrictions: Sequence[Restriction] + self, restrictions: Sequence[Restriction] ) -> bool: raise NotImplementedError @@ -509,13 +519,27 @@ def runtime(self) -> Runtime: # TODO (rohany): Implement... def __hash__(self) -> int: + return hash( + ( + self.__class__, + self._store, + self._part, + self._range, + ) + ) # TODO (rohany): A problem with this (and then using this as a key in the future) is that # the result of the image partition depends on the values in the region. Once the region # has been updated, the partition is different. return hash(self.__class__) def __eq__(self, other: object) -> bool: - raise NotImplementedError + return False + return ( + isinstance(other, PartitionByImage) + and self._store == other._store + and self._part == other._part + and self._range == other._range + ) def __str__(self) -> str: return f"image({self._store}, {self._part}, range={self._range})" diff --git a/legate/core/runtime.py b/legate/core/runtime.py index 4caf3b1b7..6772a0b52 100644 --- a/legate/core/runtime.py +++ b/legate/core/runtime.py @@ -1028,7 +1028,6 @@ def _schedule(self, ops: List[Operation]) -> None: must_be_single = len(op.scalar_outputs) > 0 partitioner = Partitioner([op], must_be_single=must_be_single) strategies.append(partitioner.partition_stores()) - print(strategies[-1]) for op, strategy in zip(ops, strategies): op.launch(strategy) diff --git a/legate/core/solver.py b/legate/core/solver.py index f2bc3d793..577f6de17 100644 --- a/legate/core/solver.py +++ b/legate/core/solver.py @@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Generic, List, Optional, TypeVar from . import FieldSpace, Future, Rect -from .constraints import Alignment, Broadcast, Containment, PartSym +from .constraints import Alignment, Broadcast, Containment, Image, PartSym from .partition import REPLICATE from .runtime import runtime from .shape import Shape @@ -347,7 +347,11 @@ def compute_launch_shape( # If we're here, this means that replicated stores are safe to access # in parallel, so we filter those out to determine the launch domain - parts = [part for part in partitions.values() if not isinstance(part, Replicate)] + parts = [ + part + for part in partitions.values() + if not isinstance(part, Replicate) + ] # If all stores are replicated, we can't parallelize the operation if len(parts) == 0: @@ -400,6 +404,7 @@ def partition_stores(self) -> Strategy: for op in self._ops: unknowns.update(op.all_unknowns) for c in op.constraints: + # print(c) if isinstance(c, Alignment): constraints.record(c._lhs, c._rhs) elif isinstance(c, Broadcast): @@ -408,10 +413,14 @@ def partition_stores(self) -> Strategy: c._lhs, PartSym ): if c._lhs in dependent: - raise NotImplementedError( - "Partitions constrained by multiple constraints " - "are not supported yet" - ) + rhs = dependent[c._lhs] + # While we can't have multiple constraints, we are ok with seeing a duplicate + # image constraint, as that doesn't affect the solving. + if not (isinstance(rhs, Image) and rhs.equals(c._rhs)): + raise NotImplementedError( + "Partitions constrained by multiple constraints " + "are not supported yet" + ) for unknown in c._rhs.unknowns(): must_be_even.add(unknown) dependent[c._lhs] = c._rhs @@ -419,10 +428,14 @@ def partition_stores(self) -> Strategy: c._rhs, PartSym ): if c._rhs in dependent: - raise NotImplementedError( - "Partitions constrained by multiple constraints " - "are not supported yet" - ) + lhs = dependent[c._rhs] + # While we can't have multiple constraints, we are ok with seeing a duplicate + # image constraint, as that doesn't affect the solving. + if not (isinstance(lhs, Image) and lhs.equals(c._lhs)): + raise NotImplementedError( + "Partitions constrained by multiple constraints " + "are not supported yet" + ) for unknown in c._lhs.unknowns(): must_be_even.add(unknown) dependent[c._rhs] = c._lhs @@ -433,8 +446,12 @@ def partition_stores(self) -> Strategy: store for store in op.outputs if not store.unbound ) - print(op.constraints) - print(constraints, dependent) + # assert(len(self._ops) == 1) + # print("Op constraints", self._ops[0].constraints) + # # print("Constraints: ", constraints) + # print("Depends", dependent) + + # print("Unknowns: ", list(unknowns)) if self._must_be_single or len(unknowns) == 0: for unknown in unknowns: @@ -461,6 +478,8 @@ def partition_stores(self) -> Strategy: unknowns, broadcasts, constraints ) + # print("Next set unknowns: ", list(unknowns)) + def cost(unknown: PartSym) -> tuple[int, bool]: store = unknown.store return ( @@ -480,17 +499,42 @@ def cost(unknown: PartSym) -> tuple[int, bool]: store = unknown.store restrictions = all_restrictions[unknown] cls = constraints.find(unknown) + # print("Constraints for: ", unknown, list(cls)) + # print("Current partitions: ", partitions) - partition = store.compute_key_partition(restrictions) - if not partition.even and len(cls) > 1: - partition, unknown = self.maybe_find_alternative_key_partition( - partition, - unknown, - cls, - restrictions, - must_be_even, - ) - key_parts.add(unknown) + # If we are supposed to be aligned with a partition in dependents, then + # don't make a decision right now. + depends = False + for to_align in cls: + if to_align in dependent: + depends = True + # print(unknown, "depends on other parts") + if depends: + continue + + # TODO (rohany): If we already have a partition for this equality class + # we need to use it. + for to_align in cls: + if to_align in partitions: + partition = partitions[to_align] + break + else: + partition = store.compute_key_partition(restrictions) + # print("Key partition for store: ", store, partition) + if not partition.even and len(cls) > 1: + ( + partition, + unknown, + ) = self.maybe_find_alternative_key_partition( + partition, + unknown, + cls, + restrictions, + must_be_even, + ) + key_parts.add(unknown) + + # print("computed partition", partition) for to_align in cls: if to_align in partitions: @@ -498,11 +542,21 @@ def cost(unknown: PartSym) -> tuple[int, bool]: partitions[to_align] = partition for rhs, lhs in dependent.items(): + # print(rhs, lhs) expr = lhs.subst(partitions).reduce() if TYPE_CHECKING: assert isinstance(expr, Lit) partitions[rhs] = expr._part + # Comment... + for unknown in sorted_unknowns: + if unknown in partitions: + continue + cls = constraints.find(unknown) + for to_align in cls: + if to_align in partitions: + partitions[unknown] = partitions[to_align] + launch_shape = self.compute_launch_shape( partitions, all_outputs, unbound_ndim ) From b6477212abb7b366b972b1c01e2ad86f0a0601e6 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Thu, 30 Jun 2022 14:48:14 -0700 Subject: [PATCH 03/35] core: allow for manual tasks to have unbound outputs --- legate/core/operation.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/legate/core/operation.py b/legate/core/operation.py index 026e64dd7..d788bdd95 100644 --- a/legate/core/operation.py +++ b/legate/core/operation.py @@ -617,10 +617,9 @@ def add_output( self._check_arg(arg) if isinstance(arg, Store): if arg.unbound: - raise ValueError( - "Unbound store cannot be used with " - "manually parallelized task" - ) + self._unbound_outputs.append(len(self._outputs)) + self._outputs.append(arg) + return if arg.kind is Future: self._scalar_outputs.append(len(self._outputs)) self._output_parts.append(arg.partition(Replicate(self.context.runtime))) @@ -691,6 +690,13 @@ def launch(self, strategy: Strategy) -> None: part.store, req, tag=0, read_write=can_read_write ) + # Add all unbound stores. + for store_idx in self._unbound_outputs: + store = self._outputs[store_idx] + fspace = self.context.runtime.create_field_space() + field_id = fspace.allocate_field(store.type) + launcher.add_unbound_output(store, fspace, field_id) + self._add_scalar_args_to_launcher(launcher) launcher.set_can_raise_exception(self.can_raise_exception) From db7cbbb52c029f2a60c800334a475b03d1d89f76 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Sun, 3 Jul 2022 13:53:29 -0700 Subject: [PATCH 04/35] several updates for SpGEMM: * enable legion attaching projection functors to tasks * add partition by domain creation * enable passing future maps to tasks * patch a case in mapper's instance manager for sparse instances * patch a case in mapper where it assumes number of created tasks is equal to the number of processors. --- legate/core/_legion/future.py | 2 +- legate/core/operation.py | 10 +++- legate/core/partition.py | 94 +++++++++++++++++++++++++++++---- legate/core/store.py | 22 ++++++-- src/core/mapping/core_mapper.cc | 5 +- src/core/runtime/context.h | 5 +- 6 files changed, 117 insertions(+), 21 deletions(-) diff --git a/legate/core/_legion/future.py b/legate/core/_legion/future.py index f4d98c882..bbb6078a8 100644 --- a/legate/core/_legion/future.py +++ b/legate/core/_legion/future.py @@ -343,7 +343,7 @@ def from_list( context, domain, points, - futures, + futures_, num_futures, False, 0, diff --git a/legate/core/operation.py b/legate/core/operation.py index d788bdd95..a385bf66e 100644 --- a/legate/core/operation.py +++ b/legate/core/operation.py @@ -575,7 +575,8 @@ def __init__( context=context, task_id=task_id, mapper_id=mapper_id, op_id=op_id ) self._launch_domain: Rect = launch_domain - self._input_projs: list[Union[ProjFn, None]] = [] + # TODO (rohany): The int here is an explicit ID. + self._input_projs: list[Union[ProjFn, None, int]] = [] self._output_projs: list[Union[ProjFn, None]] = [] self._reduction_projs: list[Union[ProjFn, None]] = [] @@ -583,6 +584,8 @@ def __init__( self._output_parts: list[StorePartition] = [] self._reduction_parts: list[tuple[StorePartition, int]] = [] + self._scalar_future_maps: list[FutureMap] = [] + @property def launch_ndim(self) -> int: return self._launch_domain.dim @@ -600,7 +603,7 @@ def _check_arg(arg: Union[Store, StorePartition]) -> None: def add_input( self, arg: Union[Store, StorePartition], - proj: Optional[ProjFn] = None, + proj: Optional[ProjFn, int] = None, ) -> None: self._check_arg(arg) if isinstance(arg, Store): @@ -690,6 +693,9 @@ def launch(self, strategy: Strategy) -> None: part.store, req, tag=0, read_write=can_read_write ) + for fm in self._scalar_future_maps: + launcher.add_future_map(fm) + # Add all unbound stores. for store_idx in self._unbound_outputs: store = self._outputs[store_idx] diff --git a/legate/core/partition.py b/legate/core/partition.py index a513091a5..13f264277 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -23,6 +23,8 @@ PartitionByImageRange, PartitionByRestriction, PartitionByWeights, + PartitionByDomain, + Point, Rect, Transform, legion, @@ -527,22 +529,94 @@ def __hash__(self) -> int: self._range, ) ) - # TODO (rohany): A problem with this (and then using this as a key in the future) is that - # the result of the image partition depends on the values in the region. Once the region - # has been updated, the partition is different. - return hash(self.__class__) def __eq__(self, other: object) -> bool: return False - return ( - isinstance(other, PartitionByImage) - and self._store == other._store - and self._part == other._part - and self._range == other._range - ) + # return ( + # isinstance(other, PartitionByImage) + # and self._store == other._store + # and self._part == other._part + # and self._range == other._range + # ) def __str__(self) -> str: return f"image({self._store}, {self._part}, range={self._range})" def __repr__(self) -> str: return str(self) + + +class DomainPartition(PartitionBase): + def __init__(self, runtime: Runtime, color_shape: Shape, domains: Union[FutureMap, dict[Point, Rect]]): + self._runtime = runtime + self._color_shape = color_shape + self._domains = domains + + @property + def color_shape(self) -> Optional[Shape]: + return self._color_shape + + @property + def even(self) -> bool: + return False + + def construct(self, region: Region, complete: bool = False): + # TODO (rohany): Think about caching these things. + functor = PartitionByDomain(self._domains) + index_space = region.index_space + index_partition = IndexPartition( + self._runtime.legion_context, + self._runtime.legion_runtime, + index_space, + self._runtime.find_or_create_index_space(self._color_shape), + functor=functor, + keep=True, + ) + # TODO (rohany): Record the partition. + return region.get_child(index_partition) + + # TODO (rohany): We can probably figure this out by staring at the domain map. + def is_complete_for(self, extents: Shape, offsets: Shape) -> bool: + return False + + # TODO (rohany): We can probably figure this out by staring at the domain map. + def is_disjoint_for(self, launch_domain: Optional[Rect]) -> bool: + return False + + # TODO (rohany): IDK how we're supposed to know this about. + def satisfies_restriction( + self, restrictions: Sequence[Restriction] + ) -> bool: + raise NotImplementedError + + @property + def requirement(self) -> RequirementType: + return Partition + + @property + def runtime(self) -> Runtime: + return self._runtime + + # TODO (rohany): IDK what this means... + def needs_delinearization(self, launch_ndim: int) -> bool: + return False + + def __hash__(self) -> int: + return hash( + ( + self.__class__, + self._color_shape, + # TODO (rohany): No better ideas... + id(self._domains), + ) + ) + + # TODO (rohany): Implement this. + def __eq__(self, other : object) -> bool: + return False + + def __str__(self) -> str: + return f"by_domain({self._color_shape}, {self._domains})" + + def __repr__(self) -> str: + return str(self) diff --git a/legate/core/store.py b/legate/core/store.py index 6eb58b0f7..71c3cba1f 100644 --- a/legate/core/store.py +++ b/legate/core/store.py @@ -809,14 +809,17 @@ def get_child_store(self, *indices: int) -> Store: def get_requirement( self, launch_ndim: int, - proj_fn: Optional[ProjFn] = None, + proj_fn: Optional[ProjFn, int] = None, ) -> Proj: part = self._storage_partition.find_or_create_legion_partition() if part is not None: - proj_id = self._store.compute_projection(proj_fn, launch_ndim) - if self._partition.needs_delinearization(launch_ndim): - assert proj_id == 0 - proj_id = runtime.get_delinearize_functor() + if isinstance(proj_fn, int): + proj_id = proj_fn + else: + proj_id = self._store.compute_projection(proj_fn, launch_ndim) + if self._partition.needs_delinearization(launch_ndim): + assert proj_id == 0 + proj_id = runtime.get_delinearize_functor() else: proj_id = 0 return self._partition.requirement(part, proj_id) @@ -1321,6 +1324,15 @@ def partition(self, partition: PartitionBase) -> StorePartition: ) return StorePartition(self, partition, storage_partition) + # TODO (rohany): Hacking... + def direct_partition(self, partition : PartitionBase) -> StorePartition: + return StorePartition( + self._runtime, + self, + partition, + self._storage.partition(partition), + ) + def partition_by_tiling( self, tile_shape: Union[Shape, Sequence[int]] ) -> StorePartition: diff --git a/src/core/mapping/core_mapper.cc b/src/core/mapping/core_mapper.cc index 206dfeb92..3c9a1dccb 100644 --- a/src/core/mapping/core_mapper.cc +++ b/src/core/mapping/core_mapper.cc @@ -284,8 +284,9 @@ void CoreMapper::slice_task(const MapperContext ctx, const Point<1> point = itr.p; assert(point[0] >= start); assert(point[0] < (start + chunk)); - const unsigned local_index = point[0] - start; - assert(local_index < local_cpus.size()); + // Does the mapper assume that we make examples num_procs pieces? + // assert(local_index < local_cpus.size()); + const unsigned local_index = (point[0] - start) % local_cpus.size(); output.slices.push_back(TaskSlice( Domain(itr.p, itr.p), local_cpus[local_index], false /*recurse*/, false /*stealable*/)); } diff --git a/src/core/runtime/context.h b/src/core/runtime/context.h index b387b4979..506b551c3 100644 --- a/src/core/runtime/context.h +++ b/src/core/runtime/context.h @@ -142,11 +142,14 @@ class TaskContext { ReturnValues pack_return_values_with_exception(int32_t index, const std::string& error_message) const; + // The API doesn't handle passing through future maps well right now, so we need + // to access this directly. + const Legion::Task* task_; + private: std::vector get_return_values() const; private: - const Legion::Task* task_; const std::vector& regions_; Legion::Context context_; Legion::Runtime* runtime_; From 927f2d6180eb5b84dd2c83b68f2a279e8fd63b6e Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Tue, 5 Jul 2022 10:33:22 -0700 Subject: [PATCH 05/35] legate/core: track validity of and cache image partitions --- legate/core/operation.py | 28 ++++++++++---- legate/core/partition.py | 79 +++++++++++++++++++++++----------------- legate/core/runtime.py | 6 +++ legate/core/store.py | 14 ++++++- 4 files changed, 85 insertions(+), 42 deletions(-) diff --git a/legate/core/operation.py b/legate/core/operation.py index a385bf66e..d4941d901 100644 --- a/legate/core/operation.py +++ b/legate/core/operation.py @@ -141,6 +141,12 @@ def get_all_stores(self) -> OrderedSet[Store]: result.update(store for (store, _) in self._reductions) return result + def get_all_modified_stores(self) -> OrderedSet[Store]: + result = OrderedSet() + result.update(self._outputs) + result.update(store for (store, _) in self._reductions) + return result + def add_alignment(self, store1: Store, store2: Store) -> None: self._check_store(store1) self._check_store(store2) @@ -162,12 +168,14 @@ def add_broadcast( # add_image_constraint adds a constraint that the image of store1 is # contained within the partition of store2. - def add_image_constraint(self, store1: Store, store2: Store, range : bool = False): + def add_image_constraint( + self, store1: Store, store2: Store, range: bool = False + ): self._check_store(store1) self._check_store(store2) - # TODO (rohany): We only support point (and rect types if range) here. It seems - # like rects should be added to legate.core's type system rather than an external - # type system to understand this then. + # TODO (rohany): We only support point (and rect types if range) here. + # It seems like rects should be added to legate.core's type system + # rather than an external type system to understand this then. part1 = self._get_unique_partition(store1) part2 = self._get_unique_partition(store2) image = Image(store1, store2, part1, range=range) @@ -607,7 +615,9 @@ def add_input( ) -> None: self._check_arg(arg) if isinstance(arg, Store): - self._input_parts.append(arg.partition(Replicate(self.context.runtime))) + self._input_parts.append( + arg.partition(Replicate(self.context.runtime)) + ) else: self._input_parts.append(arg) self._input_projs.append(proj) @@ -625,7 +635,9 @@ def add_output( return if arg.kind is Future: self._scalar_outputs.append(len(self._outputs)) - self._output_parts.append(arg.partition(Replicate(self.context.runtime))) + self._output_parts.append( + arg.partition(Replicate(self.context.runtime)) + ) else: self._output_parts.append(arg) self._output_projs.append(proj) @@ -640,7 +652,9 @@ def add_reduction( if isinstance(arg, Store): if arg.kind is Future: self._scalar_reductions.append(len(self._reductions)) - self._reduction_parts.append((arg.partition(Replicate(self.context.runtime)), redop)) + self._reduction_parts.append( + (arg.partition(Replicate(self.context.runtime)), redop) + ) else: self._reduction_parts.append((arg, redop)) self._reduction_projs.append(proj) diff --git a/legate/core/partition.py b/legate/core/partition.py index 13f264277..85fdace6f 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -19,11 +19,11 @@ from . import ( IndexPartition, + PartitionByDomain, PartitionByImage, PartitionByImageRange, PartitionByRestriction, PartitionByWeights, - PartitionByDomain, Point, Rect, Transform, @@ -434,10 +434,7 @@ def construct( return region.get_child(index_partition) -# TODO (rohany): Do we need to have a difference between image and preimage? class ImagePartition(PartitionBase): - # TODO (rohany): What's the right type to pass through for the partitions and regions here? - # store is of type legate.Store. However, we can't import it directly due to an import cycle. def __init__( self, runtime: Runtime, @@ -477,37 +474,45 @@ def construct( functor = PartitionByImage( source_region, source_part, source_field.field_id ) - # TODO (rohany): Use some information about the partition to figure out whats going on... - # Maybe there should be hints that the user can pass in through the constraints. - kind = legion.LEGION_DISJOINT_INCOMPLETE_KIND - # TODO (rohany): Let's just create a new partition each time. - index_partition = IndexPartition( - self._runtime.legion_context, - self._runtime.legion_runtime, - region.index_space, - source_part.color_space, - functor=functor, - kind=kind, - keep=True, + index_partition = self._runtime.find_partition( + region.index_space, self ) - # self._runtime.record_partition(region.index_space, self, index_partition) + if index_partition is None: + # TODO (rohany): Use some information about the partition to + # figure out whats going on... Maybe there should be hints that + # the user can pass in through the constraints. + kind = legion.LEGION_DISJOINT_INCOMPLETE_KIND + index_partition = IndexPartition( + self._runtime.legion_context, + self._runtime.legion_runtime, + region.index_space, + source_part.color_space, + functor=functor, + kind=kind, + keep=True, + ) + self._runtime.record_partition( + region.index_space, self, index_partition + ) return region.get_child(index_partition) - # TODO (rohany): IDK how we're supposed to know this about an image / image range. + # TODO (rohany): Use user hints about this. def is_complete_for(self, extents: Shape, offsets: Shape) -> bool: return False - # TODO (rohany): IDK how we're supposed to know this about an image / image range. + # TODO (rohany): Use user hints about this. def is_disjoint_for(self, launch_domain: Optional[Rect]) -> bool: return False - # TODO (rohany): IDK how we're supposed to know this about an image / image range. + # TODO (rohany): I'm not sure about this. It seems like it should just + # be whether the source partition satisfies the restrictions. def satisfies_restriction( self, restrictions: Sequence[Restriction] ) -> bool: raise NotImplementedError - # TODO (rohany): IDK what this means... + # TODO (rohany): I'm not sure about this. It seems like it should just + # be whether the source partition also needs delinearization. def needs_delinearization(self, launch_ndim: int) -> bool: return False @@ -519,25 +524,26 @@ def requirement(self) -> RequirementType: def runtime(self) -> Runtime: return self._runtime - # TODO (rohany): Implement... def __hash__(self) -> int: return hash( ( self.__class__, self._store, + self._store._version, self._part, self._range, ) ) def __eq__(self, other: object) -> bool: - return False - # return ( - # isinstance(other, PartitionByImage) - # and self._store == other._store - # and self._part == other._part - # and self._range == other._range - # ) + return ( + isinstance(other, ImagePartition) + # TODO (rohany): I think we can perform equality on the store. + and self._store == other._store + and self._store.version == other._store.version + and self._part == other._part + and self._range == other._range + ) def __str__(self) -> str: return f"image({self._store}, {self._part}, range={self._range})" @@ -547,7 +553,12 @@ def __repr__(self) -> str: class DomainPartition(PartitionBase): - def __init__(self, runtime: Runtime, color_shape: Shape, domains: Union[FutureMap, dict[Point, Rect]]): + def __init__( + self, + runtime: Runtime, + color_shape: Shape, + domains: Union[FutureMap, dict[Point, Rect]], + ): self._runtime = runtime self._color_shape = color_shape self._domains = domains @@ -575,17 +586,17 @@ def construct(self, region: Region, complete: bool = False): # TODO (rohany): Record the partition. return region.get_child(index_partition) - # TODO (rohany): We can probably figure this out by staring at the domain map. + # TODO (rohany): We could figure this out by staring at the domain map. def is_complete_for(self, extents: Shape, offsets: Shape) -> bool: return False - # TODO (rohany): We can probably figure this out by staring at the domain map. + # TODO (rohany): We could figure this out by staring at the domain map. def is_disjoint_for(self, launch_domain: Optional[Rect]) -> bool: return False # TODO (rohany): IDK how we're supposed to know this about. def satisfies_restriction( - self, restrictions: Sequence[Restriction] + self, restrictions: Sequence[Restriction] ) -> bool: raise NotImplementedError @@ -612,7 +623,7 @@ def __hash__(self) -> int: ) # TODO (rohany): Implement this. - def __eq__(self, other : object) -> bool: + def __eq__(self, other: object) -> bool: return False def __str__(self) -> str: diff --git a/legate/core/runtime.py b/legate/core/runtime.py index 6772a0b52..61e64f6d9 100644 --- a/legate/core/runtime.py +++ b/legate/core/runtime.py @@ -1030,6 +1030,12 @@ def _schedule(self, ops: List[Operation]) -> None: strategies.append(partitioner.partition_stores()) for op, strategy in zip(ops, strategies): op.launch(strategy) + # We also need to bump the versions for each modified store. + # TODO (rohany): We also need a callback here to evict cached + # partitions with old store values so that we don't leak these. + for store in op.get_all_modified_stores(): + # TODO (rohany): Make this a method on the store. + store.bump_version() def flush_scheduling_window(self) -> None: if len(self._outstanding_ops) == 0: diff --git a/legate/core/store.py b/legate/core/store.py index 71c3cba1f..23719724c 100644 --- a/legate/core/store.py +++ b/legate/core/store.py @@ -874,6 +874,11 @@ def __init__( # when no custom functor is given self._projection: Union[None, int] = None self._restrictions: Union[None, tuple[Restriction, ...]] = None + # We maintain a version on store objects to cache dependent + # partitions created from the store. Operations that write + # to stores will bump their version and invalidate dependent + # partitions that were created with this store as the source. + self._version = 0 if self._shape is not None: if any(extent < 0 for extent in self._shape.extents): @@ -968,6 +973,13 @@ def transform(self) -> TransformStackBase: def transformed(self) -> bool: return not self._transform.bottom + @property + def version(self) -> int: + return self._version + + def bump_version(self): + self._version += 1 + def attach_external_allocation( self, context: Context, alloc: Attachable, share: bool ) -> None: @@ -1325,7 +1337,7 @@ def partition(self, partition: PartitionBase) -> StorePartition: return StorePartition(self, partition, storage_partition) # TODO (rohany): Hacking... - def direct_partition(self, partition : PartitionBase) -> StorePartition: + def direct_partition(self, partition: PartitionBase) -> StorePartition: return StorePartition( self._runtime, self, From 2730fc5dc43f4b7c0ed22447568667f6858fe10f Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Tue, 5 Jul 2022 10:44:08 -0700 Subject: [PATCH 06/35] legate/core: add user hints about disjointness/completeness of images --- legate/core/partition.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/legate/core/partition.py b/legate/core/partition.py index 85fdace6f..7734fa948 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -441,12 +441,16 @@ def __init__( store: Any, part: PartitionBase, range: bool = False, + disjoint: bool = True, + complete: bool = True, ) -> None: self._runtime = runtime self._store = store self._part = part # Whether this is an image or image_range operation. self._range = range + self._disjoint = disjoint + self._complete = complete @property def color_shape(self) -> Optional[Shape]: @@ -478,10 +482,14 @@ def construct( region.index_space, self ) if index_partition is None: - # TODO (rohany): Use some information about the partition to - # figure out whats going on... Maybe there should be hints that - # the user can pass in through the constraints. - kind = legion.LEGION_DISJOINT_INCOMPLETE_KIND + if self._disjoint and self._complete: + kind = legion.LEGION_DISJOINT_COMPLETE_KIND + elif self._disjoint and not self._complete: + kind = legion.LEGION_DISJOINT_INCOMPLETE_KIND + elif not self._disjoint and self._complete: + kind = legion.LEGION_ALIASED_COMPLETE_KIND + else: + kind = legion.LEGION_ALIASED_INCOMPLETE_KIND index_partition = IndexPartition( self._runtime.legion_context, self._runtime.legion_runtime, @@ -496,13 +504,11 @@ def construct( ) return region.get_child(index_partition) - # TODO (rohany): Use user hints about this. def is_complete_for(self, extents: Shape, offsets: Shape) -> bool: - return False + return self._complete - # TODO (rohany): Use user hints about this. def is_disjoint_for(self, launch_domain: Optional[Rect]) -> bool: - return False + return self._disjoint # TODO (rohany): I'm not sure about this. It seems like it should just # be whether the source partition satisfies the restrictions. From 47eb03b0589f276d8b05602b5ceb651f10e91d77 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Tue, 5 Jul 2022 13:10:07 -0700 Subject: [PATCH 07/35] legate/core: make sure that mapper id's are passed to image partitions --- legate/core/constraints.py | 12 ++++++++++-- legate/core/operation.py | 4 +++- legate/core/partition.py | 14 ++++++++++++-- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/legate/core/constraints.py b/legate/core/constraints.py index c9594cc15..df5da0c57 100644 --- a/legate/core/constraints.py +++ b/legate/core/constraints.py @@ -223,11 +223,13 @@ def __init__( source_store: Store, dst_store: Store, src_part_sym: Expr, + mapper: int, range: bool = False, ): self._source_store = source_store self._dst_store = dst_store self._src_part_sym = src_part_sym + self._mapper = mapper self._range = range def subst(self, mapping: dict[PartSym, PartitionBase]) -> Expr: @@ -235,6 +237,7 @@ def subst(self, mapping: dict[PartSym, PartitionBase]) -> Expr: self._source_store, self._dst_store, self._src_part_sym.subst(mapping), + self._mapper, range=self._range, ) @@ -246,7 +249,11 @@ def reduce(self) -> Lit: return Lit(part) return Lit( ImagePartition( - part.runtime, self._source_store, part, range=self._range + part.runtime, + self._source_store, + part, + self._mapper, + range=self._range, ) ) @@ -255,13 +262,14 @@ def unknowns(self) -> Iterator[PartSym]: yield unknown def equals(self, other: object): - # TODO (rohany): Careful... overloaded equals operator... return ( isinstance(other, Image) and self._source_store == other._source_store and self._dst_store == other._dst_store + # Careful! Overloaded equals operator. and self._src_part_sym is other._src_part_sym and self._range == other._range + and self._mapper == other._mapper, ) diff --git a/legate/core/operation.py b/legate/core/operation.py index d4941d901..0d2a88ade 100644 --- a/legate/core/operation.py +++ b/legate/core/operation.py @@ -178,7 +178,9 @@ def add_image_constraint( # rather than an external type system to understand this then. part1 = self._get_unique_partition(store1) part2 = self._get_unique_partition(store2) - image = Image(store1, store2, part1, range=range) + image = Image( + store1, store2, part1, self._context.mapper_id, range=range + ) self.add_constraint(image <= part2) def add_constraint(self, constraint: Constraint) -> None: diff --git a/legate/core/partition.py b/legate/core/partition.py index 7734fa948..df10c3b67 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -440,11 +440,13 @@ def __init__( runtime: Runtime, store: Any, part: PartitionBase, + mapper: int, range: bool = False, disjoint: bool = True, complete: bool = True, ) -> None: self._runtime = runtime + self._mapper = mapper self._store = store self._part = part # Whether this is an image or image_range operation. @@ -472,11 +474,17 @@ def construct( source_part = self._part.construct(source_region) if self._range: functor = PartitionByImageRange( - source_region, source_part, source_field.field_id + source_region, + source_part, + source_field.field_id, + mapper=self._mapper, ) else: functor = PartitionByImage( - source_region, source_part, source_field.field_id + source_region, + source_part, + source_field.field_id, + mapper=self._mapper, ) index_partition = self._runtime.find_partition( region.index_space, self @@ -538,6 +546,7 @@ def __hash__(self) -> int: self._store._version, self._part, self._range, + self._mapper, ) ) @@ -549,6 +558,7 @@ def __eq__(self, other: object) -> bool: and self._store.version == other._store.version and self._part == other._part and self._range == other._range + and self._mapper == other._mapper ) def __str__(self) -> str: From 47018d8f7433f78edede55a21bcbd5c7fafbd0c3 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Thu, 14 Jul 2022 14:52:23 -0700 Subject: [PATCH 08/35] legate/core: stop overriding the _num_pieces knob in the runtime --- legate/core/runtime.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/legate/core/runtime.py b/legate/core/runtime.py index 61e64f6d9..0bc2c0f71 100644 --- a/legate/core/runtime.py +++ b/legate/core/runtime.py @@ -485,11 +485,10 @@ def prune_detachments(self) -> None: class PartitionManager: def __init__(self, runtime: Runtime) -> None: self._runtime = runtime - # self._num_pieces = runtime.core_context.get_tunable( - # runtime.core_library.LEGATE_CORE_TUNABLE_NUM_PIECES, - # ty.int32, - # ) - self._num_pieces = 4 + self._num_pieces = runtime.core_context.get_tunable( + runtime.core_library.LEGATE_CORE_TUNABLE_NUM_PIECES, + ty.int32, + ) self._min_shard_volume = runtime.core_context.get_tunable( runtime.core_library.LEGATE_CORE_TUNABLE_MIN_SHARD_VOLUME, ty.int64, From 29523462be7538743fee319c1f17c09eeef5aa26 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 14 Jul 2022 21:54:00 +0000 Subject: [PATCH 09/35] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- legate/core/_legion/partition_functor.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/legate/core/_legion/partition_functor.py b/legate/core/_legion/partition_functor.py index 3383922c6..1ae25543f 100644 --- a/legate/core/_legion/partition_functor.py +++ b/legate/core/_legion/partition_functor.py @@ -17,13 +17,20 @@ from typing import TYPE_CHECKING, Any, Union from .. import ffi, legion +from . import FieldID from .future import FutureMap from .geometry import Point -from . import FieldID - if TYPE_CHECKING: - from . import FieldID, IndexPartition, IndexSpace, Partition, Rect, Region, Transform + from . import ( + FieldID, + IndexPartition, + IndexSpace, + Partition, + Rect, + Region, + Transform, + ) class PartitionFunctor: From 78c9b8bc726b6de73a46e186a9ae70bdad72cc68 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Thu, 21 Jul 2022 09:34:34 -0700 Subject: [PATCH 10/35] legate/core: enable specifying an image functor for image constraints --- legate/core/constraints.py | 8 ++++++-- legate/core/operation.py | 15 ++++++++++++--- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/legate/core/constraints.py b/legate/core/constraints.py index df5da0c57..5c221eeb3 100644 --- a/legate/core/constraints.py +++ b/legate/core/constraints.py @@ -225,12 +225,14 @@ def __init__( src_part_sym: Expr, mapper: int, range: bool = False, + functor: Any = ImagePartition, ): self._source_store = source_store self._dst_store = dst_store self._src_part_sym = src_part_sym self._mapper = mapper self._range = range + self._functor = functor def subst(self, mapping: dict[PartSym, PartitionBase]) -> Expr: return Image( @@ -239,6 +241,7 @@ def subst(self, mapping: dict[PartSym, PartitionBase]) -> Expr: self._src_part_sym.subst(mapping), self._mapper, range=self._range, + functor=self._functor, ) def reduce(self) -> Lit: @@ -248,7 +251,7 @@ def reduce(self) -> Lit: if isinstance(part, Replicate): return Lit(part) return Lit( - ImagePartition( + self._functor( part.runtime, self._source_store, part, @@ -269,7 +272,8 @@ def equals(self, other: object): # Careful! Overloaded equals operator. and self._src_part_sym is other._src_part_sym and self._range == other._range - and self._mapper == other._mapper, + and self._mapper == other._mapper + and self._functor == other._functor ) diff --git a/legate/core/operation.py b/legate/core/operation.py index 0d2a88ade..1daee006c 100644 --- a/legate/core/operation.py +++ b/legate/core/operation.py @@ -21,7 +21,7 @@ from . import Future, FutureMap, Rect from .constraints import Image, PartSym from .launcher import CopyLauncher, TaskLauncher -from .partition import Replicate, Weighted +from .partition import ImagePartition, Replicate, Weighted from .shape import Shape from .store import Store, StorePartition from .utils import OrderedSet, capture_traceback @@ -169,7 +169,11 @@ def add_broadcast( # add_image_constraint adds a constraint that the image of store1 is # contained within the partition of store2. def add_image_constraint( - self, store1: Store, store2: Store, range: bool = False + self, + store1: Store, + store2: Store, + range: bool = False, + functor: Any = ImagePartition, ): self._check_store(store1) self._check_store(store2) @@ -179,7 +183,12 @@ def add_image_constraint( part1 = self._get_unique_partition(store1) part2 = self._get_unique_partition(store2) image = Image( - store1, store2, part1, self._context.mapper_id, range=range + store1, + store2, + part1, + self._context.mapper_id, + range=range, + functor=functor, ) self.add_constraint(image <= part2) From 28e599275ecabe88abbfa6adaf6924f8205c67d2 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Tue, 26 Jul 2022 13:48:08 -0700 Subject: [PATCH 11/35] legate/core: fix bug in solver causing unnecessary replication --- legate/core/runtime.py | 1 - legate/core/solver.py | 47 +++++++++++++++--------------------------- 2 files changed, 17 insertions(+), 31 deletions(-) diff --git a/legate/core/runtime.py b/legate/core/runtime.py index 0bc2c0f71..2ae96b4e4 100644 --- a/legate/core/runtime.py +++ b/legate/core/runtime.py @@ -1033,7 +1033,6 @@ def _schedule(self, ops: List[Operation]) -> None: # TODO (rohany): We also need a callback here to evict cached # partitions with old store values so that we don't leak these. for store in op.get_all_modified_stores(): - # TODO (rohany): Make this a method on the store. store.bump_version() def flush_scheduling_window(self) -> None: diff --git a/legate/core/solver.py b/legate/core/solver.py index 577f6de17..15e6b6d3c 100644 --- a/legate/core/solver.py +++ b/legate/core/solver.py @@ -404,7 +404,6 @@ def partition_stores(self) -> Strategy: for op in self._ops: unknowns.update(op.all_unknowns) for c in op.constraints: - # print(c) if isinstance(c, Alignment): constraints.record(c._lhs, c._rhs) elif isinstance(c, Broadcast): @@ -414,12 +413,13 @@ def partition_stores(self) -> Strategy: ): if c._lhs in dependent: rhs = dependent[c._lhs] - # While we can't have multiple constraints, we are ok with seeing a duplicate - # image constraint, as that doesn't affect the solving. + # While we can't have multiple constraints, we are ok + # with seeing a duplicate image constraint, as that + # doesn't affect the solving. if not (isinstance(rhs, Image) and rhs.equals(c._rhs)): raise NotImplementedError( - "Partitions constrained by multiple constraints " - "are not supported yet" + "Partitions constrained by multiple " + "constraints are not supported yet" ) for unknown in c._rhs.unknowns(): must_be_even.add(unknown) @@ -429,12 +429,13 @@ def partition_stores(self) -> Strategy: ): if c._rhs in dependent: lhs = dependent[c._rhs] - # While we can't have multiple constraints, we are ok with seeing a duplicate - # image constraint, as that doesn't affect the solving. + # While we can't have multiple constraints, we are ok + # with seeing a duplicate image constraint, as that + # doesn't affect the solving. if not (isinstance(lhs, Image) and lhs.equals(c._lhs)): raise NotImplementedError( - "Partitions constrained by multiple constraints " - "are not supported yet" + "Partitions constrained by multiple " + "constraints are not supported yet" ) for unknown in c._lhs.unknowns(): must_be_even.add(unknown) @@ -446,13 +447,6 @@ def partition_stores(self) -> Strategy: store for store in op.outputs if not store.unbound ) - # assert(len(self._ops) == 1) - # print("Op constraints", self._ops[0].constraints) - # # print("Constraints: ", constraints) - # print("Depends", dependent) - - # print("Unknowns: ", list(unknowns)) - if self._must_be_single or len(unknowns) == 0: for unknown in unknowns: c = unknown.broadcast() @@ -478,8 +472,6 @@ def partition_stores(self) -> Strategy: unknowns, broadcasts, constraints ) - # print("Next set unknowns: ", list(unknowns)) - def cost(unknown: PartSym) -> tuple[int, bool]: store = unknown.store return ( @@ -499,28 +491,27 @@ def cost(unknown: PartSym) -> tuple[int, bool]: store = unknown.store restrictions = all_restrictions[unknown] cls = constraints.find(unknown) - # print("Constraints for: ", unknown, list(cls)) - # print("Current partitions: ", partitions) - # If we are supposed to be aligned with a partition in dependents, then - # don't make a decision right now. + # If we are supposed to be aligned with a partition in dependents, + # then don't make a decision right now. depends = False for to_align in cls: if to_align in dependent: depends = True - # print(unknown, "depends on other parts") if depends: continue - # TODO (rohany): If we already have a partition for this equality class - # we need to use it. + # If we already have a partition for this equality class we + # need to use it. for to_align in cls: + # Don't align to Futures, as those are trivially replicated. + if to_align.store.kind is Future: + continue if to_align in partitions: partition = partitions[to_align] break else: partition = store.compute_key_partition(restrictions) - # print("Key partition for store: ", store, partition) if not partition.even and len(cls) > 1: ( partition, @@ -534,21 +525,17 @@ def cost(unknown: PartSym) -> tuple[int, bool]: ) key_parts.add(unknown) - # print("computed partition", partition) - for to_align in cls: if to_align in partitions: continue partitions[to_align] = partition for rhs, lhs in dependent.items(): - # print(rhs, lhs) expr = lhs.subst(partitions).reduce() if TYPE_CHECKING: assert isinstance(expr, Lit) partitions[rhs] = expr._part - # Comment... for unknown in sorted_unknowns: if unknown in partitions: continue From 129ec25395e350bcdc772764aa0abd061ef7a0ae Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Thu, 28 Jul 2022 16:38:19 -0700 Subject: [PATCH 12/35] legate/core: add preimage partitions and update domain partitions --- legate/core/__init__.py | 2 + legate/core/_legion/__init__.py | 4 + legate/core/_legion/partition.py | 20 ++++ legate/core/partition.py | 180 +++++++++++++++++++++++++++---- 4 files changed, 185 insertions(+), 21 deletions(-) diff --git a/legate/core/__init__.py b/legate/core/__init__.py index cc3bc4027..1befb8968 100644 --- a/legate/core/__init__.py +++ b/legate/core/__init__.py @@ -36,6 +36,8 @@ PartitionByRestriction, PartitionByImage, PartitionByImageRange, + PartitionByPreimage, + PartitionByPreimageRange, EqualPartition, PartitionByWeights, IndexPartition, diff --git a/legate/core/_legion/__init__.py b/legate/core/_legion/__init__.py index f67fa6793..8c6529365 100644 --- a/legate/core/_legion/__init__.py +++ b/legate/core/_legion/__init__.py @@ -38,6 +38,8 @@ PartitionByRestriction, PartitionByImage, PartitionByImageRange, + PartitionByPreimage, + PartitionByPreimageRange, EqualPartition, PartitionByWeights, PartitionByDomain, @@ -88,6 +90,8 @@ "PartitionByDomain", "PartitionByImage", "PartitionByImageRange", + "PartitionByPreimage", + "PartitionByPreimageRange", "PartitionByRestriction", "PartitionByWeights", "PartitionFunctor", diff --git a/legate/core/_legion/partition.py b/legate/core/_legion/partition.py index c0d4c5c24..0c4c3d026 100644 --- a/legate/core/_legion/partition.py +++ b/legate/core/_legion/partition.py @@ -109,6 +109,14 @@ def get_root(self) -> Region: """ return self.parent.get_root() + @property + def disjoint(self) -> bool: + return self.index_partition.disjoint + + @property + def complete(self) -> bool: + return self.index_partition.complete + class IndexPartition: _logical_handle: Any @@ -260,3 +268,15 @@ def get_root(self) -> IndexSpace: Return the root IndexSpace in this tree. """ return self.parent.get_root() + + @property + def disjoint(self) -> bool: + return legion.legion_index_partition_is_disjoint( + self.runtime, self.handle + ) + + @property + def complete(self) -> bool: + return legion.legion_index_partition_is_complete( + self.runtime, self.handle + ) diff --git a/legate/core/partition.py b/legate/core/partition.py index df10c3b67..63acc9f09 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -22,6 +22,8 @@ PartitionByDomain, PartitionByImage, PartitionByImageRange, + PartitionByPreimage, + PartitionByPreimageRange, PartitionByRestriction, PartitionByWeights, Point, @@ -518,17 +520,16 @@ def is_complete_for(self, extents: Shape, offsets: Shape) -> bool: def is_disjoint_for(self, launch_domain: Optional[Rect]) -> bool: return self._disjoint - # TODO (rohany): I'm not sure about this. It seems like it should just - # be whether the source partition satisfies the restrictions. def satisfies_restriction( self, restrictions: Sequence[Restriction] ) -> bool: - raise NotImplementedError + for restriction in restrictions: + if restriction != Restriction.UNRESTRICTED: + raise NotImplementedError + return True - # TODO (rohany): I'm not sure about this. It seems like it should just - # be whether the source partition also needs delinearization. def needs_delinearization(self, launch_ndim: int) -> bool: - return False + return launch_ndim != self.color_shape.ndim @property def requirement(self) -> RequirementType: @@ -568,6 +569,139 @@ def __repr__(self) -> str: return str(self) +class PreimagePartition(PartitionBase): + # TODO (rohany): I don't even know if I need a store here. I really just + # need the index space that is being partitioned (or the IndexPartition). + # For simplicities sake it seems like taking the store is fine. + def __init__( + self, + runtime: Runtime, + source: Any, + dest: Any, + part: PartitionBase, + mapper: int, + range: bool = False, + disjoint: bool = False, + complete: bool = True, + ) -> None: + self._runtime = runtime + self._mapper = mapper + self._source = source + self._dest = dest + self._part = part + # Whether this is an image or image_range operation. + self._range = range + self._disjoint = disjoint + self._complete = complete + + @property + def color_shape(self) -> Optional[Shape]: + return self._part.color_shape + + @property + def even(self) -> bool: + return False + + def construct( + self, region: Region, complete: bool = False + ) -> Optional[LegionPartition]: + # TODO (rohany): What should the value of complete be? + dest_part = self._part.construct(self._dest.storage.region) + source_region = self._source.storage.region + source_field = self._source.storage.field.field_id + functorFn = ( + PartitionByPreimageRange if self._range else PartitionByPreimage + ) + functor = functorFn( + dest_part.index_partition, + source_region, + source_region, + source_field, + mapper=self._mapper, + ) + index_partition = self._runtime.find_partition( + region.index_space, self + ) + if index_partition is None: + if self._disjoint and self._complete: + kind = legion.LEGION_DISJOINT_COMPLETE_KIND + elif self._disjoint and not self._complete: + kind = legion.LEGION_DISJOINT_INCOMPLETE_KIND + elif not self._disjoint and self._complete: + kind = legion.LEGION_ALIASED_COMPLETE_KIND + else: + kind = legion.LEGION_ALIASED_INCOMPLETE_KIND + index_partition = IndexPartition( + self._runtime.legion_context, + self._runtime.legion_runtime, + region.index_space, + dest_part.color_space, + functor=functor, + kind=kind, + keep=True, + ) + self._runtime.record_partition( + region.index_space, self, index_partition + ) + return region.get_child(index_partition) + + def is_complete_for(self, extents: Shape, offsets: Shape) -> bool: + return self._complete + + def is_disjoint_for(self, launch_domain: Optional[Rect]) -> bool: + return self._disjoint + + def satisfies_restriction( + self, restrictions: Sequence[Restriction] + ) -> bool: + for restriction in restrictions: + if restriction != Restriction.UNRESTRICTED: + raise NotImplementedError + return True + + def needs_delinearization(self, launch_ndim: int) -> bool: + return launch_ndim != self.color_shape.ndim + + @property + def requirement(self) -> RequirementType: + return Partition + + @property + def runtime(self) -> Runtime: + return self._runtime + + def __hash__(self) -> int: + return hash( + ( + self.__class__, + self._source, + self._source._version, + self._dest.storage.region.index_space, + self._part, + self._range, + self._mapper, + ) + ) + + def __eq__(self, other: object) -> bool: + return ( + isinstance(other, PreimagePartition) + # TODO (rohany): I think we can perform equality on the store. + and self._source == other._source + and self._source._version == other._source._version + and self._dest == other._dest + and self._part == other._part + and self._range == other._range + and self._mapper == other._mapper + ) + + def __str__(self) -> str: + return f"preimage({self._store}, {self._part}, range={self._range})" + + def __repr__(self) -> str: + return str(self) + + class DomainPartition(PartitionBase): def __init__( self, @@ -588,18 +722,19 @@ def even(self) -> bool: return False def construct(self, region: Region, complete: bool = False): - # TODO (rohany): Think about caching these things. - functor = PartitionByDomain(self._domains) index_space = region.index_space - index_partition = IndexPartition( - self._runtime.legion_context, - self._runtime.legion_runtime, - index_space, - self._runtime.find_or_create_index_space(self._color_shape), - functor=functor, - keep=True, - ) - # TODO (rohany): Record the partition. + index_partition = self._runtime.find_partition(index_space, self) + if index_partition is None: + functor = PartitionByDomain(self._domains) + index_partition = IndexPartition( + self._runtime.legion_context, + self._runtime.legion_runtime, + index_space, + self._runtime.find_or_create_index_space(self._color_shape), + functor=functor, + keep=True, + ) + self._runtime.record_partition(index_space, self, index_partition) return region.get_child(index_partition) # TODO (rohany): We could figure this out by staring at the domain map. @@ -610,11 +745,15 @@ def is_complete_for(self, extents: Shape, offsets: Shape) -> bool: def is_disjoint_for(self, launch_domain: Optional[Rect]) -> bool: return False - # TODO (rohany): IDK how we're supposed to know this about. def satisfies_restriction( self, restrictions: Sequence[Restriction] ) -> bool: - raise NotImplementedError + for restriction in restrictions: + # If there are some restricted dimensions to this store, + # then this key partition is likely not a good choice. + if restriction == Restriction.RESTRICTED: + return False + return True @property def requirement(self) -> RequirementType: @@ -624,9 +763,8 @@ def requirement(self) -> RequirementType: def runtime(self) -> Runtime: return self._runtime - # TODO (rohany): IDK what this means... def needs_delinearization(self, launch_ndim: int) -> bool: - return False + return launch_ndim != self._color_shape.ndim def __hash__(self) -> int: return hash( From 73d244191f15ea44591bcfa617ef4ece9dcc7ad8 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Mon, 1 Aug 2022 00:30:33 -0700 Subject: [PATCH 13/35] legate/core: properly check for restrictions on image partitions --- legate/core/partition.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/legate/core/partition.py b/legate/core/partition.py index 63acc9f09..e5656d04c 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -524,8 +524,10 @@ def satisfies_restriction( self, restrictions: Sequence[Restriction] ) -> bool: for restriction in restrictions: - if restriction != Restriction.UNRESTRICTED: - raise NotImplementedError + # If there are some restricted dimensions to this store, + # then this key partition is likely not a good choice. + if restriction == Restriction.RESTRICTED: + return False return True def needs_delinearization(self, launch_ndim: int) -> bool: From 3be55085ba661def39d1d95a2371aa3cfcae7b33 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Mon, 1 Aug 2022 12:16:30 -0700 Subject: [PATCH 14/35] legate/core: be permissive to numpy types when converting Shape tuples --- legate/core/shape.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/legate/core/shape.py b/legate/core/shape.py index b45c07e27..1bb5ba98a 100644 --- a/legate/core/shape.py +++ b/legate/core/shape.py @@ -17,6 +17,7 @@ from functools import reduce from typing import TYPE_CHECKING, Iterable, Iterator, Optional, Union, overload +import numpy as np from typing_extensions import TypeAlias if TYPE_CHECKING: @@ -31,7 +32,7 @@ def _cast_tuple(value: ExtentLike, ndim: int) -> tuple[int, ...]: return value.extents elif isinstance(value, Iterable): return tuple(value) - elif isinstance(value, int): + elif isinstance(value, int) or np.issubdtype(type(value), np.integer): return (value,) * ndim else: raise ValueError(f"Cannot cast {type(value).__name__} to tuple") From 3f00a0d6a020c812fa1bc7a988eb95624d3fd32b Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Tue, 2 Aug 2022 23:32:22 -0700 Subject: [PATCH 15/35] legate/core: stop directly constructing Legion Partitions --- legate/core/partition.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/legate/core/partition.py b/legate/core/partition.py index e5656d04c..15823d65a 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -473,7 +473,7 @@ def construct( source_field = self._store.storage.field # TODO (rohany): What should the value of complete be? - source_part = self._part.construct(source_region) + source_part = self._store.find_or_create_legion_partition(self._part) if self._range: functor = PartitionByImageRange( source_region, @@ -608,7 +608,7 @@ def construct( self, region: Region, complete: bool = False ) -> Optional[LegionPartition]: # TODO (rohany): What should the value of complete be? - dest_part = self._part.construct(self._dest.storage.region) + dest_part = self._dest.find_or_create_legion_partition(self._part) source_region = self._source.storage.region source_field = self._source.storage.field.field_id functorFn = ( From 8a69fde9ab3292482035f9340bcb6913719005b5 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Wed, 3 Aug 2022 21:48:19 -0700 Subject: [PATCH 16/35] Revert "legate/core: stop directly constructing Legion Partitions" This reverts commit 8206b263a33c49d8b4b74b40ca02e7ddc3f82afa. --- legate/core/partition.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/legate/core/partition.py b/legate/core/partition.py index 15823d65a..e5656d04c 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -473,7 +473,7 @@ def construct( source_field = self._store.storage.field # TODO (rohany): What should the value of complete be? - source_part = self._store.find_or_create_legion_partition(self._part) + source_part = self._part.construct(source_region) if self._range: functor = PartitionByImageRange( source_region, @@ -608,7 +608,7 @@ def construct( self, region: Region, complete: bool = False ) -> Optional[LegionPartition]: # TODO (rohany): What should the value of complete be? - dest_part = self._dest.find_or_create_legion_partition(self._part) + dest_part = self._part.construct(self._dest.storage.region) source_region = self._source.storage.region source_field = self._source.storage.field.field_id functorFn = ( From c67162e697bfa322d374752405ba571f70e58e02 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Thu, 4 Aug 2022 00:13:46 -0700 Subject: [PATCH 17/35] legate/core: supporting partitions when dimensions dont match color space Most of these changes are written by Wonchan. --- legate/core/partition.py | 74 +++++++++++++++++++++++++++++++++------- legate/core/runtime.py | 22 ++++++++---- legate/core/store.py | 42 ++++++++++++++++++----- legate/core/transform.py | 34 ++++++++++++++++++ 4 files changed, 144 insertions(+), 28 deletions(-) diff --git a/legate/core/partition.py b/legate/core/partition.py index e5656d04c..501d11e49 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -54,7 +54,11 @@ def even(self) -> bool: @abstractmethod def construct( - self, region: Region, complete: bool = False + self, + region: Region, + complete: bool = False, + color_shape: Optional[Shape] = None, + color_transform: Optional[Transform] = None, ) -> Optional[LegionPartition]: ... @@ -141,7 +145,11 @@ def scale(self, scale: tuple[int]) -> Replicate: return self def construct( - self, region: Region, complete: bool = False + self, + region: Region, + complete: bool = False, + color_shape: Optional[Shape] = None, + color_transform: Optional[Transform] = None, ) -> Optional[LegionPartition]: return None @@ -304,10 +312,15 @@ def scale(self, scale: tuple[int]) -> Tiling: ) def construct( - self, region: Region, complete: bool = False + self, + region: Region, + complete: bool = False, + color_shape: Optional[Shape] = None, + color_transform: Optional[Transform] = None, ) -> Optional[LegionPartition]: + assert color_shape is None or color_transform is not None index_space = region.index_space - index_partition = runtime.find_partition(index_space, self) + index_partition = runtime.find_partition(index_space, self, color_shape=color_shape) if index_partition is None: tile_shape = self._tile_shape transform = Transform(tile_shape.ndim, tile_shape.ndim) @@ -319,12 +332,26 @@ def construct( extent = Rect(hi, lo, exclusive=False) - color_space = runtime.find_or_create_index_space(self._color_shape) + color_space = runtime.find_or_create_index_space( + self._color_shape if color_shape is None else color_shape + ) + + if color_transform is not None: + transform = color_transform.compose(transform) + functor = PartitionByRestriction(transform, extent) if complete: - kind = legion.LEGION_DISJOINT_COMPLETE_KIND + kind = ( + legion.LEGION_DISJOINT_COMPLETE_KIND + if color_shape is None + else legion.LEGION_ALIASED_COMPLETE_KIND + ) else: - kind = legion.LEGION_DISJOINT_INCOMPLETE_KIND + kind = ( + legion.LEGION_DISJOINT_INCOMPLETE_KIND + if color_shape is None + else legion.LEGION_ALIASED_INCOMPLETE_KIND + ) index_partition = IndexPartition( runtime.legion_context, runtime.legion_runtime, @@ -334,7 +361,7 @@ def construct( kind=kind, keep=True, # export this partition functor to other libraries ) - runtime.record_partition(index_space, self, index_partition) + runtime.record_partition(index_space, self, index_partition, color_shape=color_shape) return region.get_child(index_partition) @@ -413,7 +440,11 @@ def translate_range(self, offset: Shape) -> None: raise NotImplementedError("This method shouldn't be invoked") def construct( - self, region: Region, complete: bool = False + self, + region: Region, + complete: bool = False, + color_shape: Optional[Shape] = None, + color_transform: Optional[Transform] = None, ) -> Optional[LegionPartition]: assert complete @@ -465,7 +496,11 @@ def even(self) -> bool: return False def construct( - self, region: Region, complete: bool = False + self, + region: Region, + complete: bool = False, + color_shape: Optional[Shape] = None, + color_transform: Optional[Transform] = None, ) -> Optional[LegionPartition]: # TODO (rohany): We can't import RegionField due to an import cycle. # assert(isinstance(self._store.storage, RegionField)) @@ -473,7 +508,10 @@ def construct( source_field = self._store.storage.field # TODO (rohany): What should the value of complete be? - source_part = self._part.construct(source_region) + source_part = self._store.find_or_create_legion_partition( + self._part, + preserve_colors=True, + ) if self._range: functor = PartitionByImageRange( source_region, @@ -605,7 +643,11 @@ def even(self) -> bool: return False def construct( - self, region: Region, complete: bool = False + self, + region: Region, + complete: bool = False, + color_shape: Optional[Shape] = None, + color_transform: Optional[Transform] = None, ) -> Optional[LegionPartition]: # TODO (rohany): What should the value of complete be? dest_part = self._part.construct(self._dest.storage.region) @@ -723,7 +765,13 @@ def color_shape(self) -> Optional[Shape]: def even(self) -> bool: return False - def construct(self, region: Region, complete: bool = False): + def construct( + self, + region: Region, + complete: bool = False, + color_shape: Optional[Shape] = None, + color_transform: Optional[Transform] = None, + ) -> Optional[LegionPartition]: index_space = region.index_space index_partition = self._runtime.find_partition(index_space, self) if index_partition is None: diff --git a/legate/core/runtime.py b/legate/core/runtime.py index 2ae96b4e4..9e139f505 100644 --- a/legate/core/runtime.py +++ b/legate/core/runtime.py @@ -726,9 +726,12 @@ def use_complete_tiling(self, shape: Shape, tile_shape: Shape) -> bool: return not (num_tiles > 256 and num_tiles > 16 * self._num_pieces) def find_partition( - self, index_space: IndexSpace, functor: PartitionBase + self, + index_space: IndexSpace, + functor: PartitionBase, + color_shape: Optional[Shape] = None, ) -> Union[IndexPartition, None]: - key = (index_space, functor) + key = (index_space, functor, color_shape) return self._index_partitions.get(key) def record_partition( @@ -736,8 +739,9 @@ def record_partition( index_space: IndexSpace, functor: PartitionBase, index_partition: IndexPartition, + color_shape: Optional[Shape] = None, ) -> None: - key = (index_space, functor) + key = (index_space, functor, color_shape) assert key not in self._index_partitions self._index_partitions[key] = index_partition @@ -1269,18 +1273,24 @@ def create_region( ) def find_partition( - self, index_space: IndexSpace, functor: PartitionBase + self, + index_space: IndexSpace, + functor: PartitionBase, + color_shape: Optional[Shape] = None, ) -> Union[IndexPartition, None]: - return self._partition_manager.find_partition(index_space, functor) + return self._partition_manager.find_partition( + index_space, functor, color_shape=color_shape + ) def record_partition( self, index_space: IndexSpace, functor: PartitionBase, index_partition: IndexPartition, + color_shape: Optional[Shape] = None, ) -> None: self._partition_manager.record_partition( - index_space, functor, index_partition + index_space, functor, index_partition, color_shape=color_shape, ) def extract_scalar(self, future: Future, idx: int) -> Future: diff --git a/legate/core/store.py b/legate/core/store.py index 23719724c..456205d69 100644 --- a/legate/core/store.py +++ b/legate/core/store.py @@ -752,18 +752,29 @@ def reset_key_partition(self) -> None: self._key_partition = None def find_or_create_legion_partition( - self, functor: PartitionBase, complete: bool + self, + functor: PartitionBase, + complete: bool, + color_shape: Optional[Shape] = None, + color_transform: Optional[Transform] = None, ) -> Optional[LegionPartition]: if self.kind is not RegionField: return None assert isinstance(self.data, RegionField) + assert color_shape is None or color_transform is not None - if functor in self._partitions: + if functor in self._partitions and color_shape is None: return self._partitions[functor] - part = functor.construct(self.data.region, complete=complete) - self._partitions[functor] = part + part = functor.construct( + self.data.region, + complete=complete, + color_shape=color_shape, + color_transform=color_transform, + ) + if color_shape is None: + self._partitions[functor] = part return part @@ -1320,15 +1331,28 @@ def find_restrictions(self) -> tuple[Restriction, ...]: return self._restrictions def find_or_create_legion_partition( - self, partition: PartitionBase, complete: bool = False + self, + partition: PartitionBase, + complete: bool = False, + preserve_colors: bool = False, ) -> Optional[LegionPartition]: # Create a Legion partition for a given functor. # Before we do that, we need to map the partition back # to the original coordinate space. - return self._storage.find_or_create_legion_partition( - self._transform.invert_partition(partition), - complete=complete, - ) + if preserve_colors: + return self._storage.find_or_create_legion_partition( + self._transform.invert_partition(partition), + complete=complete, + color_shape=partition.color_shape, + color_transform=self._transform.get_inverse_color_transform( + partition.color_shape.ndim, + ) + ) + else: + return self._storage.find_or_create_legion_partition( + self._transform.invert_partition(partition), + complete=complete, + ) def partition(self, partition: PartitionBase) -> StorePartition: storage_partition = self._storage.partition( diff --git a/legate/core/transform.py b/legate/core/transform.py index 4cd7475c6..d277bde15 100644 --- a/legate/core/transform.py +++ b/legate/core/transform.py @@ -19,6 +19,7 @@ import numpy as np from . import AffineTransform +from . import Transform as LegionTransform from .partition import Replicate, Restriction, Tiling from .projection import ProjExpr from .runtime import runtime @@ -87,6 +88,9 @@ def invert_restrictions(self, restrictions: Restrictions) -> Restrictions: def get_inverse_transform(self, ndim: int) -> AffineTransform: ... + def get_inverse_color_transform(self, ndim: int) -> LegionTransform: + ... + class Transform(TransformProto, Protocol): pass @@ -171,6 +175,9 @@ def get_inverse_transform(self, ndim: int) -> AffineTransform: result.offset[self._dim] = -self._offset return result + def get_inverse_color_transform(self, ndim: int) -> LegionTransform: + raise NotImplementedError("Not implemented yet") + def serialize(self, buf: BufferBuilder) -> None: code = runtime.get_transform_code(self.__class__.__name__) buf.pack_32bit_int(code) @@ -268,6 +275,16 @@ def get_inverse_transform(self, ndim: int) -> AffineTransform: parent_dim += 1 return result + def get_inverse_color_transform(self, ndim: int) -> LegionTransform: + parent_ndim = ndim - 1 + result = LegionTransform(parent_ndim, ndim) + parent_dim = 0 + for child_dim in range(ndim): + if child_dim != self._extra_dim: + result.trans[parent_dim, child_dim] = 1 + parent_dim += 1 + return result + def serialize(self, buf: BufferBuilder) -> None: code = runtime.get_transform_code(self.__class__.__name__) buf.pack_32bit_int(code) @@ -363,6 +380,9 @@ def get_inverse_transform(self, ndim: int) -> AffineTransform: child_dim += 1 return result + def get_inverse_color_transform(self, ndim: int) -> LegionTransform: + raise NotImplementedError("Not implemented yet") + def serialize(self, buf: BufferBuilder) -> None: code = runtime.get_transform_code(self.__class__.__name__) buf.pack_32bit_int(code) @@ -449,6 +469,9 @@ def get_inverse_transform(self, ndim: int) -> AffineTransform: result.trans[self._axes[dim], dim] = 1 return result + def get_inverse_color_transform(self, ndim: int) -> LegionTransform: + raise NotImplementedError("Not implemented yet") + def serialize(self, buf: BufferBuilder) -> None: code = runtime.get_transform_code(self.__class__.__name__) buf.pack_32bit_int(code) @@ -571,6 +594,9 @@ def get_inverse_transform(self, ndim: int) -> AffineTransform: return result + def get_inverse_color_transform(self, ndim: int) -> LegionTransform: + raise NotImplementedError("Not implemented yet") + def serialize(self, buf: BufferBuilder) -> None: code = runtime.get_transform_code(self.__class__.__name__) buf.pack_32bit_int(code) @@ -660,6 +686,11 @@ def get_inverse_transform(self, ndim: int) -> AffineTransform: parent = self._parent.get_inverse_transform(transform.M) return transform.compose(parent) + def get_inverse_color_transform(self, ndim: int) -> LegionTransform: + transform = self._transform.get_inverse_color_transform(ndim) + parent = self._parent.get_inverse_color_transform(transform.M) + return transform.compose(parent) + def stack(self, transform: Transform) -> TransformStack: return TransformStack(transform, self) @@ -719,6 +750,9 @@ def invert_restrictions(self, restrictions: Restrictions) -> Restrictions: def get_inverse_transform(self, ndim: int) -> AffineTransform: return AffineTransform(ndim, ndim, True) + def get_inverse_color_transform(self, ndim: int) -> LegionTransform: + return LegionTransform(ndim, ndim, True) + def stack(self, transform: Transform) -> TransformStack: return TransformStack(transform, self) From b4a013137c83386bd6be8d791fd03f26eee74301 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 4 Aug 2022 18:29:12 +0000 Subject: [PATCH 18/35] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- legate/core/partition.py | 8 ++++++-- legate/core/runtime.py | 5 ++++- legate/core/store.py | 2 +- legate/core/transform.py | 3 +-- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/legate/core/partition.py b/legate/core/partition.py index 501d11e49..1537eb515 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -320,7 +320,9 @@ def construct( ) -> Optional[LegionPartition]: assert color_shape is None or color_transform is not None index_space = region.index_space - index_partition = runtime.find_partition(index_space, self, color_shape=color_shape) + index_partition = runtime.find_partition( + index_space, self, color_shape=color_shape + ) if index_partition is None: tile_shape = self._tile_shape transform = Transform(tile_shape.ndim, tile_shape.ndim) @@ -361,7 +363,9 @@ def construct( kind=kind, keep=True, # export this partition functor to other libraries ) - runtime.record_partition(index_space, self, index_partition, color_shape=color_shape) + self._runtime.record_partition( + index_space, self, index_partition, color_shape=color_shape + ) return region.get_child(index_partition) diff --git a/legate/core/runtime.py b/legate/core/runtime.py index 9e139f505..a2982d31d 100644 --- a/legate/core/runtime.py +++ b/legate/core/runtime.py @@ -1290,7 +1290,10 @@ def record_partition( color_shape: Optional[Shape] = None, ) -> None: self._partition_manager.record_partition( - index_space, functor, index_partition, color_shape=color_shape, + index_space, + functor, + index_partition, + color_shape=color_shape, ) def extract_scalar(self, future: Future, idx: int) -> Future: diff --git a/legate/core/store.py b/legate/core/store.py index 456205d69..9fb660d2a 100644 --- a/legate/core/store.py +++ b/legate/core/store.py @@ -1346,7 +1346,7 @@ def find_or_create_legion_partition( color_shape=partition.color_shape, color_transform=self._transform.get_inverse_color_transform( partition.color_shape.ndim, - ) + ), ) else: return self._storage.find_or_create_legion_partition( diff --git a/legate/core/transform.py b/legate/core/transform.py index d277bde15..e730901f4 100644 --- a/legate/core/transform.py +++ b/legate/core/transform.py @@ -18,8 +18,7 @@ import numpy as np -from . import AffineTransform -from . import Transform as LegionTransform +from . import AffineTransform, Transform as LegionTransform from .partition import Replicate, Restriction, Tiling from .projection import ProjExpr from .runtime import runtime From 4bbba90a5e7aa5547b9aee3a76601d92eeed4561 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Tue, 9 Aug 2022 16:01:05 -0700 Subject: [PATCH 19/35] src/core/mapper: remove map_copy 1-gpu heuristic Remove the heuristic that maps explicit copies in GPU memory to get sidestep GPU indirect copy performance problem. --- src/core/mapping/base_mapper.cc | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/core/mapping/base_mapper.cc b/src/core/mapping/base_mapper.cc index 976ec885a..c34044e0e 100644 --- a/src/core/mapping/base_mapper.cc +++ b/src/core/mapping/base_mapper.cc @@ -1359,11 +1359,6 @@ void BaseMapper::map_copy(const MapperContext ctx, } } else { */ - { - // If we have just one local GPU then let's use it, otherwise punt to CPU - // since it's not clear which one we should use - if (local_frame_buffers.size() == 1) target_memory = local_frame_buffers.begin()->second; - } auto map_stores = [&](auto idx, auto& req, auto& inputs, auto& outputs) { auto& region = req.region; From e07daa620a4df9265419b4a4d5457cafaf2f9fed Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Tue, 9 Aug 2022 17:00:53 -0700 Subject: [PATCH 20/35] core: thread disjointness and completeness of partitions through image --- legate/core/constraints.py | 10 ++++++++++ legate/core/operation.py | 4 ++++ 2 files changed, 14 insertions(+) diff --git a/legate/core/constraints.py b/legate/core/constraints.py index 5c221eeb3..9590041af 100644 --- a/legate/core/constraints.py +++ b/legate/core/constraints.py @@ -226,6 +226,8 @@ def __init__( mapper: int, range: bool = False, functor: Any = ImagePartition, + disjoint: bool = True, + complete: bool = True, ): self._source_store = source_store self._dst_store = dst_store @@ -233,6 +235,8 @@ def __init__( self._mapper = mapper self._range = range self._functor = functor + self._disjoint = disjoint + self._complete = complete def subst(self, mapping: dict[PartSym, PartitionBase]) -> Expr: return Image( @@ -242,6 +246,8 @@ def subst(self, mapping: dict[PartSym, PartitionBase]) -> Expr: self._mapper, range=self._range, functor=self._functor, + disjoint=self._disjoint, + complete=self._complete, ) def reduce(self) -> Lit: @@ -257,6 +263,8 @@ def reduce(self) -> Lit: part, self._mapper, range=self._range, + disjoint=self._disjoint, + complete=self._complete, ) ) @@ -274,6 +282,8 @@ def equals(self, other: object): and self._range == other._range and self._mapper == other._mapper and self._functor == other._functor + and self._disjoint == other._disjoint + and self._complete == other._complete ) diff --git a/legate/core/operation.py b/legate/core/operation.py index 1daee006c..727a4d09f 100644 --- a/legate/core/operation.py +++ b/legate/core/operation.py @@ -174,6 +174,8 @@ def add_image_constraint( store2: Store, range: bool = False, functor: Any = ImagePartition, + disjoint: bool = True, + complete: bool = True, ): self._check_store(store1) self._check_store(store2) @@ -189,6 +191,8 @@ def add_image_constraint( self._context.mapper_id, range=range, functor=functor, + disjoint=disjoint, + complete=complete, ) self.add_constraint(image <= part2) From 2c33ffa4c725f2706b1a34090ce06ed53102a515 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Fri, 12 Aug 2022 13:47:27 -0700 Subject: [PATCH 21/35] legate/core: issue fills for collected regions to encourage gc --- legate/core/runtime.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/legate/core/runtime.py b/legate/core/runtime.py index a2982d31d..49a2a3d92 100644 --- a/legate/core/runtime.py +++ b/legate/core/runtime.py @@ -28,6 +28,7 @@ from . import ( Fence, FieldSpace, + Fill, Future, FutureMap, IndexSpace, @@ -296,6 +297,23 @@ def free_field( self, region: Region, field_id: int, ordered: bool = False ) -> None: if ordered: + # When freeing this field, also issue a fill to invalidate any valid + # instances attached to this region. This allows us to reuse that space + # without having to make an instance allocation of the same size and shape. + buf = ffi.new("char[]", self.dtype.size) + fut = Future.from_buffer( + self.runtime.legion_runtime, ffi.buffer(buf) + ) + fill = Fill( + region, + region, + field_id, + fut, + mapper=self.runtime.core_context.mapper_id, + ) + fill.launch( + self.runtime.legion_runtime, self.runtime.legion_context + ) if self.free_fields is not None: self.free_fields.append((region, field_id)) else: # Put this on the unordered list From dbe6960562a5208ea3c01aef15fc22836449fa21 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Thu, 25 Aug 2022 13:44:17 -0400 Subject: [PATCH 22/35] src/core/mapping: add sharding for fills issued by the core --- src/core/mapping/core_mapper.cc | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/core/mapping/core_mapper.cc b/src/core/mapping/core_mapper.cc index 3c9a1dccb..ce2abcd2a 100644 --- a/src/core/mapping/core_mapper.cc +++ b/src/core/mapping/core_mapper.cc @@ -75,6 +75,10 @@ class CoreMapper : public Legion::Mapping::NullMapper { const Task& task, const SelectShardingFunctorInput& input, SelectShardingFunctorOutput& output); + virtual void select_sharding_functor(const MapperContext ctx, + const Fill& fill, + const SelectShardingFunctorInput& input, + SelectShardingFunctorOutput& output); virtual void select_steal_targets(const MapperContext ctx, const SelectStealingInput& input, SelectStealingOutput& output); @@ -344,6 +348,16 @@ void CoreMapper::select_sharding_functor(const MapperContext ctx, output.chosen_functor = context.get_sharding_id(LEGATE_CORE_TOPLEVEL_TASK_SHARD_ID); } +void CoreMapper::select_sharding_functor(const MapperContext ctx, + const Fill& fill, + const SelectShardingFunctorInput& input, + SelectShardingFunctorOutput& output) +{ + const int launch_dim = fill.index_domain.get_dim(); + assert(launch_dim == 1); + output.chosen_functor = context.get_sharding_id(LEGATE_CORE_TOPLEVEL_TASK_SHARD_ID); +} + void CoreMapper::select_steal_targets(const MapperContext ctx, const SelectStealingInput& input, SelectStealingOutput& output) From ebf9e021f9db33602a91833a373550cb61f2b9f7 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Thu, 25 Aug 2022 15:23:35 -0400 Subject: [PATCH 23/35] legate/core: add the ability to apply affine projections to domain parts --- legate/core/partition.py | 57 ++++++++++++++++++++++++++++++++++++++++ legate/core/solver.py | 10 ++++--- legate/core/transform.py | 52 ++++++++++++++++++++++++++++++++++-- 3 files changed, 113 insertions(+), 6 deletions(-) diff --git a/legate/core/partition.py b/legate/core/partition.py index 1537eb515..8745a96ed 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Any, Optional, Sequence, Type, Union from . import ( + FutureMap, IndexPartition, PartitionByDomain, PartitionByImage, @@ -754,12 +755,16 @@ class DomainPartition(PartitionBase): def __init__( self, runtime: Runtime, + shape: Shape, color_shape: Shape, domains: Union[FutureMap, dict[Point, Rect]], ): self._runtime = runtime self._color_shape = color_shape self._domains = domains + self._shape = shape + if len(shape) == 0: + raise AssertionError @property def color_shape(self) -> Optional[Shape]: @@ -824,6 +829,7 @@ def __hash__(self) -> int: return hash( ( self.__class__, + self._shape, self._color_shape, # TODO (rohany): No better ideas... id(self._domains), @@ -839,3 +845,54 @@ def __str__(self) -> str: def __repr__(self) -> str: return str(self) + + +# AffineProjection is translated from C++ to Python from the DISTAL +# AffineProjection functor. In particular, it encapsulates applying affine +# projections on `DomainPartition` objects. +class AffineProjection: + # Project each point to the following dimensions of the output point. + # Passing `None` as an entry in `projs` discards the chosen dimension + # from the projection. + def __init__(self, projs): + self.projs = projs + + @property + def dim(self): + return len(self.projs) + + def project_point(self, point: Point, output_bound: Point) -> Point: + output_dim = output_bound.dim + set_mask = [False] * output_dim + result = Point(dim=output_dim) + for i in range(0, self.dim): + mapTo = self.projs[i] + if mapTo is None: + continue + result[mapTo] = point[i] + set_mask[mapTo] = True + # Replace unset indices with their boundaries. + for i in range(0, output_dim): + if not set_mask[i]: + result[i] = output_bound[i] + return result + + def project_partition( + self, part: DomainPartition, bounds: Rect, tx_point: Any = None + ) -> DomainPartition: + # Don't handle FutureMaps right now. + assert not isinstance(part._domains, FutureMap) + projected = {} + for p, r in part._domains.items(): + lo = self.project_point(r.lo, bounds.lo) + hi = self.project_point(r.hi, bounds.hi) + if tx_point is not None: + p = tx_point(p) + projected[p] = Rect(lo=lo, hi=hi, exclusive=False) + new_shape = Shape( + tuple(bounds.hi[idx] + 1 for idx in range(bounds.dim)) + ) + color_shape = part.color_shape + if tx_point is not None: + color_shape = Shape(tx_point(color_shape, exclusive=True)) + return DomainPartition(part.runtime, new_shape, color_shape, projected) diff --git a/legate/core/solver.py b/legate/core/solver.py index 15e6b6d3c..4a0404fa7 100644 --- a/legate/core/solver.py +++ b/legate/core/solver.py @@ -421,8 +421,9 @@ def partition_stores(self) -> Strategy: "Partitions constrained by multiple " "constraints are not supported yet" ) - for unknown in c._rhs.unknowns(): - must_be_even.add(unknown) + if not isinstance(c._rhs, Image): + for unknown in c._rhs.unknowns(): + must_be_even.add(unknown) dependent[c._lhs] = c._rhs elif isinstance(c, Containment) and isinstance( c._rhs, PartSym @@ -437,8 +438,9 @@ def partition_stores(self) -> Strategy: "Partitions constrained by multiple " "constraints are not supported yet" ) - for unknown in c._lhs.unknowns(): - must_be_even.add(unknown) + if not isinstance(c._lhs, Image): + for unknown in c._lhs.unknowns(): + must_be_even.add(unknown) dependent[c._rhs] = c._lhs else: raise NotImplementedError diff --git a/legate/core/transform.py b/legate/core/transform.py index e730901f4..b92583d6c 100644 --- a/legate/core/transform.py +++ b/legate/core/transform.py @@ -18,8 +18,14 @@ import numpy as np -from . import AffineTransform, Transform as LegionTransform -from .partition import Replicate, Restriction, Tiling +from . import AffineTransform, Point, Rect, Transform as LegionTransform +from .partition import ( + AffineProjection, + DomainPartition, + Replicate, + Restriction, + Tiling, +) from .projection import ProjExpr from .runtime import runtime from .shape import Shape @@ -221,6 +227,23 @@ def invert(self, partition: PartitionBase) -> PartitionBase: partition.color_shape.drop(self._extra_dim), partition.offset.drop(self._extra_dim), ) + if isinstance(partition, DomainPartition): + # Project away the desired dimension. + all_axes = list(range(0, len(partition._shape))) + shape = partition._shape.drop(self._extra_dim) + axes = ( + all_axes[: self._extra_dim] + + [None] + + [x - 1 for x in all_axes[self._extra_dim + 1 :]] + ) + + def tx_point(p: Point, exclusive=False) -> Point: + return Point(Shape(p).drop(self._extra_dim)) + + result = AffineProjection(axes).project_partition( + partition, Rect(hi=shape), tx_point=tx_point + ) + return result else: raise ValueError( f"Unsupported partition: {type(partition).__name__}" @@ -253,6 +276,31 @@ def convert(self, partition: PartitionBase) -> PartitionBase: ) elif isinstance(partition, Replicate): return partition + elif isinstance(partition, DomainPartition): + # The idea here is to project all of the dimensions except + # the promoted one into a new affine projection. In the + # future, we could imagine caching these to avoid redundantly + # computing them. + all_axes = list(range(0, len(partition._shape))) + axes = all_axes[: self._extra_dim] + [ + x + 1 for x in all_axes[self._extra_dim :] + ] + shape = list(partition._shape.extents) + new_shape = Shape( + shape[: self._extra_dim] + + [self._dim_size] + + shape[self._extra_dim :] + ) + + def tx_point(p: Point, exclusive=False) -> Point: + return Point( + Shape(p).insert(self._extra_dim, 1 if exclusive else 0) + ) + + result = AffineProjection(axes).project_partition( + partition, Rect(hi=new_shape), tx_point=tx_point + ) + return result else: raise ValueError( f"Unsupported partition: {type(partition).__name__}" From 681478d1c8e2ca5605ac98fdde632afbc7e04752 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Wed, 31 Aug 2022 14:19:45 -0400 Subject: [PATCH 24/35] legate/core: stop hashing store version in ImagePartition This addresses some control replication violations --- legate/core/partition.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/legate/core/partition.py b/legate/core/partition.py index 8745a96ed..94aa92952 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -589,7 +589,14 @@ def __hash__(self) -> int: ( self.__class__, self._store, - self._store._version, + # Importantly, we _cannot_ store the version of the store + # in the hash value. This is because the store's version may + # change after we've already put this functor into a table. + # That would result in the hash value changing without moving + # the position in the table, breaking invariants of the table. + # However, we must still check for version in equality to avoid + # using old values. + # self._store._version, self._part, self._range, self._mapper, From 45efdf3f402c9319c4b0a54b56b5c3418c4cfa56 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Thu, 8 Sep 2022 13:29:55 -0700 Subject: [PATCH 25/35] legate/core/partition: support future-map backed domains in affine projections --- legate/core/partition.py | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/legate/core/partition.py b/legate/core/partition.py index 94aa92952..45b959894 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -30,6 +30,7 @@ Point, Rect, Transform, + ffi, legion, ) from .launcher import Broadcast, Partition @@ -887,15 +888,32 @@ def project_point(self, point: Point, output_bound: Point) -> Point: def project_partition( self, part: DomainPartition, bounds: Rect, tx_point: Any = None ) -> DomainPartition: - # Don't handle FutureMaps right now. - assert not isinstance(part._domains, FutureMap) projected = {} - for p, r in part._domains.items(): - lo = self.project_point(r.lo, bounds.lo) - hi = self.project_point(r.hi, bounds.hi) - if tx_point is not None: - p = tx_point(p) - projected[p] = Rect(lo=lo, hi=hi, exclusive=False) + if isinstance(part._domains, FutureMap): + for point in Rect(hi=part.color_shape): + fut = part._domains.get_future(point) + buf = fut.get_buffer() + dom = ffi.from_buffer("legion_domain_t*", buf)[0] + lg_rect = getattr( + legion, f"legion_domain_get_rect_{dom.dim}d" + )(dom) + lo = Point(dim=bounds.dim) + hi = Point(dim=bounds.dim) + for i in range(dom.dim): + lo[i] = lg_rect.lo.x[i] + hi[i] = lg_rect.hi.x[i] + lo = self.project_point(lo, bounds.lo) + hi = self.project_point(hi, bounds.hi) + if tx_point is not None: + point = tx_point(point) + projected[point] = Rect(lo=lo, hi=hi, exclusive=False) + else: + for p, r in part._domains.items(): + lo = self.project_point(r.lo, bounds.lo) + hi = self.project_point(r.hi, bounds.hi) + if tx_point is not None: + p = tx_point(p) + projected[p] = Rect(lo=lo, hi=hi, exclusive=False) new_shape = Shape( tuple(bounds.hi[idx] + 1 for idx in range(bounds.dim)) ) From 2a76d4ae29ae37f95bddb0687e7823d453025d68 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Wed, 14 Sep 2022 17:35:36 -0700 Subject: [PATCH 26/35] legate/core: rebasing on top of core reorganization --- legate/core/constraints.py | 1 - legate/core/partition.py | 81 +++++++++++++------------------------- legate/core/solver.py | 14 +++---- legate/core/store.py | 6 +-- 4 files changed, 35 insertions(+), 67 deletions(-) diff --git a/legate/core/constraints.py b/legate/core/constraints.py index 9590041af..deabcee42 100644 --- a/legate/core/constraints.py +++ b/legate/core/constraints.py @@ -258,7 +258,6 @@ def reduce(self) -> Lit: return Lit(part) return Lit( self._functor( - part.runtime, self._source_store, part, self._mapper, diff --git a/legate/core/partition.py b/legate/core/partition.py index 45b959894..4560a92fb 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -39,7 +39,7 @@ from .shape import Shape if TYPE_CHECKING: - from . import FutureMap, Partition as LegionPartition, Region + from . import Partition as LegionPartition, Region RequirementType = Union[Type[Broadcast], Type[Partition]] @@ -86,19 +86,8 @@ def needs_delinearization(self, launch_ndim: int) -> bool: def requirement(self) -> RequirementType: ... - @abstractproperty - def runtime(self) -> Runtime: - ... - class Replicate(PartitionBase): - def __init__(self, runtime: Runtime): - self._runtime = runtime - - @property - def runtime(self): - return self._runtime - @property def color_shape(self) -> Optional[Shape]: return None @@ -156,6 +145,9 @@ def construct( return None +REPLICATE = Replicate() + + class Interval: def __init__(self, lo: int, extent: int) -> None: self._lo = lo @@ -365,7 +357,7 @@ def construct( kind=kind, keep=True, # export this partition functor to other libraries ) - self._runtime.record_partition( + runtime.record_partition( index_space, self, index_partition, color_shape=color_shape ) return region.get_child(index_partition) @@ -476,7 +468,6 @@ def construct( class ImagePartition(PartitionBase): def __init__( self, - runtime: Runtime, store: Any, part: PartitionBase, mapper: int, @@ -484,7 +475,6 @@ def __init__( disjoint: bool = True, complete: bool = True, ) -> None: - self._runtime = runtime self._mapper = mapper self._store = store self._part = part @@ -532,9 +522,7 @@ def construct( source_field.field_id, mapper=self._mapper, ) - index_partition = self._runtime.find_partition( - region.index_space, self - ) + index_partition = runtime.find_partition(region.index_space, self) if index_partition is None: if self._disjoint and self._complete: kind = legion.LEGION_DISJOINT_COMPLETE_KIND @@ -545,17 +533,15 @@ def construct( else: kind = legion.LEGION_ALIASED_INCOMPLETE_KIND index_partition = IndexPartition( - self._runtime.legion_context, - self._runtime.legion_runtime, + runtime.legion_context, + runtime.legion_runtime, region.index_space, source_part.color_space, functor=functor, kind=kind, keep=True, ) - self._runtime.record_partition( - region.index_space, self, index_partition - ) + runtime.record_partition(region.index_space, self, index_partition) return region.get_child(index_partition) def is_complete_for(self, extents: Shape, offsets: Shape) -> bool: @@ -581,10 +567,6 @@ def needs_delinearization(self, launch_ndim: int) -> bool: def requirement(self) -> RequirementType: return Partition - @property - def runtime(self) -> Runtime: - return self._runtime - def __hash__(self) -> int: return hash( ( @@ -628,7 +610,6 @@ class PreimagePartition(PartitionBase): # For simplicities sake it seems like taking the store is fine. def __init__( self, - runtime: Runtime, source: Any, dest: Any, part: PartitionBase, @@ -637,7 +618,6 @@ def __init__( disjoint: bool = False, complete: bool = True, ) -> None: - self._runtime = runtime self._mapper = mapper self._source = source self._dest = dest @@ -676,9 +656,7 @@ def construct( source_field, mapper=self._mapper, ) - index_partition = self._runtime.find_partition( - region.index_space, self - ) + index_partition = runtime.find_partition(region.index_space, self) if index_partition is None: if self._disjoint and self._complete: kind = legion.LEGION_DISJOINT_COMPLETE_KIND @@ -689,17 +667,15 @@ def construct( else: kind = legion.LEGION_ALIASED_INCOMPLETE_KIND index_partition = IndexPartition( - self._runtime.legion_context, - self._runtime.legion_runtime, + runtime.legion_context, + runtime.legion_runtime, region.index_space, dest_part.color_space, functor=functor, kind=kind, keep=True, ) - self._runtime.record_partition( - region.index_space, self, index_partition - ) + runtime.record_partition(region.index_space, self, index_partition) return region.get_child(index_partition) def is_complete_for(self, extents: Shape, offsets: Shape) -> bool: @@ -723,16 +699,19 @@ def needs_delinearization(self, launch_ndim: int) -> bool: def requirement(self) -> RequirementType: return Partition - @property - def runtime(self) -> Runtime: - return self._runtime - def __hash__(self) -> int: return hash( ( self.__class__, self._source, - self._source._version, + # Importantly, we _cannot_ store the version of the store + # in the hash value. This is because the store's version may + # change after we've already put this functor into a table. + # That would result in the hash value changing without moving + # the position in the table, breaking invariants of the table. + # However, we must still check for version in equality to avoid + # using old values. + # self._store._version, self._dest.storage.region.index_space, self._part, self._range, @@ -762,12 +741,10 @@ def __repr__(self) -> str: class DomainPartition(PartitionBase): def __init__( self, - runtime: Runtime, shape: Shape, color_shape: Shape, domains: Union[FutureMap, dict[Point, Rect]], ): - self._runtime = runtime self._color_shape = color_shape self._domains = domains self._shape = shape @@ -790,18 +767,18 @@ def construct( color_transform: Optional[Transform] = None, ) -> Optional[LegionPartition]: index_space = region.index_space - index_partition = self._runtime.find_partition(index_space, self) + index_partition = runtime.find_partition(index_space, self) if index_partition is None: functor = PartitionByDomain(self._domains) index_partition = IndexPartition( - self._runtime.legion_context, - self._runtime.legion_runtime, + runtime.legion_context, + runtime.legion_runtime, index_space, - self._runtime.find_or_create_index_space(self._color_shape), + runtime.find_or_create_index_space(self._color_shape), functor=functor, keep=True, ) - self._runtime.record_partition(index_space, self, index_partition) + runtime.record_partition(index_space, self, index_partition) return region.get_child(index_partition) # TODO (rohany): We could figure this out by staring at the domain map. @@ -826,10 +803,6 @@ def satisfies_restriction( def requirement(self) -> RequirementType: return Partition - @property - def runtime(self) -> Runtime: - return self._runtime - def needs_delinearization(self, launch_ndim: int) -> bool: return launch_ndim != self._color_shape.ndim @@ -920,4 +893,4 @@ def project_partition( color_shape = part.color_shape if tx_point is not None: color_shape = Shape(tx_point(color_shape, exclusive=True)) - return DomainPartition(part.runtime, new_shape, color_shape, projected) + return DomainPartition(new_shape, color_shape, projected) diff --git a/legate/core/solver.py b/legate/core/solver.py index 4a0404fa7..dbe509462 100644 --- a/legate/core/solver.py +++ b/legate/core/solver.py @@ -213,11 +213,11 @@ def _solve_constraints_for_futures( continue if store.kind is Future: - partitions[unknown] = Replicate(self._runtime) + partitions[unknown] = REPLICATE else: cls = constraints.find(unknown) for to_align in cls: - partitions[to_align] = Replicate(self._runtime) + partitions[to_align] = REPLICATE return unknowns.remove_all(to_remove) @@ -244,7 +244,7 @@ def _solve_unbound_constraints( fspace = runtime.create_field_space() for to_align in cls: - partitions[unknown] = Replicate(self._runtime) + partitions[unknown] = REPLICATE fspaces[unknown] = fspace unbound_ndims = set(unknown.store.ndim for unknown in to_remove) @@ -342,16 +342,12 @@ def compute_launch_shape( # to replication, in which case the operation must be performed # sequentially for unknown, part in partitions.items(): - if unknown.store in all_outputs and isinstance(part, Replicate): + if unknown.store in all_outputs and part == REPLICATE: return None # If we're here, this means that replicated stores are safe to access # in parallel, so we filter those out to determine the launch domain - parts = [ - part - for part in partitions.values() - if not isinstance(part, Replicate) - ] + parts = [part for part in partitions.values() if part != REPLICATE] # If all stores are replicated, we can't parallelize the operation if len(parts) == 0: diff --git a/legate/core/store.py b/legate/core/store.py index 9fb660d2a..e31463312 100644 --- a/legate/core/store.py +++ b/legate/core/store.py @@ -60,7 +60,7 @@ from .context import Context from .launcher import Proj from .projection import ProjFn - from .transform import TransformStackBase + from .transform import Transform, TransformStackBase from math import prod @@ -1267,7 +1267,7 @@ def compute_key_partition( # If this is effectively a scalar store, we don't need to partition it if self.kind is Future or self.ndim == 0: - return Replicate(self._runtime) + return REPLICATE # We need the transformations to be convertible so that we can map # the storage partition to this store's coordinate space @@ -1287,7 +1287,7 @@ def compute_key_partition( restrictions, ) if launch_shape is None: - partition = Replicate(self._runtime) + partition = REPLICATE else: tile_shape = partition_manager.compute_tile_shape( self.shape, launch_shape From d761d058db9cc1b9bb06e550bcfd29a2a03dd74e Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Mon, 26 Sep 2022 17:30:40 -0700 Subject: [PATCH 27/35] legate/core/store: stop using weighted partitions as keys when transformed --- legate/core/store.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/legate/core/store.py b/legate/core/store.py index 5bc087ad0..1a2a1d58c 100644 --- a/legate/core/store.py +++ b/legate/core/store.py @@ -33,7 +33,7 @@ DistributedAllocation, InlineMappedAllocation, ) -from .partition import REPLICATE, PartitionBase, Restriction, Tiling +from .partition import REPLICATE, PartitionBase, Restriction, Tiling, Weighted from .projection import execute_functor_symbolically from .runtime import runtime from .shape import Shape @@ -1282,7 +1282,9 @@ def compute_key_partition( else: partition = None - if partition is not None: + if partition is not None and ( + not (self.transformed and isinstance(partition, Weighted)) + ): partition = self._transform.convert_partition(partition) return partition else: From d1c0ef65605e6cc79203c69625262cb678d92daf Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Mon, 26 Sep 2022 17:36:02 -0700 Subject: [PATCH 28/35] legate/core/store: fix some fallout from partition refactoring --- legate/core/operation.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/legate/core/operation.py b/legate/core/operation.py index 727a4d09f..05eb079f7 100644 --- a/legate/core/operation.py +++ b/legate/core/operation.py @@ -21,7 +21,7 @@ from . import Future, FutureMap, Rect from .constraints import Image, PartSym from .launcher import CopyLauncher, TaskLauncher -from .partition import ImagePartition, Replicate, Weighted +from .partition import REPLICATE, ImagePartition, Weighted from .shape import Shape from .store import Store, StorePartition from .utils import OrderedSet, capture_traceback @@ -630,9 +630,7 @@ def add_input( ) -> None: self._check_arg(arg) if isinstance(arg, Store): - self._input_parts.append( - arg.partition(Replicate(self.context.runtime)) - ) + self._input_parts.append(arg.partition(REPLICATE)) else: self._input_parts.append(arg) self._input_projs.append(proj) @@ -650,9 +648,7 @@ def add_output( return if arg.kind is Future: self._scalar_outputs.append(len(self._outputs)) - self._output_parts.append( - arg.partition(Replicate(self.context.runtime)) - ) + self._output_parts.append(arg.partition(REPLICATE)) else: self._output_parts.append(arg) self._output_projs.append(proj) @@ -667,9 +663,7 @@ def add_reduction( if isinstance(arg, Store): if arg.kind is Future: self._scalar_reductions.append(len(self._reductions)) - self._reduction_parts.append( - (arg.partition(Replicate(self.context.runtime)), redop) - ) + self._reduction_parts.append((arg.partition(REPLICATE), redop)) else: self._reduction_parts.append((arg, redop)) self._reduction_projs.append(proj) From bec9227acdd76fe5d79fd53a06099bd0b93d3d14 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Mon, 26 Sep 2022 18:14:55 -0700 Subject: [PATCH 29/35] *: fix all mypy warnings --- legate/core/_legion/partition.py | 4 ++-- legate/core/constraints.py | 2 +- legate/core/operation.py | 6 ++--- legate/core/partition.py | 39 ++++++++++++++++++++------------ legate/core/runtime.py | 9 ++++---- legate/core/shape.py | 4 +++- legate/core/store.py | 19 ++++------------ legate/core/transform.py | 14 ++++++------ 8 files changed, 50 insertions(+), 47 deletions(-) diff --git a/legate/core/_legion/partition.py b/legate/core/_legion/partition.py index 0c4c3d026..70aff9c67 100644 --- a/legate/core/_legion/partition.py +++ b/legate/core/_legion/partition.py @@ -271,12 +271,12 @@ def get_root(self) -> IndexSpace: @property def disjoint(self) -> bool: - return legion.legion_index_partition_is_disjoint( + return legion.legion_index_partition_is_disjoint( # type: ignore self.runtime, self.handle ) @property def complete(self) -> bool: - return legion.legion_index_partition_is_complete( + return legion.legion_index_partition_is_complete( # type: ignore self.runtime, self.handle ) diff --git a/legate/core/constraints.py b/legate/core/constraints.py index deabcee42..072b6ad6e 100644 --- a/legate/core/constraints.py +++ b/legate/core/constraints.py @@ -271,7 +271,7 @@ def unknowns(self) -> Iterator[PartSym]: for unknown in self._src_part_sym.unknowns(): yield unknown - def equals(self, other: object): + def equals(self, other: object) -> bool: return ( isinstance(other, Image) and self._source_store == other._source_store diff --git a/legate/core/operation.py b/legate/core/operation.py index 05eb079f7..943c29981 100644 --- a/legate/core/operation.py +++ b/legate/core/operation.py @@ -142,7 +142,7 @@ def get_all_stores(self) -> OrderedSet[Store]: return result def get_all_modified_stores(self) -> OrderedSet[Store]: - result = OrderedSet() + result: OrderedSet[Store] = OrderedSet() result.update(self._outputs) result.update(store for (store, _) in self._reductions) return result @@ -176,7 +176,7 @@ def add_image_constraint( functor: Any = ImagePartition, disjoint: bool = True, complete: bool = True, - ): + ) -> None: self._check_store(store1) self._check_store(store2) # TODO (rohany): We only support point (and rect types if range) here. @@ -626,7 +626,7 @@ def _check_arg(arg: Union[Store, StorePartition]) -> None: def add_input( self, arg: Union[Store, StorePartition], - proj: Optional[ProjFn, int] = None, + proj: Optional[Union[ProjFn, int]] = None, ) -> None: self._check_arg(arg) if isinstance(arg, Store): diff --git a/legate/core/partition.py b/legate/core/partition.py index 4560a92fb..326c429be 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -285,7 +285,7 @@ def translate_range(self, offset: Shape) -> Union[Replicate, Tiling]: # TODO: We can actually bloat the tile so that all stencils within # the range are contained, but here we simply replicate # the region, as this usually happens for small inputs. - return Replicate(self.runtime) + return REPLICATE else: return Tiling( self._tile_shape, @@ -340,13 +340,13 @@ def construct( kind = ( legion.LEGION_DISJOINT_COMPLETE_KIND if color_shape is None - else legion.LEGION_ALIASED_COMPLETE_KIND + else legion.LEGION_ALIASED_COMPLETE_KIND # type: ignore ) else: kind = ( legion.LEGION_DISJOINT_INCOMPLETE_KIND if color_shape is None - else legion.LEGION_ALIASED_INCOMPLETE_KIND + else legion.LEGION_ALIASED_INCOMPLETE_KIND # type: ignore ) index_partition = IndexPartition( runtime.legion_context, @@ -516,7 +516,7 @@ def construct( mapper=self._mapper, ) else: - functor = PartitionByImage( + functor = PartitionByImage( # type: ignore source_region, source_part, source_field.field_id, @@ -529,9 +529,9 @@ def construct( elif self._disjoint and not self._complete: kind = legion.LEGION_DISJOINT_INCOMPLETE_KIND elif not self._disjoint and self._complete: - kind = legion.LEGION_ALIASED_COMPLETE_KIND + kind = legion.LEGION_ALIASED_COMPLETE_KIND # type: ignore else: - kind = legion.LEGION_ALIASED_INCOMPLETE_KIND + kind = legion.LEGION_ALIASED_INCOMPLETE_KIND # type: ignore index_partition = IndexPartition( runtime.legion_context, runtime.legion_runtime, @@ -561,6 +561,7 @@ def satisfies_restriction( return True def needs_delinearization(self, launch_ndim: int) -> bool: + assert self.color_shape is not None return launch_ndim != self.color_shape.ndim @property @@ -650,7 +651,7 @@ def construct( PartitionByPreimageRange if self._range else PartitionByPreimage ) functor = functorFn( - dest_part.index_partition, + dest_part.index_partition, # type: ignore source_region, source_region, source_field, @@ -663,9 +664,11 @@ def construct( elif self._disjoint and not self._complete: kind = legion.LEGION_DISJOINT_INCOMPLETE_KIND elif not self._disjoint and self._complete: - kind = legion.LEGION_ALIASED_COMPLETE_KIND + kind = legion.LEGION_ALIASED_COMPLETE_KIND # type: ignore else: - kind = legion.LEGION_ALIASED_INCOMPLETE_KIND + kind = legion.LEGION_ALIASED_INCOMPLETE_KIND # type: ignore + # Discharge some typing errors. + assert dest_part is not None index_partition = IndexPartition( runtime.legion_context, runtime.legion_runtime, @@ -693,6 +696,7 @@ def satisfies_restriction( return True def needs_delinearization(self, launch_ndim: int) -> bool: + assert self.color_shape is not None return launch_ndim != self.color_shape.ndim @property @@ -732,7 +736,7 @@ def __eq__(self, other: object) -> bool: ) def __str__(self) -> str: - return f"preimage({self._store}, {self._part}, range={self._range})" + return f"preimage({self._source}, {self._part}, range={self._range})" def __repr__(self) -> str: return str(self) @@ -835,11 +839,11 @@ class AffineProjection: # Project each point to the following dimensions of the output point. # Passing `None` as an entry in `projs` discards the chosen dimension # from the projection. - def __init__(self, projs): + def __init__(self, projs: list[Optional[int]]): self.projs = projs @property - def dim(self): + def dim(self) -> int: return len(self.projs) def project_point(self, point: Point, output_bound: Point) -> Point: @@ -866,7 +870,7 @@ def project_partition( for point in Rect(hi=part.color_shape): fut = part._domains.get_future(point) buf = fut.get_buffer() - dom = ffi.from_buffer("legion_domain_t*", buf)[0] + dom = ffi.from_buffer("legion_domain_t*", buf)[0] # type: ignore # noqa lg_rect = getattr( legion, f"legion_domain_get_rect_{dom.dim}d" )(dom) @@ -879,18 +883,23 @@ def project_partition( hi = self.project_point(hi, bounds.hi) if tx_point is not None: point = tx_point(point) - projected[point] = Rect(lo=lo, hi=hi, exclusive=False) + projected[point] = Rect( + lo=tuple(lo), hi=tuple(hi), exclusive=False + ) else: for p, r in part._domains.items(): lo = self.project_point(r.lo, bounds.lo) hi = self.project_point(r.hi, bounds.hi) if tx_point is not None: p = tx_point(p) - projected[p] = Rect(lo=lo, hi=hi, exclusive=False) + projected[p] = Rect( + lo=tuple(lo), hi=tuple(hi), exclusive=False + ) new_shape = Shape( tuple(bounds.hi[idx] + 1 for idx in range(bounds.dim)) ) color_shape = part.color_shape if tx_point is not None: color_shape = Shape(tx_point(color_shape, exclusive=True)) + assert color_shape is not None return DomainPartition(new_shape, color_shape, projected) diff --git a/legate/core/runtime.py b/legate/core/runtime.py index 49a2a3d92..243e9068c 100644 --- a/legate/core/runtime.py +++ b/legate/core/runtime.py @@ -297,9 +297,10 @@ def free_field( self, region: Region, field_id: int, ordered: bool = False ) -> None: if ordered: - # When freeing this field, also issue a fill to invalidate any valid - # instances attached to this region. This allows us to reuse that space - # without having to make an instance allocation of the same size and shape. + # When freeing this field, also issue a fill to invalidate any + # valid instances attached to this region. This allows us to reuse + # that space without having to make an instance allocation of the + # same size and shape. buf = ffi.new("char[]", self.dtype.size) fut = Future.from_buffer( self.runtime.legion_runtime, ffi.buffer(buf) @@ -545,7 +546,7 @@ def __init__(self, runtime: Runtime) -> None: ) self._piece_factors = list(reversed(factors)) self._index_partitions: dict[ - tuple[IndexSpace, PartitionBase], IndexPartition + tuple[IndexSpace, PartitionBase, Optional[Shape]], IndexPartition ] = {} def compute_launch_shape( diff --git a/legate/core/shape.py b/legate/core/shape.py index 1bb5ba98a..69a634d17 100644 --- a/legate/core/shape.py +++ b/legate/core/shape.py @@ -32,7 +32,9 @@ def _cast_tuple(value: ExtentLike, ndim: int) -> tuple[int, ...]: return value.extents elif isinstance(value, Iterable): return tuple(value) - elif isinstance(value, int) or np.issubdtype(type(value), np.integer): + elif isinstance(value, int) or np.issubdtype( + type(value), np.integer + ): # type: ignore return (value,) * ndim else: raise ValueError(f"Cannot cast {type(value).__name__} to tuple") diff --git a/legate/core/store.py b/legate/core/store.py index 1a2a1d58c..ed5164c21 100644 --- a/legate/core/store.py +++ b/legate/core/store.py @@ -771,7 +771,7 @@ def find_or_create_legion_partition( self.data.region, complete=complete, color_shape=color_shape, - color_transform=color_transform, + color_transform=color_transform, # type: ignore ) if color_shape is None: self._partitions[functor] = part @@ -820,7 +820,7 @@ def get_child_store(self, *indices: int) -> Store: def get_requirement( self, launch_ndim: int, - proj_fn: Optional[ProjFn, int] = None, + proj_fn: Optional[Union[ProjFn, int]] = None, ) -> Proj: part = self._storage_partition.find_or_create_legion_partition() if part is not None: @@ -992,7 +992,7 @@ def transformed(self) -> bool: def version(self) -> int: return self._version - def bump_version(self): + def bump_version(self) -> None: self._version += 1 def attach_external_allocation( @@ -1350,8 +1350,8 @@ def find_or_create_legion_partition( self._transform.invert_partition(partition), complete=complete, color_shape=partition.color_shape, - color_transform=self._transform.get_inverse_color_transform( - partition.color_shape.ndim, + color_transform=self._transform.get_inverse_color_transform( # type: ignore # noqa + partition.color_shape.ndim, # type: ignore ), ) else: @@ -1366,15 +1366,6 @@ def partition(self, partition: PartitionBase) -> StorePartition: ) return StorePartition(self, partition, storage_partition) - # TODO (rohany): Hacking... - def direct_partition(self, partition: PartitionBase) -> StorePartition: - return StorePartition( - self._runtime, - self, - partition, - self._storage.partition(partition), - ) - def partition_by_tiling( self, tile_shape: Union[Shape, Sequence[int]] ) -> StorePartition: diff --git a/legate/core/transform.py b/legate/core/transform.py index b92583d6c..49af7b871 100644 --- a/legate/core/transform.py +++ b/legate/core/transform.py @@ -14,7 +14,7 @@ # from __future__ import annotations -from typing import TYPE_CHECKING, Protocol, Tuple +from typing import TYPE_CHECKING, Optional, Protocol, Tuple import numpy as np @@ -231,13 +231,13 @@ def invert(self, partition: PartitionBase) -> PartitionBase: # Project away the desired dimension. all_axes = list(range(0, len(partition._shape))) shape = partition._shape.drop(self._extra_dim) - axes = ( - all_axes[: self._extra_dim] - + [None] + axes: list[Optional[int]] = ( + all_axes[: self._extra_dim] # type: ignore + + [None] # type: ignore + [x - 1 for x in all_axes[self._extra_dim + 1 :]] ) - def tx_point(p: Point, exclusive=False) -> Point: + def tx_point(p: Point, exclusive: bool = False) -> Point: return Point(Shape(p).drop(self._extra_dim)) result = AffineProjection(axes).project_partition( @@ -292,12 +292,12 @@ def convert(self, partition: PartitionBase) -> PartitionBase: + shape[self._extra_dim :] ) - def tx_point(p: Point, exclusive=False) -> Point: + def tx_point(p: Point, exclusive: bool = False) -> Point: return Point( Shape(p).insert(self._extra_dim, 1 if exclusive else 0) ) - result = AffineProjection(axes).project_partition( + result = AffineProjection(axes).project_partition( # type: ignore partition, Rect(hi=new_shape), tx_point=tx_point ) return result From 10a015ce98a625464cc12bdb5d054ddebb24bc2e Mon Sep 17 00:00:00 2001 From: Wonchan Lee Date: Fri, 30 Sep 2022 11:33:50 -0700 Subject: [PATCH 30/35] Adjust consensus match frequency based on field sizes (#402) * Perform consensus match more frequently for bigger free fields * Minor cleanup --- legate/core/runtime.py | 39 ++++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/legate/core/runtime.py b/legate/core/runtime.py index fa4fdaad9..b47624378 100644 --- a/legate/core/runtime.py +++ b/legate/core/runtime.py @@ -200,9 +200,9 @@ def add_free_field( ) -> None: self._freed_fields.append(FreeFieldInfo(manager, region, field_id)) - def issue_field_match(self) -> None: + def issue_field_match(self, credit: int) -> None: # Increment our match counter - self._match_counter += 1 + self._match_counter += credit if self._match_counter < self._match_frequency: return # If the match counter equals our match frequency then do an exchange @@ -342,9 +342,29 @@ def __init__( ) -> None: super().__init__(runtime, shape, field_size) self._field_match_manager = runtime.field_match_manager + self._update_match_credit() + + def _update_match_credit(self) -> None: + if self.shape.fixed: + size = self.shape.volume() * self.field_size + self._match_credit = ( + size + self.runtime.max_field_reuse_size - 1 + if size > self.runtime.max_field_reuse_size + else self.runtime.max_field_reuse_size + ) // self.runtime.max_field_reuse_size + # No need to update the credit as the exact size is known + self._need_to_update_match_credit = False + # If the shape is unknown, we set the credit such that every new + # free field leads to a consensus match, and ask the manager + # to update the credit. + else: + self._match_credit = self.runtime.max_field_reuse_frequency + self._need_to_update_match_credit = True def try_reuse_field(self) -> Optional[tuple[Region, int]]: - self._field_match_manager.issue_field_match() + if self._need_to_update_match_credit: + self._update_match_credit() + self._field_match_manager.issue_field_match(self._match_credit) # First, if we have a free field then we know everyone has one of those if len(self.free_fields) > 0: @@ -915,6 +935,12 @@ def __init__(self, core_library: CoreLib) -> None: ty.uint32, ) ) + self.max_field_reuse_size = int( + self._core_context.get_tunable( + legion.LEGATE_CORE_TUNABLE_FIELD_REUSE_SIZE, + ty.uint64, + ) + ) self._field_manager_class = ( ConsensusMatchingFieldManager if self._num_nodes > 1 or self._args.consensus @@ -1246,12 +1272,7 @@ def find_region_manager(self, region: Region) -> RegionManager: return self.region_managers_by_region[region] def revive_manager(self, region_mgr: RegionManager) -> None: - lru_managers: Deque[RegionManager] = deque() - for to_check in self.lru_managers: - if to_check is not region_mgr: - lru_managers.append(to_check) - assert len(lru_managers) < len(self.lru_managers) - self.lru_managers = lru_managers + self.lru_managers.remove(region_mgr) def free_region_manager( self, shape: Shape, region: Region, unordered: bool = False From 18afbb3566878081f26806f7963c2b80ea8c83f7 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Tue, 11 Oct 2022 11:21:27 -0700 Subject: [PATCH 31/35] legate/core/partition: use storages for equality rather than stores --- legate/core/partition.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/legate/core/partition.py b/legate/core/partition.py index 326c429be..f73b548e4 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -572,7 +572,7 @@ def __hash__(self) -> int: return hash( ( self.__class__, - self._store, + self._store._storage, # Importantly, we _cannot_ store the version of the store # in the hash value. This is because the store's version may # change after we've already put this functor into a table. @@ -590,8 +590,12 @@ def __hash__(self) -> int: def __eq__(self, other: object) -> bool: return ( isinstance(other, ImagePartition) - # TODO (rohany): I think we can perform equality on the store. - and self._store == other._store + # Importantly, we check equality of Storage objects rather than + # Stores. This is because Stores can have equivalent storages but + # not be equal due to transformations on the store. By checking + # that the Storages are equal, we are basically checking whether + # we have the same RegionField object. + and self._store._storage == other._store._storage and self._store.version == other._store.version and self._part == other._part and self._range == other._range @@ -707,7 +711,7 @@ def __hash__(self) -> int: return hash( ( self.__class__, - self._source, + self._source._storage, # Importantly, we _cannot_ store the version of the store # in the hash value. This is because the store's version may # change after we've already put this functor into a table. @@ -726,8 +730,9 @@ def __hash__(self) -> int: def __eq__(self, other: object) -> bool: return ( isinstance(other, PreimagePartition) - # TODO (rohany): I think we can perform equality on the store. - and self._source == other._source + # See the comment on ImagePartition.__eq__ about why we use + # source._storage for equality. + and self._source._storage == other._source._storage and self._source._version == other._source._version and self._dest == other._dest and self._part == other._part From 60d9f201483024a2dbdac826b6f18e62620aed9c Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Sun, 11 Dec 2022 15:22:44 -0800 Subject: [PATCH 32/35] legate/core/partition: fix bug in equality checking of preimages Signed-off-by: Rohan Yadav --- legate/core/partition.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/legate/core/partition.py b/legate/core/partition.py index 2a143de07..cfb04391b 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -657,7 +657,6 @@ def construct( color_shape: Optional[Shape] = None, color_transform: Optional[Transform] = None, ) -> Optional[LegionPartition]: - # TODO (rohany): What should the value of complete be? dest_part = self._part.construct(self._dest.storage.region) source_region = self._source.storage.region source_field = self._source.storage.field.field_id @@ -748,7 +747,8 @@ def __eq__(self, other: object) -> bool: # source._storage for equality. and self._source._storage == other._source._storage and self._source._version == other._source._version - and self._dest == other._dest + and self._dest.storage.region.index_space + == other._dest.storage.region.index_space and self._part == other._part and self._range == other._range and self._mapper == other._mapper From e5321f8397f0c7bcf56b891f7aff481fc2f98c29 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Sun, 11 Dec 2022 20:47:26 -0800 Subject: [PATCH 33/35] legate/core/partition: stop leaking stores when using PreImage Signed-off-by: Rohan Yadav --- legate/core/partition.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/legate/core/partition.py b/legate/core/partition.py index cfb04391b..ec7438fd7 100644 --- a/legate/core/partition.py +++ b/legate/core/partition.py @@ -635,7 +635,13 @@ def __init__( ) -> None: self._mapper = mapper self._source = source - self._dest = dest + # Importantly, we don't store a reference to `dest` and instead + # hold onto a handle of the underlying region. This is important + # because if we store dest itself on the partition then legate + # can't collect and reuse the storage under dest. Since all we + # actually need from dest is the underlying index space, storing + # the region sidesteps this limitation. + self._dest_region = dest.storage.region self._part = part # Whether this is an image or image_range operation. self._range = range @@ -657,7 +663,7 @@ def construct( color_shape: Optional[Shape] = None, color_transform: Optional[Transform] = None, ) -> Optional[LegionPartition]: - dest_part = self._part.construct(self._dest.storage.region) + dest_part = self._part.construct(self._dest_region) source_region = self._source.storage.region source_field = self._source.storage.field.field_id functorFn = ( @@ -733,7 +739,7 @@ def __hash__(self) -> int: # However, we must still check for version in equality to avoid # using old values. # self._store._version, - self._dest.storage.region.index_space, + self._dest_region.index_space, self._part, self._range, self._mapper, @@ -747,8 +753,7 @@ def __eq__(self, other: object) -> bool: # source._storage for equality. and self._source._storage == other._source._storage and self._source._version == other._source._version - and self._dest.storage.region.index_space - == other._dest.storage.region.index_space + and self._dest_region.index_space == other._dest_region.index_space and self._part == other._part and self._range == other._range and self._mapper == other._mapper From ae9a99774cbdedfd6c3f56ac0da84156c640ca6e Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Fri, 17 Feb 2023 10:05:59 -0800 Subject: [PATCH 34/35] install.py: switch the default branch back to control_replication --- install.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/install.py b/install.py index e3dfdef1c..fdff8e692 100755 --- a/install.py +++ b/install.py @@ -724,7 +724,7 @@ def driver(): "--legion-branch", dest="legion_branch", required=False, - default="collective", + default="control_replication", help="Legion branch to build Legate with.", ) args, unknown = parser.parse_known_args() From 5279dc6414daeae16beb78eeb34424b1146a1cea Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Fri, 17 Feb 2023 11:34:22 -0800 Subject: [PATCH 35/35] cmake/thirdparty: one more change to switch back to control_replication --- cmake/thirdparty/get_legion.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/thirdparty/get_legion.cmake b/cmake/thirdparty/get_legion.cmake index 92ec30247..5faf54023 100644 --- a/cmake/thirdparty/get_legion.cmake +++ b/cmake/thirdparty/get_legion.cmake @@ -175,7 +175,7 @@ function(find_or_configure_legion) endfunction() if(NOT DEFINED legate_core_LEGION_BRANCH) - set(legate_core_LEGION_BRANCH collective) + set(legate_core_LEGION_BRANCH control_replication) endif() if(NOT DEFINED legate_core_LEGION_REPOSITORY)