From f39ac21f41939b22770e4dfc60d02942d051b8b7 Mon Sep 17 00:00:00 2001 From: JosepSampe Date: Fri, 1 Dec 2023 20:35:57 +0100 Subject: [PATCH] [Partitioner] Fix partitioner --- CHANGELOG.md | 2 +- lithops/scripts/cleaner.py | 2 +- lithops/tests/util_func/map_util.py | 6 ++++++ lithops/utils.py | 12 ++++++------ 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25b9ede56..9a65d0a26 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,7 +35,7 @@ - [k8s] Limit the size of the "user" label as the maximum allowed is 63 chars - [Joblib] Fix shared objects utility when multiple maps run from the same executor - [Azure VMs] Fix wrong exception when trying to connect to the master VM for the first time - +- [Partitioner] Fix partitioner ## [v3.0.1] diff --git a/lithops/scripts/cleaner.py b/lithops/scripts/cleaner.py index 1ba977d78..43c06bd0a 100644 --- a/lithops/scripts/cleaner.py +++ b/lithops/scripts/cleaner.py @@ -23,7 +23,7 @@ from lithops.storage import Storage from lithops.storage.utils import clean_bucket -from lithops.constants import JOBS_PREFIX, TEMP_PREFIX, CLEANER_DIR,\ +from lithops.constants import JOBS_PREFIX, TEMP_PREFIX, CLEANER_DIR, \ CLEANER_PID_FILE, CLEANER_LOG_FILE log_file_stream = open(CLEANER_LOG_FILE, 'a') diff --git a/lithops/tests/util_func/map_util.py b/lithops/tests/util_func/map_util.py index acb6ae3d3..45b19be00 100644 --- a/lithops/tests/util_func/map_util.py +++ b/lithops/tests/util_func/map_util.py @@ -64,6 +64,12 @@ def my_map_function_obj(obj, id): counter = {} data = obj.data_stream.read() + # chunk = obj.data_stream.read(10000) + # data = b"" + # while chunk: + # data += chunk + # chunk = obj.data_stream.read(10000) + print('Data lenght: {}'.format(len(data))) for line in data.splitlines(): diff --git a/lithops/utils.py b/lithops/utils.py index f208d6730..cbcc1eafb 100644 --- a/lithops/utils.py +++ b/lithops/utils.py @@ -668,8 +668,8 @@ def read(self, n=None): self._first_byte = self.sb.read(self._plusbytes) retval = self.sb.read(n) - - self.pos += len(retval) + last_row_end_pos = len(retval) + self.pos += last_row_end_pos first_row_start_pos = 0 if self._first_read and self._first_byte and \ @@ -680,11 +680,11 @@ def read(self, n=None): first_row_start_pos = retval.find(self.newline_char) + 1 self._first_read = False - last_row_end_pos = self.pos # Find end of the line in threshold - if self.pos > self.size: - last_byte_pos = retval[self.size - 1:].find(self.newline_char) - last_row_end_pos = self.size + last_byte_pos + if self.pos >= self.size: + current_end_pos = last_row_end_pos - (self.pos - self.size) + last_byte_pos = retval[current_end_pos - 1:].find(self.newline_char) + last_row_end_pos = current_end_pos + last_byte_pos self._eof = True return retval[first_row_start_pos:last_row_end_pos]