#!/usr/bin/env python3
import argparse
from contextlib import contextmanager
import datetime
from itertools import chain, groupby, repeat
import logging
import math
import os
import pickle
import signal
import subprocess
import sys
import tempfile
import numpy as np
import h5py
import freestream
import frzout
def run_cmd(*args):
"""
Run and log a subprocess.
"""
cmd = ' '.join(args)
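    # NB: the joined command is later re-split on whitespace, so no single
    # argument may contain spaces that would require shell-style quoting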
logging.info('running command: %s', cmd)
try:
proc = subprocess.run(
cmd.split(), check=True,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=True
)
except subprocess.CalledProcessError as e:
logging.error(
'command failed with status %d:\n%s',
e.returncode, e.output.strip('\n')
)
raise
else:
logging.debug(
'command completed successfully:\n%s',
proc.stdout.strip('\n')
)
return proc
class Parser(argparse.ArgumentParser):
"""
ArgumentParser that parses files with 'key = value' lines.
"""
def __init__(self, *args, fromfile_prefix_chars='@', **kwargs):
super().__init__(
*args, fromfile_prefix_chars=fromfile_prefix_chars, **kwargs
)
def convert_arg_line_to_args(self, arg_line):
# split each line on = and prepend prefix chars to first arg so it is
# parsed as a long option
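        # e.g. the config-file line "trento-args = Pb Pb" becomes
        # ['--trento-args', 'Pb Pb']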
args = [i.strip() for i in arg_line.split('=', maxsplit=1)]
args[0] = 2*self.prefix_chars[0] + args[0]
return args
parser = Parser(
usage=''.join('\n %(prog)s ' + i for i in [
'[options] <results_file>',
'checkpoint <checkpoint_file>',
'-h | --help',
]),
description='''
Run relativistic heavy-ion collision events.

In the first form, run events according to the given options (below) and write
results to binary file <results_file>.

In the second form, run the event saved in <checkpoint_file>, previously
created by using the --checkpoint option and interrupting an event in progress.
''',
formatter_class=argparse.RawDescriptionHelpFormatter
)
def parse_args_checkpoint():
"""
    Parse command line arguments according to the parser usage info. Return a
    tuple (args, ic) where `args` is a normal argparse.Namespace and `ic` is
    either None or an np.array of the checkpointed initial condition.

    First, check for the special "checkpoint" form, and if found, load and
    return the args and checkpoint initial condition from the specified file.
    If not, let the parser object handle everything.

    This is a little hacky but it works fine. Truth is, argparse can't do
    exactly what I want here. I suppose `docopt` might be a better option, but
    it's not worth the effort to rewrite everything.
"""
def usage():
parser.print_usage(sys.stderr)
sys.exit(2)
if len(sys.argv) == 1:
usage()
if sys.argv[1] == 'checkpoint':
if len(sys.argv) != 3:
usage()
path = sys.argv[2]
try:
with open(path, 'rb') as f:
args, ic = pickle.load(f)
except Exception as e:
msg = '{}: {}'.format(type(e).__name__, e)
if path not in msg:
msg += ": '{}'".format(path)
sys.exit(msg)
# as a simple integrity check, require that the checkpoint file is
# actually the file specified in the checkpointed args
if os.path.abspath(path) != args.checkpoint:
sys.exit(
"checkpoint file path '{}' does not match saved path '{}'"
.format(path, args.checkpoint)
)
return args, ic
return parser.parse_args(), None
parser.add_argument(
'results', type=os.path.abspath,
help=argparse.SUPPRESS
)
parser.add_argument(
'--buffering', type=int, default=0, metavar='INT',
help='results file buffer size in bytes (default: no buffering)'
)
parser.add_argument(
'--nevents', type=int, metavar='INT',
help='number of events to run (default: run until interrupted)'
)
parser.add_argument(
'--rankvar', metavar='VAR',
help='environment variable containing process rank'
)
parser.add_argument(
'--rankfmt', metavar='FMT',
help='format string for rank integer'
)
parser.add_argument(
'--tmpdir', type=os.path.abspath, metavar='PATH',
help='temporary directory (default: {})'.format(tempfile.gettempdir())
)
parser.add_argument(
'--checkpoint', type=os.path.abspath, metavar='PATH',
help='checkpoint file [pickle format]'
)
parser.add_argument(
'--particles', type=os.path.abspath, metavar='PATH',
help='raw particle data file (default: do not save)'
)
parser.add_argument(
'--logfile', type=os.path.abspath, metavar='PATH',
help='log file (default: stdout)'
)
parser.add_argument(
'--loglevel', choices={'debug', 'info', 'warning', 'error', 'critical'},
default='info',
help='log level (default: %(default)s)'
)
parser.add_argument(
'--nucleon-width', type=float, default=.5, metavar='FLOAT',
help='trento nucleon width [fm] (default: %(default)s fm)'
)
parser.add_argument(
'--trento-args', default='Pb Pb', metavar='ARGS',
help="arguments passed to trento (default: '%(default)s')"
)
parser.add_argument(
'--tau-fs', type=float, default=.5, metavar='FLOAT',
help='free streaming time [fm] (default: %(default)s fm)'
)
parser.add_argument(
'--hydro-args', default='', metavar='ARGS',
help='arguments passed to osu-hydro (default: empty)'
)
parser.add_argument(
'--Tswitch', type=float, default=.150, metavar='FLOAT',
help='particlization temperature [GeV] (default: %(default).3f GeV)'
)
class StopEvent(Exception):
""" Raise to end an event early. """
def run_events(args, results_file, particles_file=None, checkpoint_ic=None):
"""
    Run events as determined by user input:

      - Read options from `args`, as returned by `parser.parse_args()`.
      - Write results to binary file object `results_file`.
      - If `checkpoint_ic` is given, run only that IC.

    Return True if at least one event completed successfully, otherwise False.
"""
# set the grid step size proportionally to the nucleon width
grid_step = .15*args.nucleon_width
# the "target" grid max: the grid shall be at least as large as the target
grid_max_target = 15
# next two lines set the number of grid cells and actual grid max,
# which will be >= the target (same algorithm as trento)
grid_n = math.ceil(2*grid_max_target/grid_step)
grid_max = .5*grid_n*grid_step
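    # e.g. with the default nucleon width of 0.5 fm:
    # grid_step = 0.075 fm, grid_n = 400, grid_max = 15.0 fm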
logging.info(
'grid step = %.6f fm, n = %d, max = %.6f fm',
grid_step, grid_n, grid_max
)
def _initial_conditions(nevents=10, initial_file='initial.hdf'):
"""
Run trento and yield initial condition arrays.
"""
try:
os.remove(initial_file)
except FileNotFoundError:
pass
run_cmd(
'trento',
'--number-events {}'.format(nevents),
'--grid-step {} --grid-max {}'.format(grid_step, grid_max_target),
'--output', initial_file,
'--nucleon-width {}'.format(args.nucleon_width),
args.trento_args
)
with h5py.File(initial_file, 'r') as f:
for dset in f.values():
ic = np.array(dset)
# Write the checkpoint file _before_ starting the event so that
# even if the process is forcefully killed, the state will be
# saved. If / when all events complete, delete the file.
if args.checkpoint is not None:
with open(args.checkpoint, 'wb') as cf:
pickle.dump((args, ic), cf, pickle.HIGHEST_PROTOCOL)
logging.info('wrote checkpoint file %s', args.checkpoint)
yield ic
if checkpoint_ic is None:
# if nevents was specified, generate that number of initial conditions
# otherwise generate indefinitely
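    # (the default nevents=10 in _initial_conditions means that indefinite
    # mode invokes trento in batches of 10 events at a time)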
initial_conditions = (
chain.from_iterable(_initial_conditions() for _ in repeat(None))
if args.nevents is None else
_initial_conditions(args.nevents)
)
else:
# just run the checkpointed IC
initial_conditions = [checkpoint_ic]
# create sampler HRG object (to be reused for all events)
hrg_kwargs = dict(species='urqmd', res_width=True)
hrg = frzout.HRG(args.Tswitch, **hrg_kwargs)
# append switching energy density to hydro arguments
eswitch = hrg.energy_density()
hydro_args = [args.hydro_args, 'edec={}'.format(eswitch)]
# arguments for "coarse" hydro pre-runs
    # no viscosity, and run down to a low temperature of 110 MeV
hydro_args_coarse = [
'etas_hrg=0 etas_min=0 etas_slope=0 zetas_max=0 zetas_width=0',
'edec={}'.format(frzout.HRG(.110, **hrg_kwargs).energy_density())
]
def run_hydro(fs, event_size, coarse=False, dt_ratio=.25):
"""
        Run the initial condition contained in FreeStreamer object `fs` through
        osu-hydro on a grid with approximate physical size `event_size` [fm].
        Return a dict of freeze-out surface data suitable for passing directly
        to frzout.Surface.

        Initial condition arrays are cropped or padded as necessary.

        If `coarse` is an integer > 1, use only every `coarse`th cell from the
        initial condition arrays (thus increasing the physical grid step size
        by a factor of `coarse`). Ignore the user input `hydro_args` and
        instead run ideal hydro down to a low temperature.

        `dt_ratio` sets the timestep as a fraction of the spatial step
        (dt = dt_ratio * dxy). The SHASTA algorithm requires dt_ratio < 1/2.
"""
dxy = grid_step * (coarse or 1)
ls = math.ceil(event_size/dxy) # the osu-hydro "ls" parameter
n = 2*ls + 1 # actual number of grid cells
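        # e.g. the coarse pre-run below uses event_size=27 and coarse=3, so
        # with the default nucleon width: dxy = 0.225 fm, ls = 120, n = 241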
for fmt, f, arglist in [
('ed', fs.energy_density, [()]),
('u{}', fs.flow_velocity, [(1,), (2,)]),
('pi{}{}', fs.shear_tensor, [(1, 1), (1, 2), (2, 2)]),
]:
for a in arglist:
X = f(*a)
if coarse:
X = X[::coarse, ::coarse]
diff = X.shape[0] - n
start = int(abs(diff)/2)
if diff > 0:
# original grid is larger -> cut out middle square
s = slice(start, start + n)
X = X[s, s]
elif diff < 0:
# original grid is smaller
# -> create new array and place original grid in middle
Xn = np.zeros((n, n))
s = slice(start, start + X.shape[0])
Xn[s, s] = X
X = Xn
X.tofile(fmt.format(*a) + '.dat')
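        # the loops above write ed.dat, u1.dat, u2.dat, pi11.dat, pi12.dat,
        # and pi22.dat, which serve as the initial condition input for the
        # osu-hydro call below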
dt = dxy*dt_ratio
run_cmd(
'osu-hydro',
't0={} dt={} dxy={} nls={}'.format(args.tau_fs, dt, dxy, ls),
*(hydro_args_coarse if coarse else hydro_args)
)
surface = np.fromfile('surface.dat', dtype='f8').reshape(-1, 16)
# end event if the surface is empty -- this occurs in ultra-peripheral
# events where the initial condition doesn't exceed Tswitch
if surface.size == 0:
raise StopEvent('empty surface')
# surface columns:
# 0 1 2 3 4 5 6 7
# tau x y dsigma_t dsigma_x dsigma_y v_x v_y
# 8 9 10 11 12 13 14 15
# pitt pitx pity pixx pixy piyy pizz Pi
# pack surface data into a dict suitable for passing to frzout.Surface
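        # 'x' = columns 0-2 (tau, x, y), 'sigma' = columns 3-5, 'v' = columns
        # 6-7; 'pi' takes the xx, xy, yy shear components (columns 11-13) and
        # 'Pi' the bulk term (column 15)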
return dict(
zip(['x', 'sigma', 'v'], np.hsplit(surface, [3, 6, 8])),
pi=dict(zip(['xx', 'xy', 'yy'], surface.T[11:14])),
Pi=surface.T[15]
)
# species (name, ID) for identified particle observables
species = [
('pion', 211),
('kaon', 321),
('proton', 2212),
('Lambda', 3122),
('Sigma0', 3212),
('Xi', 3312),
('Omega', 3334),
]
# fully specify numeric data types, including endianness and size, to
# ensure consistency across all machines
float_t = '<f8'
int_t = '<i8'
complex_t = '<c16'
# results "array" (one element)
# to be overwritten for each event
results = np.empty((), dtype=[
('initial_entropy', float_t),
('nsamples', int_t),
('dNch_deta', float_t),
('dET_deta', float_t),
('dN_dy', [(s, float_t) for (s, _) in species]),
('mean_pT', [(s, float_t) for (s, _) in species]),
('pT_fluct', [('N', int_t), ('sum_pT', float_t), ('sum_pTsq', float_t)]),
('flow', [('N', int_t), ('Qn', complex_t, 8)]),
])
# UrQMD raw particle format
parts_dtype = [
('sample', int),
('ID', int),
('charge', int),
('pT', float),
('ET', float),
('mT', float),
('phi', float),
('y', float),
('eta', float)
]
def run_single_event(ic, event_number):
"""
Run the initial condition event contained in HDF5 dataset object `ic`
and save observables to `results`.
"""
results.fill(0)
results['initial_entropy'] = ic.sum() * grid_step**2
assert all(n == grid_n for n in ic.shape)
logging.info(
'free streaming initial condition for %.3f fm',
args.tau_fs
)
fs = freestream.FreeStreamer(ic, grid_max, args.tau_fs)
# run coarse event on large grid and determine max radius
rmax = math.sqrt((
run_hydro(fs, event_size=27, coarse=3)['x'][:, 1:3]**2
).sum(axis=1).max())
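        # (columns 1:3 of 'x' are the transverse coordinates x and y, so rmax
        # is the largest transverse radius reached by the freeze-out surface)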
logging.info('rmax = %.3f fm', rmax)
# now run normal event with size set to the max radius
# and create sampler surface object
surface = frzout.Surface(**run_hydro(fs, event_size=rmax), ymax=2)
logging.info('%d freeze-out cells', len(surface))
minsamples, maxsamples = 10, 1000 # reasonable range for nsamples
minparts = 10**5 # min number of particles to sample
nparts = 0 # for tracking total number of sampled particles
logging.info('sampling surface with frzout')
# sample particles and write to file
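        # oversample: each pass of the loop is a statistically independent
        # particle ensemble for the same hydro event; stop once at least
        # minparts particles and minsamples samples have accumulated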
with open('particles_in.dat', 'w') as f:
for nsamples in range(1, maxsamples + 1):
parts = frzout.sample(surface, hrg)
if parts.size == 0:
continue
nparts += parts.size
print('#', parts.size, file=f)
for p in parts:
print(p['ID'], *p['x'], *p['p'], file=f)
if nparts >= minparts and nsamples >= minsamples:
break
logging.info('produced %d particles in %d samples', nparts, nsamples)
if nparts == 0:
raise StopEvent('no particles produced')
# try to free some memory
# (up to ~a few hundred MiB for ultracentral collisions)
del surface
results['nsamples'] = nsamples
# hadronic afterburner
run_cmd('afterburner particles_in.dat particles_out.dat')
# read final particle data
with open('particles_out.dat', 'rb') as f:
# partition UrQMD file into oversamples
groups = groupby(f, key=lambda l: l.startswith(b'#'))
samples = filter(lambda g: not g[0], groups)
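            # (groupby alternates between groups of '#' header lines and
            # groups of particle lines; the filter keeps only the latter)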
# iterate over particles and oversamples
parts_iter = (
tuple((nsample, *l.split()))
for nsample, (header, sample) in enumerate(samples, start=1)
for l in sample
)
parts = np.fromiter(parts_iter, dtype=parts_dtype)
# save raw particle data (optional)
if particles_file is not None:
# save event to hdf5 data set
logging.info('saving raw particle data')
particles_file.create_dataset(
'event_{}'.format(event_number),
data=parts, compression='lzf'
)
logging.info('computing observables')
charged = (parts['charge'] != 0)
abs_eta = np.fabs(parts['eta'])
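        # |eta| < .5 is a unit-width pseudorapidity window, so the charged
        # count is already dN/deta; divide by nsamples to average over the
        # oversamples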
results['dNch_deta'] = \
np.count_nonzero(charged & (abs_eta < .5)) / nsamples
ET_eta = .6
results['dET_deta'] = \
parts['ET'][abs_eta < ET_eta].sum() / (2*ET_eta) / nsamples
abs_ID = np.abs(parts['ID'])
midrapidity = (np.fabs(parts['y']) < .5)
pT = parts['pT']
phi = parts['phi']
for name, i in species:
cut = (abs_ID == i) & midrapidity
N = np.count_nonzero(cut)
results['dN_dy'][name] = N / nsamples
results['mean_pT'][name] = (0. if N == 0 else pT[cut].mean())
pT_alice = pT[charged & (abs_eta < .8) & (.15 < pT) & (pT < 2.)]
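        # store sufficient statistics (N, sum pT, sum pT^2) so that mean pT
        # and its variance can be reconstructed and combined across events
        # in post-processing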
results['pT_fluct']['N'] = pT_alice.size
results['pT_fluct']['sum_pT'] = pT_alice.sum()
results['pT_fluct']['sum_pTsq'] = np.inner(pT_alice, pT_alice)
phi_alice = phi[charged & (abs_eta < .8) & (.2 < pT) & (pT < 5.)]
results['flow']['N'] = phi_alice.size
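        # Q_n = sum_j exp(i*n*phi_j) for n = 1..8: per-event flow vectors
        # from which flow coefficients and cumulants can be computed later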
results['flow']['Qn'] = [
np.exp(1j*n*phi_alice).sum()
for n in range(1, results.dtype['flow']['Qn'].shape[0] + 1)
]
nfail = 0
# run each initial condition event and save results to file
for n, ic in enumerate(initial_conditions, start=1):
logging.info('starting event %d', n)
try:
run_single_event(ic, n)
except StopEvent as e:
if particles_file is not None:
particles_file.create_dataset(
'event_{}'.format(n), shape=(0,), dtype=parts_dtype
)
logging.info('event stopped: %s', e)
except Exception:
logging.exception('event %d failed', n)
nfail += 1
if nfail > 3 and nfail/n > .5:
logging.critical('too many failures, stopping events')
break
logging.warning('continuing to next event')
continue
results_file.write(results.tobytes())
logging.info('event %d completed successfully', n)
# end of events: if running with a checkpoint, delete the file unless this
# was a failed re-run of a checkpoint event
if args.checkpoint is not None:
if checkpoint_ic is not None and nfail > 0:
logging.info(
'checkpoint event failed, keeping file %s',
args.checkpoint
)
else:
os.remove(args.checkpoint)
logging.info('removed checkpoint file %s', args.checkpoint)
return n > nfail
def main():
args, checkpoint_ic = parse_args_checkpoint()
if checkpoint_ic is None:
# starting fresh -> truncate output files
filemode = 'w'
# must handle rank first since it affects paths
if args.rankvar:
rank = os.getenv(args.rankvar)
if rank is None:
sys.exit('rank variable {} is not set'.format(args.rankvar))
if args.rankfmt:
rank = args.rankfmt.format(int(rank))
# append rank to path arguments, e.g.:
# /path/to/output.log -> /path/to/output/<rank>.log
for a in ['results', 'logfile', 'particles', 'checkpoint']:
value = getattr(args, a)
if value is not None:
root, ext = os.path.splitext(value)
setattr(args, a, os.path.join(root, rank) + ext)
else:
# running checkpoint event -> append to existing files
filemode = 'a'
os.makedirs(os.path.dirname(args.results), exist_ok=True)
if args.logfile is None:
logfile_kwargs = dict(stream=sys.stdout)
else:
logfile_kwargs = dict(filename=args.logfile, filemode=filemode)
os.makedirs(os.path.dirname(args.logfile), exist_ok=True)
if args.particles is not None:
os.makedirs(os.path.dirname(args.particles), exist_ok=True)
if args.checkpoint is not None:
os.makedirs(os.path.dirname(args.checkpoint), exist_ok=True)
logging.basicConfig(
level=getattr(logging, args.loglevel.upper()),
format='[%(levelname)s@%(relativeCreated)d] %(message)s',
**logfile_kwargs
)
logging.captureWarnings(True)
start = datetime.datetime.now()
if checkpoint_ic is None:
logging.info('started at %s', start)
logging.info('arguments: %r', args)
else:
logging.info(
'restarting from checkpoint file %s at %s',
args.checkpoint, start
)
# translate SIGTERM to KeyboardInterrupt
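    # (batch schedulers typically send SIGTERM; this routes it through the
    # same graceful-shutdown path as a keyboard interrupt)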
signal.signal(signal.SIGTERM, signal.default_int_handler)
logging.debug('set SIGTERM handler')
@contextmanager
def h5py_file():
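        # yields None when --particles is not given, so the `with` statement
        # below handles both cases uniformly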
yield h5py.File(args.particles, 'w') if args.particles else None
with \
open(args.results, filemode + 'b',
buffering=args.buffering) as results_file, \
h5py_file() as particles_file, \
tempfile.TemporaryDirectory(
prefix='hic-', dir=args.tmpdir) as workdir:
os.chdir(workdir)
logging.info('working directory: %s', workdir)
try:
status = run_events(args, results_file, particles_file, checkpoint_ic)
except KeyboardInterrupt:
# after catching the initial SIGTERM or interrupt, ignore them
# during shutdown -- this ensures everything will exit gracefully
# in case of additional signals (short of SIGKILL)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
status = True
logging.info(
'interrupt or signal at %s, cleaning up...',
datetime.datetime.now()
)
if args.checkpoint is not None:
logging.info(
'current event saved in checkpoint file %s',
args.checkpoint
)
end = datetime.datetime.now()
logging.info('finished at %s, %s elapsed', end, end - start)
if not status:
sys.exit(1)
if __name__ == "__main__":
main()