From 2f4a3e6c07229b58aacef158005d04a14609ede4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Franz=20P=C3=B6schel?=
Date: Mon, 5 Aug 2024 09:17:19 -0400
Subject: [PATCH] Extend IncreaseGranularity strategy

Now supports different granularities at write and read sides
---
Review notes (commentary in this section is discarded by `git am`):

 * granularity_out now defaults to granularity_in. Without a default, the
   untouched call IncreaseGranularity(5) in the 'all' branch (see the second
   hunk's context) raises a TypeError as soon as that strategy is selected.
   NOTE(review): "same granularity on both sides" is an assumption about the
   intended fallback -- confirm.
 * MergingStrategy.assign no longer rebinds its `assignment` parameter
   inside the loop.
 * Docstrings were added for IncreaseGranularity and MergingStrategy; hunk
   line counts and the diffstat below were updated to match. The blob ids in
   the index line predate these amendments.
 * Not changed, worth a follow-up: `"in_ranks_inner" in dir(self)` would read
   better as hasattr(); OPENPMD_FAN_IN is read without a check, so an unset
   or non-integer value surfaces as a bare KeyError/ValueError; the
   commented-out example after MergingStrategy is scratch code.
 * Line breaks and indentation were reconstructed from a whitespace-collapsed
   copy of this patch. The positions of blank context lines were inferred
   from the original hunk counts and may need adjusting before it applies.

 .../python/openpmd_api/pipe/__main__.py       | 162 ++++++++++++++----
 1 file changed, 122 insertions(+), 40 deletions(-)

diff --git a/src/binding/python/openpmd_api/pipe/__main__.py b/src/binding/python/openpmd_api/pipe/__main__.py
index e86bdafbf3..15978c8a4b 100644
--- a/src/binding/python/openpmd_api/pipe/__main__.py
+++ b/src/binding/python/openpmd_api/pipe/__main__.py
@@ -150,53 +150,128 @@ def assign(self, assignment, *_):
         return assignment
 
 class IncreaseGranularity(io.PartialStrategy):
-
-    def __init__(self,
-                 granularity,
-                 inner_distribution=io.ByHostname(io.RoundRobin())):
+    """Group hosts into meta hosts, then delegate to an inner strategy.
+
+    The distinct hostnames of the in ranks are grouped in order of first
+    appearance, ``granularity_in`` hosts per group; the hostnames of the out
+    ranks are grouped the same way using ``granularity_out``. Each rank is
+    then relabeled with the index of its host group and the call is passed
+    on to ``inner_distribution``. The relabeled rank tables are computed on
+    the first call to ``assign`` and reused on every later call.
+
+    ``granularity_out`` defaults to ``granularity_in`` so that
+    single-argument calls such as ``IncreaseGranularity(5)`` keep working.
+    """
+
+    def __init__(
+        self,
+        granularity_in,
+        granularity_out=None,
+        inner_distribution=io.ByHostname(io.RoundRobin()),
+    ):
         super().__init__()
         self.inner_distribution = inner_distribution
-        self.granularity = granularity
+        self.granularity_in = granularity_in
+        if granularity_out is None:
+            # Single-argument form: same granularity on both sides.
+            granularity_out = granularity_in
+        self.granularity_out = granularity_out
 
     def assign(self, assignment, in_ranks, out_ranks):
         if "in_ranks_inner" in dir(self):
-            return self.inner_distribution.assign(assignment,
-                                                  self.in_ranks_inner,
-                                                  self.out_ranks_inner)
-
-        hosts_in_order = []
-        already_seen = set()
-        for (_, hostname) in in_ranks.items():
-            if hostname not in already_seen:
-                already_seen.add(hostname)
-                hosts_in_order.append(hostname)
-        del already_seen
-        hostname_to_hostgroup = {}  # real host -> host group
-        current_meta_host = 0
-        granularity_counter = 0
-        for host in hosts_in_order:
-            hostname_to_hostgroup[host] = str(current_meta_host)
-            granularity_counter += 1
-            if granularity_counter >= self.granularity:
-                granularity_counter = 0
-                current_meta_host += 1
-        in_ranks_inner = {}
-        for (rank, hostname) in in_ranks.items():
-            in_ranks_inner[rank] = hostname_to_hostgroup[hostname]
-        out_ranks_inner = {}
-        for (rank, hostname) in out_ranks.items():
-            try:
-                out_ranks_inner[rank] = hostname_to_hostgroup[hostname]
-            except KeyError:
-                out_ranks_inner[rank] = hostname
-
-        self.in_ranks_inner = in_ranks_inner
-        self.out_ranks_inner = out_ranks_inner
-
-        return self.inner_distribution.assign(assignment, in_ranks_inner,
-                                              out_ranks_inner)
+            return self.inner_distribution.assign(
+                assignment, self.in_ranks_inner, self.out_ranks_inner
+            )
+
+        def hosts_in_order(rank_assignment):
+            already_seen = set()
+            res = []
+            for (_, hostname) in rank_assignment.items():
+                if hostname not in already_seen:
+                    already_seen.add(hostname)
+                    res.append(hostname)
+            return res
+
+        in_hosts_in_order = hosts_in_order(in_ranks)
+        out_hosts_in_order = hosts_in_order(out_ranks)
+
+        def hostname_to_hostgroup(ordered_hosts, granularity):
+            res = {}  # real host -> host group
+            current_meta_host = 0
+            granularity_counter = 0
+            for host in ordered_hosts:
+                res[host] = str(current_meta_host)
+                granularity_counter += 1
+                if granularity_counter >= granularity:
+                    granularity_counter = 0
+                    current_meta_host += 1
+            return res
+
+        in_hostname_to_hostgroup = hostname_to_hostgroup(
+            in_hosts_in_order, self.granularity_in
+        )
+        out_hostname_to_hostgroup = hostname_to_hostgroup(
+            out_hosts_in_order, self.granularity_out
+        )
+
+        def inner_rank_assignment(outer_assignment, hostname_to_hostgroup):
+            res = {}
+            for (rank, hostname) in outer_assignment.items():
+                res[rank] = hostname_to_hostgroup[hostname]
+            return res
+
+        self.in_ranks_inner = inner_rank_assignment(in_ranks, in_hostname_to_hostgroup)
+        self.out_ranks_inner = inner_rank_assignment(
+            out_ranks, out_hostname_to_hostgroup
+        )
+
+        return self.inner_distribution.assign(
+            assignment, self.in_ranks_inner, self.out_ranks_inner
+        )
+
 
+class MergingStrategy(io.Strategy):
+    """Run an inner strategy, then merge each rank's chunks per source rank.
+
+    For every out rank, the assigned chunk list is emptied and refilled from
+    the result of its ``merge_chunks_from_same_sourceID()`` call: one
+    ``io.WrittenChunkInfo`` per merged chunk, tagged with the key it came from.
+    """
+
+    def __init__(self, inner_strategy):
+        super().__init__()
+        self.inner_strategy = inner_strategy
+
+    def assign(self, assignment, in_ranks, out_ranks):
+        res = self.inner_strategy.assign(assignment, in_ranks, out_ranks)
+        # Distinct loop name: do not shadow the ``assignment`` parameter.
+        for _out_rank, rank_chunks in res.items():
+            merged = rank_chunks.merge_chunks_from_same_sourceID()
+            rank_chunks.clear()
+            for in_rank, chunks in merged.items():
+                for chunk in chunks:
+                    rank_chunks.append(
+                        io.WrittenChunkInfo(chunk.offset, chunk.extent, in_rank)
+                    )
+        return res
 
+# strategy = IncreaseGranularity(2, 1)
+# assignment = [
+#     io.WrittenChunkInfo([0], [1], 0),
+#     io.WrittenChunkInfo([1], [1], 1),
+#     io.WrittenChunkInfo([2], [1], 2),
+#     io.WrittenChunkInfo([3], [1], 3),
+# ]
+# in_ranks = {0: "host0", 1: "host1", 2: "host3", 3: "host4"}
+# out_ranks = {0: "host2", 1: "host5"}
+# res = strategy.assign(assignment, in_ranks, out_ranks)
+# print(f"NOT ASSIGNED: {len(res.not_assigned)} chunks")
+# print("ASSIGNED:")
+# for rank, chunks in res.assigned.items():
+#     print(f"\tRANK {rank}:", end='')
+#     for chunk in chunks:
+#         print(f" [{chunk.offset}-{chunk.extent}]", end='')
+#     print()
 
 #Example how to implement a simple strategy in Python
 class LoadAll(io.Strategy):
@@ -236,6 +311,13 @@ def distribution_strategy(dataset_extent, mpi_size,
                 strategy_identifier=match.group(2))
         return io.FromPartialStrategy(io.ByHostname(inside_node),
                                       second_phase)
+    elif strategy_identifier == 'fan_in':
+        granularity = os.environ['OPENPMD_FAN_IN']
+        granularity = int(granularity)
+        return IncreaseGranularity(
+            granularity, 1,
+            io.FromPartialStrategy(io.ByHostname(io.RoundRobin()),
+                                   io.FailingStrategy()))
     elif strategy_identifier == 'all':
         return io.FromPartialStrategy(IncreaseGranularity(5), LoadAll(mpi_rank))
     elif strategy_identifier == 'roundrobin':