Skip to content

Commit

Permalink
Extend IncreaseGranularity strategy
Browse files Browse the repository at this point in the history
Now supports different granularities at write and read sides
  • Loading branch information
franzpoeschel committed Dec 11, 2024
1 parent 96ee574 commit 2f4a3e6
Showing 1 changed file with 98 additions and 40 deletions.
138 changes: 98 additions & 40 deletions src/binding/python/openpmd_api/pipe/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,53 +150,104 @@ def assign(self, assignment, *_):
return assignment

class IncreaseGranularity(io.PartialStrategy):

def __init__(self,
granularity,
inner_distribution=io.ByHostname(io.RoundRobin())):
def __init__(
self,
granularity_in,
granularity_out,
inner_distribution=io.ByHostname(io.RoundRobin()),
):
super().__init__()
self.inner_distribution = inner_distribution
self.granularity = granularity
self.granularity_in = granularity_in
self.granularity_out = granularity_out

def assign(self, assignment, in_ranks, out_ranks):
if "in_ranks_inner" in dir(self):
return self.inner_distribution.assign(assignment,
self.in_ranks_inner,
self.out_ranks_inner)

hosts_in_order = []
already_seen = set()
for (_, hostname) in in_ranks.items():
if hostname not in already_seen:
already_seen.add(hostname)
hosts_in_order.append(hostname)
del already_seen
hostname_to_hostgroup = {} # real host -> host group
current_meta_host = 0
granularity_counter = 0
for host in hosts_in_order:
hostname_to_hostgroup[host] = str(current_meta_host)
granularity_counter += 1
if granularity_counter >= self.granularity:
granularity_counter = 0
current_meta_host += 1
in_ranks_inner = {}
for (rank, hostname) in in_ranks.items():
in_ranks_inner[rank] = hostname_to_hostgroup[hostname]
out_ranks_inner = {}
for (rank, hostname) in out_ranks.items():
try:
out_ranks_inner[rank] = hostname_to_hostgroup[hostname]
except KeyError:
out_ranks_inner[rank] = hostname

self.in_ranks_inner = in_ranks_inner
self.out_ranks_inner = out_ranks_inner

return self.inner_distribution.assign(assignment, in_ranks_inner,
out_ranks_inner)
return self.inner_distribution.assign(
assignment, self.in_ranks_inner, self.out_ranks_inner
)

def hosts_in_order(rank_assignment):
already_seen = set()
res = []
for (_, hostname) in rank_assignment.items():
if hostname not in already_seen:
already_seen.add(hostname)
res.append(hostname)
return res

in_hosts_in_order = hosts_in_order(in_ranks)
out_hosts_in_order = hosts_in_order(out_ranks)

def hostname_to_hostgroup(ordered_hosts, granularity):
res = {} # real host -> host group
current_meta_host = 0
granularity_counter = 0
for host in ordered_hosts:
res[host] = str(current_meta_host)
granularity_counter += 1
if granularity_counter >= granularity:
granularity_counter = 0
current_meta_host += 1
return res

in_hostname_to_hostgroup = hostname_to_hostgroup(
in_hosts_in_order, self.granularity_in
)
out_hostname_to_hostgroup = hostname_to_hostgroup(
out_hosts_in_order, self.granularity_out
)

def inner_rank_assignment(outer_assignment, hostname_to_hostgroup):
res = {}
for (rank, hostname) in outer_assignment.items():
res[rank] = hostname_to_hostgroup[hostname]
return res

self.in_ranks_inner = inner_rank_assignment(in_ranks, in_hostname_to_hostgroup)
self.out_ranks_inner = inner_rank_assignment(
out_ranks, out_hostname_to_hostgroup
)

return self.inner_distribution.assign(
assignment, self.in_ranks_inner, self.out_ranks_inner
)

class MergingStrategy(io.Strategy):
def __init__(self, inner_strategy):
super().__init__()
self.inner_strategy = inner_strategy

def assign(self, assignment, in_ranks, out_ranks):
res = self.inner_strategy.assign(assignment, in_ranks, out_ranks)
for out_rank, assignment in res.items():
merged = assignment.merge_chunks_from_same_sourceID()
assignment.clear()
for in_rank, chunks in merged.items():
for chunk in chunks:
assignment.append(
io.WrittenChunkInfo(chunk.offset, chunk.extent, in_rank)
)
return res


# strategy = IncreaseGranularity(2, 1)
# assignment = [
# io.WrittenChunkInfo([0], [1], 0),
# io.WrittenChunkInfo([1], [1], 1),
# io.WrittenChunkInfo([2], [1], 2),
# io.WrittenChunkInfo([3], [1], 3),
# ]
# in_ranks = {0: "host0", 1: "host1", 2: "host3", 3: "host4"}
# out_ranks = {0: "host2", 1: "host5"}
# res = strategy.assign(assignment, in_ranks, out_ranks)
# print(f"NOT ASSIGNED: {len(res.not_assigned)} chunks")
# print("ASSIGNED:")
# for rank, chunks in res.assigned.items():
# print(f"\tRANK {rank}:", end='')
# for chunk in chunks:
# print(f" [{chunk.offset}-{chunk.extent}]", end='')
# print()

#Example how to implement a simple strategy in Python
class LoadAll(io.Strategy):
Expand Down Expand Up @@ -236,6 +287,13 @@ def distribution_strategy(dataset_extent,
mpi_size,
strategy_identifier=match.group(2))
return io.FromPartialStrategy(io.ByHostname(inside_node), second_phase)
elif strategy_identifier == 'fan_in':
granularity = os.environ['OPENPMD_FAN_IN']
granularity = int(granularity)
return IncreaseGranularity(
granularity, 1,
io.FromPartialStrategy(io.ByHostname(io.RoundRobin()),
io.FailingStrategy()))
elif strategy_identifier == 'all':
return io.FromPartialStrategy(IncreaseGranularity(5), LoadAll(mpi_rank))
elif strategy_identifier == 'roundrobin':
Expand Down

0 comments on commit 2f4a3e6

Please sign in to comment.