Skip to content

Commit

Permalink
Must defer unordered detaches even in single-node mode
Browse files Browse the repository at this point in the history
Otherwise we end up doing runtime work in a detructor, which causes
CFII to deadlock.
  • Loading branch information
manopapad committed Oct 17, 2023
1 parent 4da1875 commit 046a82b
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 50 deletions.
8 changes: 8 additions & 0 deletions legate/core/_legion/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1233,6 +1233,10 @@ def __init__(self, region: PhysicalRegion, flush: bool = True) -> None:
# it is not deleted before this detach operation can run
self.region = region.region
self.flush = flush
# The following fields aren't part of the operation itself, but are
# used for managing the lifetime of attached objects
self.attached_alloc: Any = None
self.future: Optional[Future] = None

@dispatch
def launch(
Expand Down Expand Up @@ -1401,6 +1405,10 @@ def __init__(
"""
self.external_resources = external_resources
self.flush = flush
# The following fields aren't part of the operation itself, but are
# used for managing the lifetime of attached objects
self.attached_alloc: Any = None
self.future: Optional[Future] = None

def launch(
self,
Expand Down
109 changes: 66 additions & 43 deletions legate/core/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,12 @@ class FreeFieldInfo:
manager: FieldManager
region: Region
field_id: int
attached_alloc: Union[None, Attachable]
detach: Union[Detach, IndexDetach, None]

def free(self, ordered: bool = False) -> None:
def free(self, ordered: bool) -> None:
self.manager.free_field(
self.region,
self.field_id,
self.attached_alloc,
self.detach,
ordered=ordered,
)
Expand Down Expand Up @@ -258,11 +256,10 @@ def add_free_field(
manager: FieldManager,
region: Region,
field_id: int,
attached_alloc: Union[None, Attachable],
detach: Union[Detach, IndexDetach, None],
) -> None:
self._freed_fields.append(
FreeFieldInfo(manager, region, field_id, attached_alloc, detach)
FreeFieldInfo(manager, region, field_id, detach)
)

def issue_field_match(self, credit: int) -> None:
Expand Down Expand Up @@ -362,14 +359,24 @@ def allocate_field(self, field_size: Any) -> tuple[Region, int, bool]:


def _try_reuse_field(
free_fields: Deque[tuple[Region, int, Union[Future, None]]]
free_fields: Deque[tuple[Region, int, Union[Detach, IndexDetach, None]]]
) -> Optional[tuple[Region, int]]:
if len(free_fields) == 0:
return None
field_info = free_fields.popleft()
if field_info[2] is not None and not field_info[2].is_ready():
field_info[2].wait()
return field_info[0], field_info[1]
region, field_id, detach = free_fields.popleft()

if detach is not None:
if detach.future is None:
# corner case; the detach has been added to _deferred_detachments
# but not dispatched yet, so do that now
runtime.attachment_manager.perform_detachments()
assert detach.future is not None
# We have to wait for the field to be detached from any attachment
# before we can reuse it.
detach.future.wait()
# The Detach operation will asynchronously be removed from
# _pending_detachments through the _prune_detachment mechanism
return region, field_id


# This class manages the allocation and reuse of fields
Expand All @@ -385,7 +392,7 @@ def __init__(
# guaranteed to be ordered across all the shards even with
# control replication
self.free_fields: Deque[
tuple[Region, int, Union[Future, None]]
tuple[Region, int, Union[Detach, IndexDetach, None]]
] = deque()

def destroy(self) -> None:
Expand All @@ -412,17 +419,14 @@ def free_field(
self,
region: Region,
field_id: int,
attached_alloc: Union[None, Attachable],
detach: Union[Detach, IndexDetach, None],
ordered: bool = False,
) -> None:
detach_fut: Union[Future, None] = None
if attached_alloc is not None:
assert detach is not None
detach_fut = runtime.attachment_manager.detach_external_allocation(
attached_alloc, detach
if detach is not None:
runtime.attachment_manager.detach_external_allocation(
detach, ordered
)
self.free_fields.append((region, field_id, detach_fut))
self.free_fields.append((region, field_id, detach))
region_manager = self.runtime.find_region_manager(region)
if region_manager.decrease_active_field_count():
self.runtime.free_region_manager(
Expand Down Expand Up @@ -475,17 +479,14 @@ def free_field(
self,
region: Region,
field_id: int,
attached_alloc: Union[None, Attachable],
detach: Union[Detach, IndexDetach, None],
ordered: bool = False,
) -> None:
if ordered:
super().free_field(
region, field_id, attached_alloc, detach, ordered
)
super().free_field(region, field_id, detach, ordered)
else: # Put this on the unordered list
self._field_match_manager.add_free_field(
self, region, field_id, attached_alloc, detach
self, region, field_id, detach
)


Expand Down Expand Up @@ -523,15 +524,19 @@ def __init__(self, runtime: Runtime) -> None:
self._registered_detachments: dict[
int, Union[Detach, IndexDetach]
] = dict()
self._pending_detachments: dict[Future, Attachable] = dict()
self._deferred_detachments: List[Union[Detach, IndexDetach]] = []
self._pending_detachments: List[Union[Detach, IndexDetach]] = []
self._destroyed = False

def destroy(self) -> None:
self._destroyed = True
# Schedule any queued detachments
self.perform_detachments()
# Always make sure we wait for any pending detachments to be done
# so that we don't lose the references and make the GC unhappy
for future in self._pending_detachments.keys():
future.wait()
for detach in self._pending_detachments:
assert detach.future is not None
detach.future.wait()
self._pending_detachments.clear()
# Clean up our attachments so that they can be collected
self._attachments = dict()
Expand Down Expand Up @@ -614,16 +619,24 @@ def _remove_allocation(self, alloc: Attachable) -> None:

def detach_external_allocation(
self,
attached_alloc: Attachable,
detach: Union[Detach, IndexDetach],
) -> Union[None, Future]:
self._remove_allocation(attached_alloc)
ordered: bool,
previously_deferred: bool = False,
) -> None:
assert detach.attached_alloc is not None
assert detach.future is None
if not previously_deferred:
self._remove_allocation(detach.attached_alloc)
if not ordered:
# If we need to defer this until later do that now
self._deferred_detachments.append(detach)
return
future = self._runtime.dispatch(detach)
# Hang the future on the detach operation itself
detach.future = future
# If the future is already ready, then no need to track it
if future.is_ready():
return None
self._pending_detachments[future] = attached_alloc
return future
if not future.is_ready():
self._pending_detachments.append(detach)

def register_detachment(self, detach: Union[Detach, IndexDetach]) -> int:
key = self._next_detachment_key
Expand All @@ -636,13 +649,24 @@ def remove_detachment(self, detach_key: int) -> Union[Detach, IndexDetach]:
del self._registered_detachments[detach_key]
return detach

def perform_detachments(self) -> None:
# We have to clear the list first, otherwise the dispatch() of the
# detach brings us back here, and results in an infinite loop.
detachments = self._deferred_detachments
self._deferred_detachments = list()
for detach in detachments:
if detach.future is None:
self.detach_external_allocation(
detach, ordered=True, previously_deferred=True
)

def prune_detachments(self) -> None:
to_remove = []
for future in self._pending_detachments.keys():
if future.is_ready():
to_remove.append(future)
for future in to_remove:
del self._pending_detachments[future]
new_pending: List[Union[Detach, IndexDetach]] = []
for detach in self._pending_detachments:
assert detach.future is not None
if not detach.future.is_ready():
new_pending.append(detach)
self._pending_detachments = new_pending


class PartitionManager:
Expand Down Expand Up @@ -1339,10 +1363,12 @@ def get_next_storage_id(self) -> int:
return self._next_storage_id

def dispatch(self, op: Dispatchable[T]) -> T:
self._attachment_manager.perform_detachments()
self._attachment_manager.prune_detachments()
return op.launch(self.legion_runtime, self.legion_context)

def dispatch_single(self, op: Dispatchable[T]) -> T:
self._attachment_manager.perform_detachments()
self._attachment_manager.prune_detachments()
return op.launch(self.legion_runtime, self.legion_context)

Expand Down Expand Up @@ -1828,7 +1854,6 @@ def free_field(
field_id: int,
field_size: int,
shape: Shape,
attached_alloc: Union[None, Attachable],
detach: Union[Detach, IndexDetach, None],
) -> None:
# Have a guard here to make sure that we don't try to
Expand All @@ -1840,9 +1865,7 @@ def free_field(
if key not in self.field_managers:
return

self.field_managers[key].free_field(
region, field_id, attached_alloc, detach
)
self.field_managers[key].free_field(region, field_id, detach)

def import_output_region(
self, out_region: OutputRegion, field_id: int, dtype: Any
Expand Down
12 changes: 5 additions & 7 deletions legate/core/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ def __init__(
self.field_id = field_id
self.field_size = field_size
self.shape = shape
# External allocation we attached to this field
self.attached_alloc: Union[None, Attachable] = None
self.detach_key: int = -1

def same_handle(self, other: Field) -> bool:
Expand All @@ -96,7 +94,7 @@ def __del__(self) -> None:
# Detach any existing allocation
detach = (
None
if self.attached_alloc is None
if self.detach_key < 0
else attachment_manager.remove_detachment(self.detach_key)
)
# Return our field back to the runtime
Expand All @@ -105,7 +103,6 @@ def __del__(self) -> None:
self.field_id,
self.field_size,
self.shape,
self.attached_alloc,
detach,
)

Expand Down Expand Up @@ -158,7 +155,7 @@ def attach_external_allocation(
) -> None:
assert self.parent is None
# If we already have some memory attached, detach it first
if self.field.attached_alloc is not None:
if self.field.detach_key >= 0:
raise RuntimeError("A RegionField cannot be re-attached")
# All inline mappings should have been unmapped by now
assert self.physical_region_refs == 0
Expand All @@ -167,6 +164,9 @@ def attach_external_allocation(
attachment_manager.attach_external_allocation(alloc, self)

def record_detach(detach: Union[Detach, IndexDetach]) -> None:
# Hang the allocation on the detach operation, so it won't be
# deallocated until the detach is processed.
detach.attached_alloc = alloc
# Don't store the detachment operation here, instead register it
# on the attachment manager and record its unique key
# TODO: This might not be necessary anymore
Expand Down Expand Up @@ -240,8 +240,6 @@ def record_detach(detach: Union[Detach, IndexDetach]) -> None:
# We don't need to flush the contents back to the attached memory
# if this is an internal temporary allocation.
record_detach(IndexDetach(external_resources, flush=share))
# Record the attachment
self.field.attached_alloc = alloc

def get_inline_mapped_region(self) -> PhysicalRegion:
if self.parent is None:
Expand Down

0 comments on commit 046a82b

Please sign in to comment.