Fix reconstitute interleaving features #886
base: main
Changes from all commits: 3170aed, 26deca1, 3a3f1b1, 2090e55, ea99ef9, 2fd7546, b6ecdfb
hera_cal/tests/test_vis_clean.py
@@ -21,6 +21,7 @@
from hera_cal.data import DATA_PATH
from hera_cal import frf
import glob
import re
import copy

@@ -1130,46 +1131,53 @@ def test_time_chunk_from_baseline_chunks_argparser(self):
assert a.time_chunk_template == 'a'
assert a.outfilename == 'a.out'

def test_time_chunk_from_baseline_chunks(self, tmp_path):
@pytest.mark.parametrize("ninterleave", [1, 2, 4, 8])
def test_time_chunk_from_baseline_chunks(self, tmpdir, ninterleave):
# First, construct some cross-talk baseline files.
datafiles = [os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.first.uvh5"),
os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.second.uvh5")]

cals = [os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.uv.abs.calfits_54x_only.part1"),
os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.uv.abs.calfits_54x_only.part2")]
# make a cache directory
cdir = tmp_path / "cache_temp"
cdir.mkdir()
tmp_path = tmpdir.strpath
cdir = tmp_path + "/cache_temp"
os.mkdir(cdir)
# cross-talk filter chunked baselines
for filenum, file in enumerate(datafiles):
baselines = io.baselines_from_filelist_position(file, datafiles)
fname = 'temp.fragment.part.%d.h5' % filenum
fragment_filename = tmp_path / fname
frf.load_tophat_frfilter_and_write(datafiles, baseline_list=baselines, calfile_list=cals,
spw_range=[0, 20], cache_dir=cdir, read_cache=True, write_cache=True,
res_outfilename=fragment_filename, clobber=True, case='sky')
for interleave in range(ninterleave):
baselines = io.baselines_from_filelist_position(file, datafiles)
if ninterleave > 1:
fname = f'temp.fragment.part.%d.interleave_{interleave}.h5' % filenum
else:
fname = f'temp.fragment.part.%d.h5' % filenum
fragment_filename = tmp_path + fname
frf.load_tophat_frfilter_and_write(datafiles, baseline_list=baselines, calfile_list=cals,
spw_range=[0, 20], cache_dir=cdir, read_cache=True, write_cache=True,
res_outfilename=fragment_filename, clobber=True, case='sky')
# load in fragment and make sure the number of baselines is equal to the length of the baseline list
hd_fragment = io.HERAData(str(fragment_filename))
assert len(hd_fragment.bls) == len(baselines)
assert hd_fragment.Ntimes == 60
assert hd_fragment.Nfreqs == 20

fragments = glob.glob(DATA_PATH + '/test_output/temp.fragment.h5.part*')
fragments = sorted(glob.glob(DATA_PATH + '/test_output/temp.fragment.h5.part*'))
# reconstitute the filtered data
for filenum, file in enumerate(datafiles):
# reconstitute
# AEW -- 5-10-2023 -- I AM HERE!
Reviewer comment: Good to know!
fname = 'temp.reconstituted.part.%d.h5' % filenum
vis_clean.time_chunk_from_baseline_chunks(time_chunk_template=file,
baseline_chunk_files=glob.glob(str(tmp_path / 'temp.fragment.part.*.h5')), clobber=True,
outfilename=str(tmp_path / fname))
baseline_chunk_files=glob.glob(str(tmp_path + 'temp.fragment.part.*.h5')), clobber=True,
outfilename=str(tmp_path + fname))
# load in the reconstituted files.
hd_reconstituted = io.HERAData(glob.glob(str(tmp_path / 'temp.reconstituted.part.*.h5')))
hd_reconstituted = io.HERAData(glob.glob(str(tmp_path + 'temp.reconstituted.part.*.h5')))
hd_reconstituted.read()
# compare to xtalk filtering the whole file.
frf.load_tophat_frfilter_and_write(datafile_list=os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.uvh5"),
calfile_list=os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.uv.abs.calfits_54x_only"),
res_outfilename=str(tmp_path / 'temp.h5'), clobber=True, spw_range=[0, 20], case='sky')
hd = io.HERAData(str(tmp_path / 'temp.h5'))
res_outfilename=str(tmp_path + 'temp.h5'), clobber=True, spw_range=[0, 20], case='sky')
hd = io.HERAData(str(tmp_path + 'temp.h5'))
hd.read()
assert np.all(np.isclose(hd.data_array, hd_reconstituted.data_array))
assert np.all(np.isclose(hd.flag_array, hd_reconstituted.flag_array))

@@ -1179,23 +1187,25 @@ def test_time_chunk_from_baseline_chunks(self, tmp_path):
# reconstitute
fname = 'temp.reconstituted.part.%d.h5' % filenum
vis_clean.time_chunk_from_baseline_chunks(time_chunk_template=file,
baseline_chunk_files=glob.glob(str(tmp_path / 'temp.fragment.part.*.h5')), clobber=True,
outfilename=str(tmp_path / fname), time_bounds=True)
baseline_chunk_files=glob.glob(str(tmp_path + 'temp.fragment.part.*.h5')), clobber=True,
outfilename=str(tmp_path + fname), time_bounds=True)
# load in the reconstituted files.
hd_reconstituted = io.HERAData(glob.glob(str(tmp_path / 'temp.reconstituted.part.*.h5')))
hd_reconstituted = io.HERAData(glob.glob(str(tmp_path + 'temp.reconstituted.part.*.h5')))
hd_reconstituted.read()
# compare to xtalk filtering the whole file.
frf.load_tophat_frfilter_and_write(datafile_list=os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.uvh5"),
calfile_list=os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.uv.abs.calfits_54x_only"),
res_outfilename=str(tmp_path / 'temp.h5'), clobber=True, spw_range=[0, 20], case='sky')
hd = io.HERAData(str(tmp_path / 'temp.h5'))
res_outfilename=str(tmp_path + 'temp.h5'),
clobber=True, spw_range=[0, 20], case='sky')
hd = io.HERAData(str(tmp_path + 'temp.h5'))
hd.read()
assert np.all(np.isclose(hd.data_array, hd_reconstituted.data_array))
assert np.all(np.isclose(hd.flag_array, hd_reconstituted.flag_array))
assert np.all(np.isclose(hd.nsample_array, hd_reconstituted.nsample_array))
# check warning.
with pytest.warns(RuntimeWarning):
vis_clean.time_chunk_from_baseline_chunks(datafiles[0], baseline_chunk_files=datafiles[1:], clobber=True, outfilename=str(tmp_path / fname), time_bounds=True)
vis_clean.time_chunk_from_baseline_chunks(datafiles[0], baseline_chunk_files=datafiles[1:],
clobber=True, outfilename=str(tmp_path + fname), time_bounds=True)

def test_discard_autocorr_imag():
hd = io.HERAData(os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.uvh5"))

@@ -1217,5 +1227,3 @@ def test_discard_autocorr_imag():
assert data1[first_key].dtype == np.complex64
assert np.all(data1[first_key].imag == 0)
assert np.all(data1[first_key] == data0[first_key])
hera_cal/vis_clean.py
@@ -19,7 +19,7 @@
from .datacontainer import DataContainer
from .utils import echo
from .flag_utils import factorize_flags

import re


def discard_autocorr_imag(data_container, keys=None):

@@ -794,8 +794,7 @@ def vis_clean(self, keys=None, x=None, data=None, flags=None, wgts=None,
# get filter properties
mfrate = max_frate[k] if max_frate is not None else None
filter_centers, filter_half_widths = gen_filter_properties(ax=ax, horizon=horizon,
standoff=standoff, min_dly=min_dly,
bl_len=self.bllens[k[:2]], max_frate=mfrate)
standoff=standoff, min_dly=min_dly, bl_len=self.bllens[k[:2]], max_frate=mfrate)
Reviewer comment: I can't tell what's happening on this line, but it seems weird.
if mode != 'clean':
suppression_factors = [[tol], [tol]] if ax == 'both' else [tol]
self.fourier_filter(filter_centers=filter_centers, filter_half_widths=filter_half_widths,

@@ -2023,7 +2022,9 @@ def time_chunk_from_baseline_chunks(time_chunk_template, baseline_chunk_files, o
output will trim the extra frequencies in the time_chunk and write out trimmed freqs. The same is true
for polarizations.
baseline_chunk_files : list of strings
list of paths to baseline-chunk files to select time-chunk file from.
list of paths to baseline-chunk files to select time-chunk file from. If the files have "_interleave" in their title then
Reviewer comment: Is this any concern that this will be triggered accidentally? Do we want to have some kind of boolean trigger (default false) for this behavior, or do you think that's too rare of an edge case to be worried about?
the method will automatically identify the number of unique interleaves, chunk the file list up into interleaved sets and
retrieve integrations based on the time in the first file.
Reviewer comment: For me, just having read this docstring, I'm still not clear on exactly what's happening here. Can you maybe provide a clearer example in the docstring?
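A sketch of the kind of example the reviewer is asking for, with hypothetical filenames (none of the names below come from the PR):

baseline_chunk_files = [
    "temp.fragment.part.0.interleave_0.h5",
    "temp.fragment.part.0.interleave_1.h5",
    "temp.fragment.part.1.interleave_0.h5",
    "temp.fragment.part.1.interleave_1.h5",
]
# Because "interleave" appears in every filename, the function would group the list
# into two interleave sets (all interleave_0 files, all interleave_1 files), pull the
# matching integrations for each set, and write one reconstituted file per set, e.g.
# out.interleave_0.uvh5 and out.interleave_1.uvh5 when outfilename="out.uvh5".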
outfilename : string
name of the output file to write.
clobber : bool optional.

@@ -2040,11 +2041,38 @@ def time_chunk_from_baseline_chunks(time_chunk_template, baseline_chunk_files, o
-------
Nothing
"""
# check whether "interleave" is in baseline chunk filenames. If so, make sure that they all have "interleave",
# split them into sets, and make sure that the sets all have the same number of files.
interleave_mode = "interleave" in baseline_chunk_files[0]
Reviewer comment: this is not consistent with the docstring, which states that if "_interleave" is in the filename, it'll do the interleave mode.
for fname in baseline_chunk_files:
if "interleave" not in fname and interleave_mode or \
"interleave" in fname and not interleave_mode:
raise ValueError("must not have a subset of files with 'interleave' in name.")
Reviewer comment: I think a nicer error here would be "Cannot have some baseline_chunk_files with '_interleave' in their name, while others don't."

Reviewer comment: Furthermore, add this info to the docstring (i.e. say that all or none of the files must have "interleave" in their names).
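A minimal sketch of a check with the wording the reviewer suggests (illustrative only, not code from the PR):

has_interleave = ["interleave" in fname for fname in baseline_chunk_files]
if any(has_interleave) and not all(has_interleave):
    raise ValueError("Cannot have some baseline_chunk_files with 'interleave' in "
                     "their name while others do not.")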
if interleave_mode:
interleave_indices = np.unique([int(re.findall("interleave_[0-9]{1,10}", fname)[0][11:]) for fname in baseline_chunk_files])
Reviewer comment: This feels rather hard-coded.

Reviewer comment: At minimum, the precise expectation for the file name format should be documented.

Reviewer comment: Agreed. I actually think a much more flexible and future-proof way of doing this would be to have a new argument to the function, like …

Reviewer comment: This feels like a function that should probably be broken out for future use.
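One way to address these comments would be a small, documented helper with the filename convention made explicit; the sketch below is hypothetical (parse_interleave_index is not part of the PR):

import re

def parse_interleave_index(fname, label="interleave"):
    """Return the integer interleave index embedded in fname as '<label>_<digits>',
    e.g. 'temp.fragment.part.0.interleave_3.h5' -> 3, or None if no index is found."""
    match = re.search(rf"{label}_(\d+)", fname)
    return int(match.group(1)) if match is not None else None

Exposing label (or the full pattern) as an argument of time_chunk_from_baseline_chunks would also give the explicit opt-in trigger suggested in the earlier comment.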
ninterleave = len(interleave_indices)
interleave_sets = {}
interleave_index_dict = {}
for inum, interleave in enumerate(interleave_indices):
interleave_sets[inum] = sorted([fname for fname in baseline_chunk_files if f'interleave_{inum}' in fname])
interleave_index_dict[inum] = interleave
# check that all interleave sets have the same length.
if len(np.unique([len(interleave_sets[k]) for k in interleave_sets])) > 1:
raise ValueError("If you are providing interleaved files, each interleaved set must have the same number of files in it!")
else:
ninterleave = 1
interleave_sets = {0: baseline_chunk_files}
interleave_index_dict = {0: 0}

hd_time_chunk = io.HERAData(time_chunk_template)
hd_baseline_chunk = io.HERAData(baseline_chunk_files[0])
times = hd_time_chunk.times
hd_baseline_chunk = io.HERAData(baseline_chunk_files[0])
freqs = hd_baseline_chunk.freqs
polarizations = hd_baseline_chunk.pols

# read in the template file, but only include polarizations, frequencies
# from the baseline_chunk_file files.
if not time_bounds:
@@ -2056,34 +2084,56 @@ def time_chunk_from_baseline_chunks(time_chunk_template, baseline_chunk_files, o
# for each baseline_chunk_file, read in only the times relevant to the templatefile.
# and update the data, flags, nsamples array of the template file
# with the baseline_chunk_file data.
for baseline_chunk_file in baseline_chunk_files:
hd_baseline_chunk = io.HERAData(baseline_chunk_file)
# find times that are close.
tload = []
# use tolerance in times that is set by the time resolution of the dataset.
atol = np.median(np.diff(hd_baseline_chunk.times)) / 10.
all_times = np.unique(hd_baseline_chunk.times)
for t in all_times:
if np.any(np.isclose(t, hd_time_chunk.times, atol=atol, rtol=0)):
tload.append(t)
d, f, n = hd_baseline_chunk.read(times=tload, axis='blt')
hd_time_chunk.update(flags=f, data=d, nsamples=n)
# now that we've updated everything, we write the output file.
hd_time_chunk.write_uvh5(outfilename, clobber=clobber)
for inum in interleave_sets:
for baseline_chunk_file in interleave_sets[inum]:
hd_baseline_chunk = io.HERAData(baseline_chunk_file)
# find times that are close.
tload = []
# use tolerance in times that is set by the time resolution of the dataset.
atol = np.median(np.diff(hd_baseline_chunk.times)) / 10.
all_times = np.unique(hd_baseline_chunk.times)
for t in all_times:
if np.any(np.isclose(t, hd_time_chunk.times, atol=atol, rtol=0)):
tload.append(t)
d, f, n = hd_baseline_chunk.read(times=tload, axis='blt')
hd_time_chunk.update(flags=f, data=d, nsamples=n)
# now that we've updated everything, we write the output file.
if ninterleave > 1:
outfilename_i = outfilename.replace('.uvh5', '.interleave_{}.uvh5')
else:
outfilename_i = outfilename

hd_time_chunk.write_uvh5(outfilename_i, clobber=clobber)
# reinstantiate writer file.
hd_time_chunk = io.HERAData(time_chunk_template)
hd_time_chunk.read(times=times, frequencies=freqs, polarizations=polarizations)

else:
dt_time_chunk = np.median(np.diff(hd_time_chunk.times)) / 2.
tmax = hd_time_chunk.times.max() + dt_time_chunk
tmin = hd_time_chunk.times.min() - dt_time_chunk
hd_combined = io.HERAData(baseline_chunk_files)
t_select0 = (hd_baseline_chunk.times >= tmin) & (hd_baseline_chunk.times < tmax)
# use same time selection for all interleaves even though the times don't line
# up. We need the indices too.
# if the interleaves have different Ntimes,
# we will only use the Ntimes from the first interleave.
t_select = t_select0[:hd_baseline_chunk.Ntimes]
# we only compare centers of baseline files to time limits of time-file.
# this is to prevent integrations that straddle file boundaries from being dropped.
# when we perform reconstitution.
t_select = (hd_baseline_chunk.times >= tmin) & (hd_baseline_chunk.times < tmax)
if np.any(t_select):
hd_combined.read(times=hd_baseline_chunk.times[t_select], axis='blt')
hd_combined.write_uvh5(outfilename, clobber=clobber)
else:
warnings.warn("no times selected. No outputfile produced.", RuntimeWarning)

for inum in interleave_sets:
hd_baseline_chunk_iset = io.HERAData(interleave_sets[inum][0])
hd_combined = io.HERAData(interleave_sets[inum])
if np.any(t_select):
hd_combined.read(times=hd_baseline_chunk_iset.times[t_select], axis='blt')
if ninterleave > 1:
outfilename_interleave = outfilename.replace('.uvh5', f'.interleave_{interleave_index_dict[inum]}.uvh5')
else:
outfilename_interleave = outfilename
hd_combined.write_uvh5(outfilename_interleave, clobber=clobber)
else:
warnings.warn("no times selected. No outputfile produced.", RuntimeWarning)


def time_chunk_from_baseline_chunks_argparser():
Reviewer comment: In new code, I'd heartily suggest using the pathlib module for path operations. You can use the pytest fixture tmp_path_factory, then do cdir = tmp_path_factory.mkdir('test_time_chunk_from_baseline_chunks').
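A minimal sketch of the fixture usage the reviewer describes (note that the pytest factory method is mktemp; the test body is illustrative only):

def test_time_chunk_from_baseline_chunks(tmp_path_factory):
    # mktemp returns a pathlib.Path, so the '/' operator joins paths directly.
    cdir = tmp_path_factory.mktemp("test_time_chunk_from_baseline_chunks")
    fragment_filename = cdir / "temp.fragment.part.0.h5"
    assert fragment_filename.parent == cdir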