"""Helpers for distributing work across CPUs or nodes.
These utilities provide simple index-based partitioning that works with or
without MPI.
"""
# from mpi4py import MPI
import numpy as np
[docs]
def get_unique_id2(rank, run_number, cpus_per_node):
    """Return a unique integer ID for a worker.

    The IDs belonging to run ``run_number`` form the half-open interval
    ``[cpus_per_node * run_number, cpus_per_node * (run_number + 1))``.
    The worker's ID is the ``rank``-th entry of that interval.

    Parameters
    ----------
    rank : int
        Rank or index for the worker on a node. Sequence indexing rules
        apply, so a negative rank counts back from the end of the interval.
    run_number : int
        Index of the current run or batch.
    cpus_per_node : int
        Number of CPUs available per node.

    Returns
    -------
    int
        Unique ID derived from rank, run number, and CPUs per node.

    Raises
    ------
    IndexError
        If ``rank`` lies outside ``[-cpus_per_node, cpus_per_node)``.
    """
    first_id = cpus_per_node * run_number
    # A lazy ``range`` is indexed exactly like the equivalent array (negative
    # wrap-around, IndexError when out of bounds) but is O(1) and allocates
    # nothing, instead of building ``cpus_per_node`` elements to read one.
    return int(range(first_id, first_id + cpus_per_node)[rank])
[docs]
def get_unique_id(rank, run_number, cpus_per_node):
    """Map a rank and run number to a worker ID.

    The ID is ``rank + cpus_per_node * run_number``. IDs are distinct across
    runs as long as ``0 <= rank < cpus_per_node``.

    Parameters
    ----------
    rank : int
        Rank of the CPU within the node.
    run_number : int
        The current run number.
    cpus_per_node : int
        Number of CPUs per node.

    Returns
    -------
    int
        The ID for the given rank, run number, and CPUs per node.
    """
    # Each run owns a block of ``cpus_per_node`` consecutive IDs.
    run_offset = cpus_per_node * run_number
    return rank + run_offset
[docs]
def distribute_array_no_mpi(unique_id, total_jobs, array_size):
    """Compute a contiguous slice of work for a given worker.

    The array is split into ``total_jobs`` contiguous chunks of
    ``array_size // total_jobs`` elements each. The first
    ``array_size % total_jobs`` workers receive one extra element so that
    the whole array is covered.

    Parameters
    ----------
    unique_id : int
        Unique worker identifier (e.g., from ``get_unique_id``).
    total_jobs : int
        Total number of workers sharing the work. Must be positive.
    array_size : int
        Length of the 1D array being partitioned.

    Returns
    -------
    tuple[int | None, int | None]
        ``(start_idx, end_idx)`` for the slice assigned to the worker,
        or ``(None, None)`` if no work is assigned (``unique_id`` is
        negative, or its slice would start past the end of the array).

    Raises
    ------
    ZeroDivisionError
        If ``total_jobs`` is zero.
    """
    # Base chunk size, and how many workers must take one extra element.
    chunk_size, remainder = divmod(array_size, total_jobs)

    # A negative ID does not name a worker. Without this guard the arithmetic
    # below would yield a negative (start, end) pair instead of "no work".
    if unique_id < 0:
        return None, None

    # Every worker ahead of this one that received an extra element shifts
    # this worker's start by one; at most ``remainder`` workers did.
    start_idx = unique_id * chunk_size + min(unique_id, remainder)
    own_size = chunk_size + 1 if unique_id < remainder else chunk_size

    # More workers than elements (or an ID past the last worker): no slice.
    if start_idx >= array_size:
        return None, None

    return int(start_idx), int(start_idx + own_size)
