Skip to content

Commit

Permalink
[Cylon] Fix Cylon CI system and add slice operation on scaling test
Browse files Browse the repository at this point in the history
Signed-off-by: Arup Sarker <[email protected]>
  • Loading branch information
arupcsedu committed Jul 25, 2023
1 parent b0b15f0 commit 283de82
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 6 deletions.
4 changes: 2 additions & 2 deletions conda/environments/cylon_MacOS.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ dependencies:
- glog
- openmpi>=4.1.2
- cython>=0.29,<0.30
- numpy
- pandas>=1.0,<1.6.0dev0
- numpy<1.24.4
- pandas>=1.0,<2.0.0
- fsspec>=0.6.0
- setuptools
# they are not needed for using pygcylon or compiling it
Expand Down
4 changes: 2 additions & 2 deletions conda/environments/gcylon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ dependencies:
- glog
- openmpi=4.1.3=ha1ae619_105
- ucx>=1.12.1
- numpy
- pandas>=1.0,<1.6.0dev0
- numpy<1.24.4
- pandas>=1.0,<2.0.0
- fsspec>=0.6.0
- setuptools
# these are for running tests only,
Expand Down
4 changes: 2 additions & 2 deletions conda/environments/windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ dependencies:
- glog
- msmpi
- cython>=0.29,<0.30
- numpy
- pandas>=1.0,<1.6.0dev0
- numpy<1.24.4
- pandas>=1.0,<2.0.0
- fsspec>=0.6.0
- setuptools
# they are not needed for using pygcylon or compiling it
Expand Down
51 changes: 51 additions & 0 deletions rivanna/scripts/cylon_scaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,57 @@ def join(data=None):

env.finalize()

def slice(data=None):
    """Benchmark Cylon's distributed row slice under weak or strong scaling.

    Every rank builds one two-column table of random integers, then
    repeatedly evaluates ``df1[0:slice_end, env]`` between barriers. After
    each repetition rank 0 prints the per-rank average wall time and the
    reduced row count of the result.

    NOTE: at module level this function's name shadows the built-in
    ``slice``; the name is kept so existing callers continue to work.

    Args:
        data: dict of run parameters. Keys read here:
            ``host``, ``rows``, ``it`` -- embedded in the timer names;
                ``rows`` also sizes the table and ``it`` is the number of
                repetitions.
            ``unique`` -- factor applied to the upper bound of the random
                value range.
            ``scaling`` -- ``'w'`` selects weak scaling; any other value
                takes the strong-scaling branch.
            ``task`` -- label printed by rank 0.
            ``slice_end`` (optional) -- upper bound used in the slice
                expression; defaults to 20000000.
    """
    # Build the shared part of the timer names once so that start/stop
    # always refer to exactly the same key.
    run_id = f"{data['host']}_{data['rows']}_{data['it']}"
    total_timer = f"slice_total_{run_id}"
    StopWatch.start(total_timer)

    comm = MPI.COMM_WORLD
    config = MPIConfig(comm)
    env = CylonEnv(config=config, distributed=True)

    u = data['unique']

    if data['scaling'] == 'w':  # weak: each rank holds `rows` rows
        num_rows = data['rows']
        max_val = num_rows * env.world_size
    else:  # 's' strong: `rows` rows are divided among the ranks
        max_val = data['rows']
        num_rows = int(data['rows'] / env.world_size)

    # Previously a hard-coded literal; overridable without changing callers.
    slice_end = data.get('slice_end', 20000000)

    # Seeding with the rank gives each rank its own reproducible values.
    # Only one table is generated: the slice benchmark never touches a
    # second one, so building it would only cost memory and skew the
    # total timer.
    rng = default_rng(seed=env.rank)
    values = rng.integers(0, int(max_val * u), size=(num_rows, 2))
    df1 = DataFrame(pd.DataFrame(values).add_prefix("col"))

    if env.rank == 0:
        print("Task# ", data['task'])

    for i in range(data['it']):
        iter_timer = f"slice_{i}_{run_id}"

        # Barriers on both sides so every rank times the same operation.
        env.barrier()
        StopWatch.start(iter_timer)
        t1 = time.time()
        df3 = df1[0:slice_end, env]  # distributed slice
        env.barrier()
        t2 = time.time()

        t = t2 - t1
        # mpi4py's lowercase reduce defaults to a SUM delivered to rank 0;
        # the other ranks receive None and do not use the result.
        sum_t = comm.reduce(t)
        tot_l = comm.reduce(len(df3))

        if env.rank == 0:
            avg_t = sum_t / env.world_size
            print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l)
            # Only rank 0 closes the per-iteration timer; it is also the
            # only rank that reports the benchmark below.
            StopWatch.stop(iter_timer)

    StopWatch.stop(total_timer)

    if env.rank == 0:
        StopWatch.benchmark(tag=str(data))

    env.finalize()

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="weak scaling")
Expand Down

0 comments on commit 283de82

Please sign in to comment.