Skip to content

Commit

Permalink
Remove async timeout, wrap async tests directly to avoid reliance on …
Browse files Browse the repository at this point in the history
…flaky subpackages
  • Loading branch information
timkpaine committed Aug 9, 2023
1 parent 8d4a5c5 commit dc02bdf
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,24 +97,26 @@ async def websocket_client(self, port):
client = await websocket("ws://127.0.0.1:{}/websocket".format(port))
return client

@pytest.mark.gen_test(run_sync=False)
async def test_tornado_handler_async_manager_thread(self, app, http_client, http_port, sentinel):
table_name = str(random.random())
_table = Table(data)
MANAGER.host_table(table_name, _table)

client = await self.websocket_client(http_port)
table = client.open_table(table_name)
view = await table.view()
reqs = []
for x in range(10):
reqs.append(table.update(data))
reqs.append(view.to_arrow())

await asyncio.gather(*reqs)
# views = await asyncio.gather(*[table.view() for _ in range(5)])
# outputs = await asyncio.gather(*[view.to_arrow() for view in views])
expected = await table.schema()
records = await view.to_records()

assert len(records) == 110
def test_tornado_handler_async_manager_thread(self, app, http_client, http_port, sentinel):
async def test_tornado_handler_async_manager_thread(self, app, http_client, http_port, sentinel):
table_name = str(random.random())
_table = Table(data)
MANAGER.host_table(table_name, _table)

client = await self.websocket_client(http_port)
table = client.open_table(table_name)
view = await table.view()
reqs = []
for x in range(10):
reqs.append(table.update(data))
reqs.append(view.to_arrow())

await asyncio.gather(*reqs)
# views = await asyncio.gather(*[table.view() for _ in range(5)])
# outputs = await asyncio.gather(*[view.to_arrow() for view in views])
expected = await table.schema()
records = await view.to_records()

assert len(records) == 110

asyncio.get_event_loop().run_until_complete(test_tornado_handler_async_manager_thread(self, app, http_client, http_port, sentinel))
Original file line number Diff line number Diff line change
Expand Up @@ -96,35 +96,37 @@ async def websocket_client(self, port):
client = await websocket("ws://127.0.0.1:{}/websocket".format(port))
return client

@pytest.mark.gen_test(run_sync=False, timeout=30)
async def test_tornado_handler_async_manager_thread(self, app, http_client, http_port, sentinel):
global data
table_name = str(random.random())
_table = Table(data)
data = _table.view().to_arrow()
MANAGER.host_table(table_name, _table)
assert _table.size() == 100

client = await self.websocket_client(http_port)
table = client.open_table(table_name)
view = await table.view()
reqs = []
for x in range(10):
reqs.append(table.update(data))
reqs.append(view.to_arrow())

await asyncio.gather(*reqs)
# await asyncio.sleep(5)

# In single-threaded execution, "read" methods like `to_arrow` flush the
# pending updates queue, so that `to_arrow()` following an `update()`
# always reflects the update. In multi-threaded execution, this is much
# harder to guarantee. In practice for the Perspective, it's only useful
# for tests anyway, so in this case, it is enough to test that the
# result is eventually correct
record_size = await table.size()
while record_size != 1100:
def test_tornado_handler_async_manager_thread(self, app, http_client, http_port, sentinel):
async def test_tornado_handler_async_manager_thread(self, app, http_client, http_port, sentinel):
global data
table_name = str(random.random())
_table = Table(data)
data = _table.view().to_arrow()
MANAGER.host_table(table_name, _table)
assert _table.size() == 100

client = await self.websocket_client(http_port)
table = client.open_table(table_name)
view = await table.view()
reqs = []
for x in range(10):
reqs.append(table.update(data))
reqs.append(view.to_arrow())

await asyncio.gather(*reqs)
# await asyncio.sleep(5)

# In single-threaded execution, "read" methods like `to_arrow` flush the
# pending updates queue, so that `to_arrow()` following an `update()`
# always reflects the update. In multi-threaded execution, this is much
# harder to guarantee. In practice for the Perspective, it's only useful
# for tests anyway, so in this case, it is enough to test that the
# result is eventually correct
record_size = await table.size()
while record_size != 1100:
record_size = await table.size()

records = await view.to_records()
assert len(records) == 1100
records = await view.to_records()
assert len(records) == 1100

asyncio.get_event_loop().run_until_complete(test_tornado_handler_async_manager_thread(self, app, http_client, http_port, sentinel))

0 comments on commit dc02bdf

Please sign in to comment.