# def distribute_array(array):
# """
# Distributes a 1D array among MPI processes.
# Parameters:
# array: numpy.ndarray
# The 1D array to be distributed.
# Notes:
# The communicator is not a parameter; MPI.COMM_WORLD is used.
# Returns:
# local_data: numpy.ndarray
# The portion of array assigned to the current MPI process.
# """
# comm = MPI.COMM_WORLD
# rank = comm.Get_rank()
# num_procs = comm.Get_size()
# remainder = len(array) % num_procs
# base_load = len(array) // num_procs
# if rank == 0:
# print('All processors will process at least {0} simulations.'.format(
# base_load))
# print('{0} processors will process an additional simulations'.format(
# remainder))
# load_list = np.concatenate((np.ones(remainder) * (base_load + 1),
# np.ones(num_procs - remainder) * base_load))
# if rank == 0:
# print('load_list={0}'.format(load_list))
# if rank < remainder:
# local_array = np.zeros(base_load + 1, dtype=np.int64)
# else:
# local_array = np.zeros(base_load, dtype=np.int64)
# disp = np.zeros(num_procs)
# for i in range(len(load_list)):
# if i == 0:
# disp[i] = 0
# else:
# disp[i] = disp[i - 1] + load_list[i - 1]
# comm.Scatterv([array, load_list, disp, MPI.DOUBLE], local_array)
# print(f"Process {rank} received the indices {local_array}")
# return local_array
# Example usage: compare the two unique-ID helpers, then print the slice each
# worker would receive from a 1000-element array.
if __name__ == "__main__":
    # Named once so the loop bounds and the helper arguments cannot drift apart.
    cpus_per_node = 12
    total_jobs = 10

    array = np.arange(1000)  # any 1D numpy array (``.size`` is used below)

    for run_number in np.arange(10):
        for rank in np.arange(cpus_per_node):
            print(
                "comparing unique id methods:",
                get_unique_id(rank, run_number, cpus_per_node),
                get_unique_id2(rank, run_number, cpus_per_node),
            )

    # print(f"\nTesting distribute_array:\n")
    # local_data = distribute_array(array)
    # print(local_data)

    print("\nTesting distribute_array_no_mpi:\n")
    for unique_id in np.arange(total_jobs):
        start_idx, end_idx = distribute_array_no_mpi(
            unique_id, total_jobs=total_jobs, array_size=array.size
        )
        print(start_idx, end_idx)
        # (None, None) means "no work"; slicing with it would select the
        # whole array, so skip the slice in that case.
        if start_idx is not None:
            print(array[start_idx:end_idx])
        print()
# def mpi_scatter(scatter_array):
# comm = MPI.COMM_WORLD # Defines the default communicator
# num_procs = comm.Get_size() # Stores the number of processes in size.
# rank = comm.Get_rank() # Stores the rank (pid) of the current process
# # stat = MPI.Status()
# print(f'Number of procs: {num_procs}, rank: {rank}')
# remainder = np.size(scatter_array) % num_procs
# base_load = np.size(scatter_array) // num_procs
# if rank == 0:
# print('All processors will process at least {0} simulations.'.format(
# base_load))
# print('{0} processors will process an additional simulations'.format(
# remainder))
# load_list = np.concatenate((np.ones(remainder) * (base_load + 1),
# np.ones(num_procs - remainder) * base_load))
# if rank == 0:
# print('load_list={0}'.format(load_list))
# if rank < remainder:
# scatter_array_local = np.zeros(base_load + 1, dtype=np.int64)
# else:
# scatter_array_local = np.zeros(base_load, dtype=np.int64)
# disp = np.zeros(num_procs)
# for i in range(np.size(load_list)):
# if i == 0:
# disp[i] = 0
# else:
# disp[i] = disp[i - 1] + load_list[i - 1]
# comm.Scatterv([scatter_array, load_list, disp, MPI.DOUBLE], scatter_array_local)
# print(f"Process {rank} received the scattered arrays: {scatter_array_local}")
# return scatter_array_local, rank
# def mpi_scatter_exclude_rank_0(scatter_array):
# # Function is for rank 0 to be used as a saving processor - all other processors will complete tasks.
# comm = MPI.COMM_WORLD
# num_procs = comm.Get_size()
# rank = comm.Get_rank()
# print(f'Number of procs: {num_procs}, rank: {rank}')
# num_workers = num_procs - 1
# remainder = np.size(scatter_array) % num_workers
# base_load = np.size(scatter_array) // num_workers
# if rank == 0:
# print(f'All processors will process at least {base_load} simulations.')
# print(f'{remainder} processors will process an additional simulation.')
# load_list = np.concatenate((np.zeros(1), np.ones(remainder) * (base_load + 1),
# np.ones(num_workers - remainder) * base_load))
# if rank == 0:
# print(f'load_list={load_list}')
# scatter_array_local = np.zeros(int(load_list[rank]), dtype=np.int64)
# disp = np.zeros(num_procs)
# for i in range(1, num_procs):
# disp[i] = disp[i - 1] + load_list[i - 1]
# if rank == 0:
# dummy_recvbuf = np.zeros(1, dtype=np.int64)
# comm.Scatterv([scatter_array, load_list, disp, MPI.INT64_T], dummy_recvbuf)
# else:
# comm.Scatterv([scatter_array, load_list, disp, MPI.INT64_T], scatter_array_local)
# print(f"Process {rank} received the {len(scatter_array_local)} element scattered array: {scatter_array_local}")
# return scatter_array_local, rank