Skip to content

Commit

Permalink
Merge pull request #1215 from JosepSampe/lithops-dev
Browse files: browse the repository at this point in the history
[Partitioner] Fix partitioner
  • Loading branch information
JosepSampe authored Dec 1, 2023
2 parents 873fdb2 + f39ac21 commit 06381d5
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 8 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
- [k8s] Limit the size of the "user" label as the maximum allowed is 63 chars
- [Joblib] Fix shared objects utility when multiple maps run from the same executor
- [Azure VMs] Fix wrong exception when trying to connect to the master VM for the first time

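- [Partitioner] Fix partitioner: compute the row-end position relative to each `read()` result so chunks read in several calls are trimmed at the correct line boundary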

## [v3.0.1]

Expand Down
2 changes: 1 addition & 1 deletion lithops/scripts/cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from lithops.storage import Storage
from lithops.storage.utils import clean_bucket
from lithops.constants import JOBS_PREFIX, TEMP_PREFIX, CLEANER_DIR,\
from lithops.constants import JOBS_PREFIX, TEMP_PREFIX, CLEANER_DIR, \
CLEANER_PID_FILE, CLEANER_LOG_FILE

log_file_stream = open(CLEANER_LOG_FILE, 'a')
Expand Down
6 changes: 6 additions & 0 deletions lithops/tests/util_func/map_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ def my_map_function_obj(obj, id):
counter = {}
data = obj.data_stream.read()

# chunk = obj.data_stream.read(10000)
# data = b""
# while chunk:
# data += chunk
# chunk = obj.data_stream.read(10000)

print('Data lenght: {}'.format(len(data)))

for line in data.splitlines():
Expand Down
12 changes: 6 additions & 6 deletions lithops/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,8 +668,8 @@ def read(self, n=None):
self._first_byte = self.sb.read(self._plusbytes)

retval = self.sb.read(n)

self.pos += len(retval)
last_row_end_pos = len(retval)
self.pos += last_row_end_pos
first_row_start_pos = 0

if self._first_read and self._first_byte and \
Expand All @@ -680,11 +680,11 @@ def read(self, n=None):
first_row_start_pos = retval.find(self.newline_char) + 1
self._first_read = False

last_row_end_pos = self.pos
# Find end of the line in threshold
if self.pos > self.size:
last_byte_pos = retval[self.size - 1:].find(self.newline_char)
last_row_end_pos = self.size + last_byte_pos
if self.pos >= self.size:
current_end_pos = last_row_end_pos - (self.pos - self.size)
last_byte_pos = retval[current_end_pos - 1:].find(self.newline_char)
last_row_end_pos = current_end_pos + last_byte_pos
self._eof = True

return retval[first_row_start_pos:last_row_end_pos]
Expand Down

0 comments on commit 06381d5

Please sign in to comment.