From 046a82ba230f4afd6bac83ad93b487aac9e00390 Mon Sep 17 00:00:00 2001 From: Manolis Papadakis Date: Tue, 17 Oct 2023 15:33:48 -0700 Subject: [PATCH] Must defer unordered detaches even in single-node mode Otherwise we end up doing runtime work in a detructor, which causes CFII to deadlock. --- legate/core/_legion/operation.py | 8 +++ legate/core/runtime.py | 109 +++++++++++++++++++------------ legate/core/store.py | 12 ++-- 3 files changed, 79 insertions(+), 50 deletions(-) diff --git a/legate/core/_legion/operation.py b/legate/core/_legion/operation.py index 94220df15..3f6a282d8 100644 --- a/legate/core/_legion/operation.py +++ b/legate/core/_legion/operation.py @@ -1233,6 +1233,10 @@ def __init__(self, region: PhysicalRegion, flush: bool = True) -> None: # it is not deleted before this detach operation can run self.region = region.region self.flush = flush + # The following fields aren't part of the operation itself, but are + # used for managing the lifetime of attached objects + self.attached_alloc: Any = None + self.future: Optional[Future] = None @dispatch def launch( @@ -1401,6 +1405,10 @@ def __init__( """ self.external_resources = external_resources self.flush = flush + # The following fields aren't part of the operation itself, but are + # used for managing the lifetime of attached objects + self.attached_alloc: Any = None + self.future: Optional[Future] = None def launch( self, diff --git a/legate/core/runtime.py b/legate/core/runtime.py index b7060df37..b4c102311 100644 --- a/legate/core/runtime.py +++ b/legate/core/runtime.py @@ -144,14 +144,12 @@ class FreeFieldInfo: manager: FieldManager region: Region field_id: int - attached_alloc: Union[None, Attachable] detach: Union[Detach, IndexDetach, None] - def free(self, ordered: bool = False) -> None: + def free(self, ordered: bool) -> None: self.manager.free_field( self.region, self.field_id, - self.attached_alloc, self.detach, ordered=ordered, ) @@ -258,11 +256,10 @@ def add_free_field( manager: FieldManager, region: Region, field_id: int, - attached_alloc: Union[None, Attachable], detach: Union[Detach, IndexDetach, None], ) -> None: self._freed_fields.append( - FreeFieldInfo(manager, region, field_id, attached_alloc, detach) + FreeFieldInfo(manager, region, field_id, detach) ) def issue_field_match(self, credit: int) -> None: @@ -362,14 +359,24 @@ def allocate_field(self, field_size: Any) -> tuple[Region, int, bool]: def _try_reuse_field( - free_fields: Deque[tuple[Region, int, Union[Future, None]]] + free_fields: Deque[tuple[Region, int, Union[Detach, IndexDetach, None]]] ) -> Optional[tuple[Region, int]]: if len(free_fields) == 0: return None - field_info = free_fields.popleft() - if field_info[2] is not None and not field_info[2].is_ready(): - field_info[2].wait() - return field_info[0], field_info[1] + region, field_id, detach = free_fields.popleft() + + if detach is not None: + if detach.future is None: + # corner case; the detach has been added to _deferred_detachments + # but not dispatched yet, so do that now + runtime.attachment_manager.perform_detachments() + assert detach.future is not None + # We have to wait for the field to be detached from any attachment + # before we can reuse it. + detach.future.wait() + # The Detach operation will asynchronously be removed from + # _pending_detachments through the _prune_detachment mechanism + return region, field_id # This class manages the allocation and reuse of fields @@ -385,7 +392,7 @@ def __init__( # guaranteed to be ordered across all the shards even with # control replication self.free_fields: Deque[ - tuple[Region, int, Union[Future, None]] + tuple[Region, int, Union[Detach, IndexDetach, None]] ] = deque() def destroy(self) -> None: @@ -412,17 +419,14 @@ def free_field( self, region: Region, field_id: int, - attached_alloc: Union[None, Attachable], detach: Union[Detach, IndexDetach, None], ordered: bool = False, ) -> None: - detach_fut: Union[Future, None] = None - if attached_alloc is not None: - assert detach is not None - detach_fut = runtime.attachment_manager.detach_external_allocation( - attached_alloc, detach + if detach is not None: + runtime.attachment_manager.detach_external_allocation( + detach, ordered ) - self.free_fields.append((region, field_id, detach_fut)) + self.free_fields.append((region, field_id, detach)) region_manager = self.runtime.find_region_manager(region) if region_manager.decrease_active_field_count(): self.runtime.free_region_manager( @@ -475,17 +479,14 @@ def free_field( self, region: Region, field_id: int, - attached_alloc: Union[None, Attachable], detach: Union[Detach, IndexDetach, None], ordered: bool = False, ) -> None: if ordered: - super().free_field( - region, field_id, attached_alloc, detach, ordered - ) + super().free_field(region, field_id, detach, ordered) else: # Put this on the unordered list self._field_match_manager.add_free_field( - self, region, field_id, attached_alloc, detach + self, region, field_id, detach ) @@ -523,15 +524,19 @@ def __init__(self, runtime: Runtime) -> None: self._registered_detachments: dict[ int, Union[Detach, IndexDetach] ] = dict() - self._pending_detachments: dict[Future, Attachable] = dict() + self._deferred_detachments: List[Union[Detach, IndexDetach]] = [] + self._pending_detachments: List[Union[Detach, IndexDetach]] = [] self._destroyed = False def destroy(self) -> None: self._destroyed = True + # Schedule any queued detachments + self.perform_detachments() # Always make sure we wait for any pending detachments to be done # so that we don't lose the references and make the GC unhappy - for future in self._pending_detachments.keys(): - future.wait() + for detach in self._pending_detachments: + assert detach.future is not None + detach.future.wait() self._pending_detachments.clear() # Clean up our attachments so that they can be collected self._attachments = dict() @@ -614,16 +619,24 @@ def _remove_allocation(self, alloc: Attachable) -> None: def detach_external_allocation( self, - attached_alloc: Attachable, detach: Union[Detach, IndexDetach], - ) -> Union[None, Future]: - self._remove_allocation(attached_alloc) + ordered: bool, + previously_deferred: bool = False, + ) -> None: + assert detach.attached_alloc is not None + assert detach.future is None + if not previously_deferred: + self._remove_allocation(detach.attached_alloc) + if not ordered: + # If we need to defer this until later do that now + self._deferred_detachments.append(detach) + return future = self._runtime.dispatch(detach) + # Hang the future on the detach operation itself + detach.future = future # If the future is already ready, then no need to track it - if future.is_ready(): - return None - self._pending_detachments[future] = attached_alloc - return future + if not future.is_ready(): + self._pending_detachments.append(detach) def register_detachment(self, detach: Union[Detach, IndexDetach]) -> int: key = self._next_detachment_key @@ -636,13 +649,24 @@ def remove_detachment(self, detach_key: int) -> Union[Detach, IndexDetach]: del self._registered_detachments[detach_key] return detach + def perform_detachments(self) -> None: + # We have to clear the list first, otherwise the dispatch() of the + # detach brings us back here, and results in an infinite loop. + detachments = self._deferred_detachments + self._deferred_detachments = list() + for detach in detachments: + if detach.future is None: + self.detach_external_allocation( + detach, ordered=True, previously_deferred=True + ) + def prune_detachments(self) -> None: - to_remove = [] - for future in self._pending_detachments.keys(): - if future.is_ready(): - to_remove.append(future) - for future in to_remove: - del self._pending_detachments[future] + new_pending: List[Union[Detach, IndexDetach]] = [] + for detach in self._pending_detachments: + assert detach.future is not None + if not detach.future.is_ready(): + new_pending.append(detach) + self._pending_detachments = new_pending class PartitionManager: @@ -1339,10 +1363,12 @@ def get_next_storage_id(self) -> int: return self._next_storage_id def dispatch(self, op: Dispatchable[T]) -> T: + self._attachment_manager.perform_detachments() self._attachment_manager.prune_detachments() return op.launch(self.legion_runtime, self.legion_context) def dispatch_single(self, op: Dispatchable[T]) -> T: + self._attachment_manager.perform_detachments() self._attachment_manager.prune_detachments() return op.launch(self.legion_runtime, self.legion_context) @@ -1828,7 +1854,6 @@ def free_field( field_id: int, field_size: int, shape: Shape, - attached_alloc: Union[None, Attachable], detach: Union[Detach, IndexDetach, None], ) -> None: # Have a guard here to make sure that we don't try to @@ -1840,9 +1865,7 @@ def free_field( if key not in self.field_managers: return - self.field_managers[key].free_field( - region, field_id, attached_alloc, detach - ) + self.field_managers[key].free_field(region, field_id, detach) def import_output_region( self, out_region: OutputRegion, field_id: int, dtype: Any diff --git a/legate/core/store.py b/legate/core/store.py index e901737fe..10279256c 100644 --- a/legate/core/store.py +++ b/legate/core/store.py @@ -80,8 +80,6 @@ def __init__( self.field_id = field_id self.field_size = field_size self.shape = shape - # External allocation we attached to this field - self.attached_alloc: Union[None, Attachable] = None self.detach_key: int = -1 def same_handle(self, other: Field) -> bool: @@ -96,7 +94,7 @@ def __del__(self) -> None: # Detach any existing allocation detach = ( None - if self.attached_alloc is None + if self.detach_key < 0 else attachment_manager.remove_detachment(self.detach_key) ) # Return our field back to the runtime @@ -105,7 +103,6 @@ def __del__(self) -> None: self.field_id, self.field_size, self.shape, - self.attached_alloc, detach, ) @@ -158,7 +155,7 @@ def attach_external_allocation( ) -> None: assert self.parent is None # If we already have some memory attached, detach it first - if self.field.attached_alloc is not None: + if self.field.detach_key >= 0: raise RuntimeError("A RegionField cannot be re-attached") # All inline mappings should have been unmapped by now assert self.physical_region_refs == 0 @@ -167,6 +164,9 @@ def attach_external_allocation( attachment_manager.attach_external_allocation(alloc, self) def record_detach(detach: Union[Detach, IndexDetach]) -> None: + # Hang the allocation on the detach operation, so it won't be + # deallocated until the detach is processed. + detach.attached_alloc = alloc # Don't store the detachment operation here, instead register it # on the attachment manager and record its unique key # TODO: This might not be necessary anymore @@ -240,8 +240,6 @@ def record_detach(detach: Union[Detach, IndexDetach]) -> None: # We don't need to flush the contents back to the attached memory # if this is an internal temporary allocation. record_detach(IndexDetach(external_resources, flush=share)) - # Record the attachment - self.field.attached_alloc = alloc def get_inline_mapped_region(self) -> PhysicalRegion: if self.parent is None: