class IncreaseGranularity(io.PartialStrategy):
    """Partial strategy that merges several hosts into one "host group".

    The distinct hostnames found in ``in_ranks`` are taken in first-seen
    order and bundled into groups of ``granularity`` hosts each. Every rank
    is then relabelled with the name of its group (``"0"``, ``"1"``, ...)
    and the actual assignment is delegated to ``inner_distribution``, which
    consequently sees each group as if it were a single host.

    Parameters
    ----------
    granularity
        Number of real hosts per host group. Any value below 1 results in
        one group per host.
    inner_distribution
        Strategy that receives the relabelled rank tables. Defaults to a
        fresh ``io.ByHostname(io.RoundRobin())`` for every instance.
    """

    def __init__(self, granularity, inner_distribution=None):
        super().__init__()
        # Build the default lazily: a default expression in the signature
        # is evaluated once at class-definition time and would be shared
        # by all instances.
        if inner_distribution is None:
            inner_distribution = io.ByHostname(io.RoundRobin())
        self.inner_distribution = inner_distribution
        self.granularity = granularity
        # Relabelled rank tables, computed on the first assign() call.
        self.in_ranks_inner = None
        self.out_ranks_inner = None

    @staticmethod
    def _group_hosts(hostnames, granularity):
        """Map each distinct hostname to the label of its host group."""
        hostname_to_hostgroup = {}  # real host -> host group
        current_meta_host = 0
        hosts_in_group = 0
        # dict.fromkeys drops duplicates while keeping first-seen order.
        for host in dict.fromkeys(hostnames):
            hostname_to_hostgroup[host] = str(current_meta_host)
            hosts_in_group += 1
            if hosts_in_group >= granularity:
                hosts_in_group = 0
                current_meta_host += 1
        return hostname_to_hostgroup

    def assign(self, assignment, in_ranks, out_ranks):
        """Relabel ranks by host group and delegate to the inner strategy.

        The relabelled tables are computed from the ``in_ranks`` and
        ``out_ranks`` of the first call and reused afterwards; the rank
        tables passed to later calls are ignored.

        Returns whatever ``inner_distribution.assign`` returns.
        """
        if self.in_ranks_inner is None:
            hostname_to_hostgroup = self._group_hosts(
                (hostname for _, hostname in in_ranks.items()),
                self.granularity)
            self.in_ranks_inner = {
                rank: hostname_to_hostgroup[hostname]
                for rank, hostname in in_ranks.items()
            }
            # Hosts that only appear on the reading side belong to no
            # group and keep their real hostname.
            self.out_ranks_inner = {
                rank: hostname_to_hostgroup.get(hostname, hostname)
                for rank, hostname in out_ranks.items()
            }

        return self.inner_distribution.assign(assignment,
                                              self.in_ranks_inner,
                                              self.out_ranks_inner)
distribution_strategy(dataset_extent, strategy_identifier=match.group(2)) return io.FromPartialStrategy(io.ByHostname(inside_node), second_phase) elif strategy_identifier == 'all': - return io.FromPartialStrategy(LoadOne(mpi_rank), LoadAll(mpi_rank)) + return io.FromPartialStrategy(IncreaseGranularity(5), LoadAll(mpi_rank)) elif strategy_identifier == 'roundrobin': return io.RoundRobin() elif strategy_identifier == 'binpacking': @@ -342,6 +390,7 @@ def __copy(self, src, dest, current_path="/data/"): dest.make_constant(src.get_attribute("value")) else: chunk_table = src.available_chunks() + # todo buffer the strategy strategy = distribution_strategy(shape, self.comm.rank, self.comm.size) my_chunks = strategy.assign(chunk_table, self.inranks,