Merge pull request #138 from zpz/zepu
Zepu
zpz authored Sep 30, 2024
2 parents 8d0024f + 8af0053 commit cc02fb8
Showing 4 changed files with 54 additions and 52 deletions.
9 changes: 7 additions & 2 deletions CHANGELOG.md
@@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).


+ ## [0.9.5] - 2024-09-29
+
+ - Finetune `Biglist.flush`, fixing some false alarms about "did you forget to call `flush`?".


## [0.9.4] - 2024-08-22

- Optimization to `Biglist.flush()` by reducing the number of temporary bookkeeping files.
@@ -17,7 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.9.2] - 2024-07-15

- - Do not trying flushing or checking updates if the Biglist object has been read-only.
+ - Do not try flushing or checking updates if the Biglist object has been read-only.


## [0.9.1] - 2024-06-20
@@ -27,7 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.9.0] - 2024-06-18

- - Removed parameter `keep_files`. `__del__` no longer calls `destroy`; instead, it always try to call `flush`.
+ - Removed parameter `keep_files`. `__del__` no longer calls `destroy`; instead, it always tries to call `flush`.
User must explicitly call `destroy` if so desired.
- `Biglist.flush` gets new parameter `eager`, default `False`.

2 changes: 1 addition & 1 deletion src/biglist/__init__.py
@@ -60,4 +60,4 @@
Slicer,
)

- __version__ = '0.9.4'
+ __version__ = '0.9.5'
83 changes: 43 additions & 40 deletions src/biglist/_biglist.py
@@ -790,12 +790,11 @@ def __init__(
with self._info_file.lock() as ff:
ff.write_json(self.info, overwrite=True)

- self._flushed = True
+ self._info_backup = copy.deepcopy(self.info)

def __del__(self) -> None:
if self._info_file.is_file(): # otherwise `destroy()` may have been called
- if self._warn_flush():
+ if self._warn_flush('__del__'):
self.flush()

@property
@@ -817,11 +816,10 @@ def storage_version(self) -> int:
"""The internal format used in persistence. This is a read-only attribute for information only."""
return self.info.get('storage_version', 0)

- def _warn_flush(self):
+ def _warn_flush(self, source: str):
if (
self._append_buffer
or self._append_files_buffer
- or not self._flushed
+ or self._info_backup != self.info
):
# This warning fires if changes made by this object are not yet
@@ -831,7 +829,7 @@ def _warn_flush(self):
# Unless you know what you are doing, don't use `flush(eager=True)`.

warnings.warn(
f"did you forget to flush {self.__class__.__name__} at '{self.path}'?"
f"did you forget to flush {self.__class__.__name__} at '{self.path}' (about to call `{source}`)?"
)
return True
return False
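The core change of this commit replaces the coarse `_flushed` boolean with a deep-copied snapshot of the meta info, so the warning fires only when the metadata has actually drifted since the last flush. A minimal sketch of the pattern, written independently of the library (the class and message below are illustrative, not biglist's API):

import copy
import warnings

class Tracked:
    """Warn when in-memory metadata has drifted from its last persisted snapshot."""

    def __init__(self):
        self.info = {'data_files_info': []}
        # Snapshot taken at construction and refreshed by `flush`.
        self._info_backup = copy.deepcopy(self.info)

    def _warn_flush(self, source: str) -> bool:
        # A plain boolean flag can raise false alarms; comparing against a
        # deep-copied snapshot fires only on real, unpersisted changes.
        if self._info_backup != self.info:
            warnings.warn(f"did you forget to flush? (about to call `{source}`)")
            return True
        return False

    def flush(self) -> None:
        # ... persist self.info here ...
        self._info_backup = copy.deepcopy(self.info)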
@@ -848,7 +846,7 @@ def __len__(self) -> int:
.. versionchanged:: 0.7.4
In previous versions, this count includes items that are not yet flushed.
"""
- self._warn_flush()
+ self._warn_flush('__len__')
return super().__len__()

def __getitem__(self, idx: int) -> Element:
@@ -873,7 +871,7 @@ def __iter__(self) -> Iterator[Element]:
.. versionchanged:: 0.7.4
In previous versions, this iteration includes those items that are not yet flushed.
"""
- self._warn_flush()
+ self._warn_flush('__iter__')
return super().__iter__()

def append(self, x: Element) -> None:
@@ -918,14 +916,15 @@ def worker(datapath: str, worker_id: str, ...):
"""
if extra:
extra = extra.lstrip('_').rstrip('_') + '_'
return f"{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S.%f')}_{extra}{str(uuid4()).replace('-', '')[:10]}_{buffer_len}"
return f"{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S.%f')}_{extra}{str(uuid4()).replace('-', '')[:16]}_{buffer_len}"
# File name pattern introduced on 7/25/2022.
# This should guarantee the file name is unique, hence
# we do not need to verify that this file name is not already used.
# Also include timestamp and item count in the file name, in case
# later we decide to use these pieces of info.
# Changes in 0.7.4: the time part changes from epoch to datetime, with guaranteed fixed length.
# Change in 0.8.4: the uuid part has dash removed and length reduced to 10; add ``extra``.
+ # Change in 0.9.5: keep 16 digits of the uuid4 str, instead of the previous 10.
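The naming scheme described by these comments can be reproduced standalone (a sketch of the logic above, not a call into the library):

from datetime import datetime, timezone
from uuid import uuid4

def make_file_name(buffer_len: int, extra: str = '') -> str:
    # Fixed-width UTC timestamp + optional tag + 16 hex chars of a uuid4
    # (10 chars before 0.9.5) + the number of items in the buffer.
    if extra:
        extra = extra.lstrip('_').rstrip('_') + '_'
    stamp = datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S.%f')
    return f"{stamp}_{extra}{str(uuid4()).replace('-', '')[:16]}_{buffer_len}"

print(make_file_name(1000))
# e.g. '20240930120159.123456_3f2a9c1b8d4e5f60_1000'

Widening the random part from 10 to 16 hex digits multiplies the name space by 16**6 (about 1.7e7), further shrinking the already-tiny chance of a clash among concurrent writers.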

def _flush(self) -> None:
"""
@@ -941,7 +940,6 @@ def _flush(self) -> None:
buffer = self._append_buffer
buffer_len = len(buffer)
self._append_buffer = []
- self._flushed = False

datafile_ext = self.storage_format.replace('-', '_')
filename = f'{self.make_file_name(buffer_len)}.{datafile_ext}'
@@ -985,7 +983,7 @@ def flush(
each object has its own append buffer and does `_flush` independent of other objects.
A data file has a random name (comprised of datetime accurate to sub-seconds,
plus a random string, plus other things);
- there is no risk of name clash when multiple Biglist objects save data files independent of
+ there is essentially no risk of name clash when multiple Biglist objects save data files independent of
each other.
However, there are two things that the automatic `_flush` does not do:
@@ -1015,18 +1013,15 @@
of name clash between multiple writers. In sum, `flush(eager=True)` persists all data and info
but puts the data structure in an "interim" state. Importantly, this op does *not*
involve locking, because it does not update the meta info file.
- The parameter `eager` is provided to manage the lock overhead when we write to a
- cloud-persisted biglist using many concurrent, distributed writers.
A call to `flush()` (i.e., `flush(eager=False)`) does all that `flush(eager=True)` does, plus
it integrates the content of all interim files, if any, into the meta info file,
and deletes the interim files. This op does lock the info file.
+ The parameter `eager` is provided to manage the lock overhead when we write to a
+ cloud-persisted biglist using many concurrent, distributed writers.
If `flush(eager=True)` has been used, then `flush()`
needs to be called at least once before *reading* the data.
Multiple interim files may have been created by multiple writers.
One call to `flush()` will take care of all the interim files in existence.
- This call can be made from any Biglist object as long as it points to same path.
+ This call can be made from any Biglist object as long as it points to the same path.
Unless you know what you are doing, don't use `flush(eager=True)`.
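To make the docstring's division of labor concrete, a hypothetical multi-writer session might look like the sketch below (assuming the library's usual `Biglist.new(path)` / `Biglist(path)` constructors; the path, worker count, and data are made up):

from biglist import Biglist

def worker(datapath: str, start: int) -> None:
    # Each worker appends independently; `flush(eager=True)` persists its
    # data files plus an interim meta file, without locking `info.json`.
    bl = Biglist(datapath)
    for x in range(start, start + 100):
        bl.append(x)
    bl.flush(eager=True)

bl = Biglist.new('/tmp/demo-biglist')
# ... run `worker` in 4 processes against str(bl.path) ...
# One final call, from any object pointing at the same path, takes the lock
# and folds all interim files into the meta info:
bl.flush()
assert len(bl) == 400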
@@ -1047,9 +1042,7 @@ def flush(
This is a legitimate case in parallel or distributed writing, or writing in
multiple sessions.
- Note that `flush` is called automatically when a Biglist object that has not been read-only
- is garbage collected.
- However, user is strongly recommended to explicitly call `flush` at the end of their writing session.
+ User is strongly recommended to explicitly call `flush` at the end of their writing session.
(See :meth:`_warn_flush`.)
On the other hand, you should **not** call `flush` frequently "just to be safe".
@@ -1078,10 +1071,15 @@ def flush(
# appends by other workers. The last call to ``flush`` across all workers
# will get the final meta info right.

- data = []
+ def _merge_data_file_info(info, additions):
+     z = sorted(set((*(tuple(v[:2]) for v in info), *map(tuple, additions))))
+     # TODO: maybe a merge sort can be more efficient.
+     cum = list(itertools.accumulate(v[1] for v in z))
+     z = [(a, b, c) for (a, b), c in zip(z, cum)]
+     return z
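A worked example of what this new helper computes, under the assumption (consistent with the surrounding code) that each addition is a `(file_name, item_count)` pair and each merged entry gains a cumulative count:

import itertools

info = [('f1.pickle', 100, 100), ('f2.pickle', 50, 150)]  # existing entries, with cumulative counts
additions = [('f3.pickle', 30)]                           # meta of a newly flushed data file

z = sorted(set((*(tuple(v[:2]) for v in info), *map(tuple, additions))))
cum = list(itertools.accumulate(v[1] for v in z))
print([(a, b, c) for (a, b), c in zip(z, cum)])
# [('f1.pickle', 100, 100), ('f2.pickle', 50, 150), ('f3.pickle', 30, 180)]

Sorting by name yields chronological order because file names begin with a fixed-width UTC timestamp (see `make_file_name` above), and deduplicating `(name, count)` pairs via `set` makes the merge safe if the same file's meta arrives more than once.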

- if self._append_files_buffer:
-     if eager:
+ if eager:
+     if self._append_files_buffer:
# Saving file meta data without merging it into `info.json`.
# This puts the on-disk data structure in a transitional state.
filename = getattr(self, '_flush_eager_file', None)
Expand Down Expand Up @@ -1110,18 +1108,25 @@ def flush(
# the file may not exist. Another object for the same biglist could have
# called `flush`, which would have incorporated all these files into meta info
# and deleted these files.
-         self._flushed = False
-     else:
-         # Do not update this object's eager file, which contains info of files written by this object
-         # previously (not including the content of `self._append_files_buffer`).
-         # Take care of `self._append_files_buffer` directly in the meta info file.
-         data.extend(self._append_files_buffer)
-         self._append_files_buffer.clear()

- if eager:
+     self.info['data_files_info'] = _merge_data_file_info(
+         self.info['data_files_info'], self._append_files_buffer
+     )
+     # Update the info to reflect the data writings by this object.
+     self._append_files_buffer.clear()
+     self._info_backup['data_files_info'] = copy.deepcopy(
+         self.info['data_files_info']
+     )
+     return

- # Merge file meta data into `info.json`, finalizing the on-disk data structure.
+ data = []
+ if self._append_files_buffer:
+     # Do not update this object's eager file, which contains info of files written by this object
+     # previously (not including the content of `self._append_files_buffer`).
+     # Take care of `self._append_files_buffer` directly in the meta info file.
+     data.extend(self._append_files_buffer)
+     self._append_files_buffer.clear()
+
+ # Merge data-file meta data into `info.json`, finalizing the persisted data structure.
with self._info_file.lock(timeout=lock_timeout) as ff:
# The info file may have been updated by another object for the same biglist.
self.info.update(ff.read_json())
@@ -1131,15 +1136,12 @@
data.extend(z)
f.remove_file()
if data:
- z0 = self.info['data_files_info']
- z = sorted(set((*(tuple(v[:2]) for v in z0), *map(tuple, data))))
- # TODO: maybe a merge sort can be more efficient.
- cum = list(itertools.accumulate(v[1] for v in z))
- z = [(a, b, c) for (a, b), c in zip(z, cum)]
- self.info['data_files_info'] = z
+ self.info['data_files_info'] = _merge_data_file_info(
+     self.info['data_files_info'],
+     data,
+ )
ff.write_json(self.info, overwrite=True)
+ self._info_backup = copy.deepcopy(self.info)
- self._flushed = True

def reload(self) -> None:
"""
@@ -1155,12 +1157,13 @@ def reload(self) -> None:
Creating a new object pointing to the same storage location would achieve the same effect.
"""
self.info = self._info_file.read_json()
+ self._info_backup = copy.deepcopy(self.info)

@property
def files(self):
# This method should be cheap to call.
# TODO: call `reload`?
- self._warn_flush()
+ self._warn_flush('files')
serde = self.registered_storage_formats[self.storage_format]
fun = serde.load
if self._deserialize_kwargs:
12 changes: 3 additions & 9 deletions tests/test_biglist.py
@@ -376,16 +376,11 @@ def test_parquet():
bl.extend(data)
bl.flush()

- print('')
- print(data[:3])
- print('')
- print('')
- print(Slicer(bl)[:3].collect())
- print('')

assert list(bl) == data

- print('len:', len(bl))
+ assert len(bl) == len(data)
print('num_data_files:', bl.num_data_files)

@@ -680,11 +675,10 @@ def test_eager_flush():
for w in workers:
w.join()

- assert len(list((bl.path / '_flush_eager').iterdir())) == 0
- # These files have been deleted when `flush` (eager=False) was called
- # when the Biglist objects in the worker processes were garbage collected.
+ assert len(list((bl.path / '_flush_eager').iterdir())) == 4
assert not bl
- bl.reload()
+ bl.flush()
+ assert len(list((bl.path / '_flush_eager').iterdir())) == 0
assert len(bl) == 400
assert sorted(bl) == list(range(400))
finally:
