
Commit

Handles multiple partitions for cancelled jobs
Llannelongue committed May 31, 2022
1 parent 1647c91 commit f8d5c63
Showing 2 changed files with 12 additions and 13 deletions.
5 changes: 2 additions & 3 deletions GreenAlgorithms_global.py
@@ -63,8 +63,6 @@ def check_empty_results(self, df, filterWD=None, filterJobIDs='all', filterAccou
             ''')
             sys.exit()
 
-    # TODO: check values in cluster_info.yaml
-
 
 class Helpers_GA():
 
@@ -406,7 +404,7 @@ def main(args, cluster_info, fParams):
 
     # For debuging, load custom cluster info
     if args.useOtherClusterInfo != '':
-        print(f"Overrriding cluster_info with: {args.useOtherClusterInfo}")
+        print(f"Overriding cluster_info with: {args.useOtherClusterInfo}")
         with open(os.path.join('clustersData', args.useOtherClusterInfo), "r") as stream:
             try:
                 cluster_info = yaml.safe_load(stream)
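For context, cluster_info is the YAML description of the cluster being loaded here. A minimal, hypothetical sketch of the kind of mapping yaml.safe_load is expected to return; only the partitions -> name -> TDP nesting is taken from code elsewhere in this commit, the partition names and numbers below are made up:

import yaml

# Made-up cluster_info content; only the partitions -> <name> -> TDP nesting
# mirrors the lookups done in this repository.
example_yaml = """
partitions:
  cpu_general:
    TDP: 6.25
  gpu_general:
    TDP: 300
"""

cluster_info = yaml.safe_load(example_yaml)
print(cluster_info['partitions']['cpu_general']['TDP'])  # 6.25
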
@@ -416,6 +414,7 @@ def main(args, cluster_info, fParams):
     ### Set the WD to filter on, if needed
     if args.filterCWD:
         args.filterWD = args.userCWD
+        print("\nNB: --filterCWD doesn't work with symbolic links (yet!)\n")
     else:
         args.filterWD = None
 
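Why the symbolic-link caveat matters: the working directory stored in the job logs and the path reported by the user's shell can differ when one of them goes through a symlink, so the plain string comparison used by --filterCWD misses those jobs. An illustrative sketch with made-up paths; the realpath-based normalisation shown at the end is a possible workaround, not part of this commit:

import os

# Made-up example: /home/user/project is a symlink pointing to /data/project.
logged_wd = "/data/project"       # as it might appear in the job logs
user_cwd = "/home/user/project"   # as reported by the user's shell

# The filter compares strings directly, so these don't match:
print(logged_wd == user_cwd)  # False

# Resolving symlinks on both sides would make them comparable
# (True only if the symlink actually exists on the filesystem):
print(os.path.realpath(logged_wd) == os.path.realpath(user_cwd))
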
20 changes: 10 additions & 10 deletions GreenAlgorithms_workloadManager.py
@@ -89,16 +89,15 @@ def clean_partition(self, x):
         :param cluster_info: [dict]
         :return: [str] one partition or empty string
         '''
-        if pd.isnull(x):
+        if pd.isnull(x.Partition):
             return ''
         else:
-            L_partitions = x.split(',')
-            L_TDP = [self.cluster_info['partitions'][p]['TDP'] for p in L_partitions]
-            # FIXME commenting out while fixing it
-            if len(set(L_TDP)) > 1:
-                print(f'Different cores for the different partitions specified for a same job: {x}')
-            # assert len(set(L_TDP)) == 1, f'Different cores for the different partitions specified for a same job: {x}'
-            # assert all([p in self.cluster_info['partitions'] for p in L_partitions]), f"Unrecognised partition: {x}"
+            L_partitions = x.Partition.split(',')
+            if x.WallclockTimeX.total_seconds() > 0:
+                # Multiple partitions logged is only an issue for jobs that never started,
+                # for the others, only the used partition is logged
+                if len(L_partitions) > 1:
+                    print(f"\n-!- WARNING: Multiple partitions logged on a job that ran: {x.JobID} - {x.Partition} (using the first one)\n")
             return L_partitions[0]
 
     def set_partitionType(self, x):
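
A standalone sketch of how the new per-row logic behaves; the DataFrame below is made up for illustration, only the column names (JobID, Partition, WallclockTimeX) come from the diff:

import datetime
import pandas as pd

# Hypothetical job logs: one clean job, one cancelled before starting, one that
# ran but has several partitions logged.
logs_df = pd.DataFrame({
    'JobID': ['101', '102', '103'],
    'Partition': ['cpu', 'cpu,gpu', 'cpu,gpu'],
    'WallclockTimeX': [datetime.timedelta(hours=2),
                       datetime.timedelta(0),
                       datetime.timedelta(minutes=5)],
})

def clean_partition(x):
    # Simplified, standalone version of the logic above, without the class/self context.
    if pd.isnull(x.Partition):
        return ''
    L_partitions = x.Partition.split(',')
    if x.WallclockTimeX.total_seconds() > 0 and len(L_partitions) > 1:
        print(f"WARNING: multiple partitions logged for job {x.JobID}: {x.Partition} (using the first one)")
    return L_partitions[0]

print(logs_df.apply(clean_partition, axis=1).tolist())  # ['cpu', 'cpu', 'cpu'], with one warning for job 103
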
@@ -303,7 +302,7 @@ def clean_logs_df(self):
 
         ### Clean partition
         # Make sure it's either a partition name, or a comma-separated list of partitions
-        self.logs_df['PartitionX'] = self.logs_df.Partition.apply(self.clean_partition)
+        self.logs_df['PartitionX'] = self.logs_df.apply(self.clean_partition, axis=1)
 
         ### Parse submit datetime
         self.logs_df['SubmitDatetimeX'] = self.logs_df.Submit.apply(lambda x: datetime.datetime.strptime(x, "%Y-%m-%dT%H:%M:%S"))
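
The switch from Series.apply to DataFrame.apply(..., axis=1) is what lets clean_partition see the whole row (JobID, Partition, WallclockTimeX) rather than just the Partition string. A toy comparison with a made-up frame:

import pandas as pd

df = pd.DataFrame({'JobID': ['101'], 'Partition': ['cpu,gpu']})  # made-up row

# Series.apply: the function only receives the Partition value.
print(df.Partition.apply(lambda x: type(x).__name__).tolist())  # ['str']

# DataFrame.apply with axis=1: the function receives the full row, so the
# other columns are available as attributes.
print(df.apply(lambda row: f"{row.JobID}: {row.Partition}", axis=1).tolist())  # ['101: cpu,gpu']
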
@@ -392,8 +391,9 @@ def clean_logs_df(self):
         ### Filter on working directory
         if self.args.filterWD is not None:
             foo = len(self.df_agg)
+            # TODO: Doesn't work with symbolic links
             self.df_agg = self.df_agg.loc[self.df_agg.WorkingDir_ == self.args.filterWD]
-            print(f'Filtered out {foo-len(self.df_agg):,} rows (filterCWD={self.args.filterWD})')
+            # print(f'Filtered out {foo-len(self.df_agg):,} rows (filterCWD={self.args.filterWD})') # DEBUGONLY
 
         ### Filter on Job ID
         self.df_agg.reset_index(inplace=True)
