Skip to content

Commit

Permalink
Remove select IDs optimization
Browse files: browse the repository at this point in the history
  • Loading branch information
dreadatour committed Sep 5, 2024
1 parent ca256b2 commit 44b6d0e
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 19 deletions.
16 changes: 3 additions & 13 deletions src/datachain/query/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
process_udf_outputs,
)
from datachain.query.queue import (
EMPTY_SIGNAL,
get_from_queue,
msgpack_pack,
msgpack_unpack,
Expand Down Expand Up @@ -408,16 +407,7 @@ def get_inputs(self):
ids = [row[0] for row in batch.rows]
rows = warehouse.dataset_rows_select(self.query.where(col_id.in_(ids)))
yield RowsOutputBatch(list(rows))
return

ids = []
while (batch := get_from_queue(self.task_queue, timeout=1)) != STOP_SIGNAL:
if batch != EMPTY_SIGNAL:
ids.append(batch[0])
if len(ids) >= 1000 or (ids and batch == EMPTY_SIGNAL):
rows = warehouse.dataset_rows_select(self.query.where(col_id.in_(ids)))
else:
while (row := get_from_queue(self.task_queue)) != STOP_SIGNAL:
rows = warehouse.dataset_rows_select(self.query.where(col_id == row[0]))
yield from rows
ids.clear()
if ids:
rows = warehouse.dataset_rows_select(self.query.where(col_id.in_(ids)))
yield from rows
7 changes: 1 addition & 6 deletions src/datachain/query/queue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import datetime
import time
from queue import Empty, Full, Queue
from struct import pack, unpack
from time import sleep
Expand All @@ -11,7 +10,6 @@

DEFAULT_BATCH_SIZE = 10000
STOP_SIGNAL = "STOP"
EMPTY_SIGNAL = "EMPTY"
OK_STATUS = "OK"
FINISHED_STATUS = "FINISHED"
FAILED_STATUS = "FAILED"
Expand All @@ -27,20 +25,17 @@
# https://github.com/python/cpython/issues/108645


def get_from_queue(queue: Queue, timeout: float = 0) -> Any:
def get_from_queue(queue: Queue) -> Any:
"""
Gets an item from a queue.
This is required to handle signals, such as KeyboardInterrupt exceptions
while waiting for items to be available, although only on certain installations.
(See the above comment for more context.)
"""
started_at = time.time()
while True:
try:
return queue.get_nowait()
except Empty:
if 0 < timeout <= time.time() - started_at:
return EMPTY_SIGNAL
sleep(0.01)


Expand Down

0 comments on commit 44b6d0e

Please sign in to comment.