Skip to content

Commit

Permalink
Add integration teet for multiprocessing usage
Browse files Browse the repository at this point in the history
in which the setup spawns a child process that accepts a multiprocessing
Connection and Lock which are used in each prediction. The intent of
this test is to ensure Cog plays nicely with any predictor that uses
these features of multiprocessing, especially in combination with a Path
output, since these have been common elements of known failures.
  • Loading branch information
meatballhat committed Nov 6, 2024
1 parent 4de7f61 commit d6a9946
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.tmp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import multiprocessing.connection
import multiprocessing.synchronize
import os
import time


def ponger(
conn: multiprocessing.connection.Connection, lock: multiprocessing.synchronize.Lock
):
for i in range(100):
print(f"Getting ready for some serious ponginggg ({i+1}%)")
time.sleep(0.001 + (0.001 * (i + 1)))

print("ITS PONGIN TIME")

pid = os.getpid()

while True:
try:
ping = conn.recv()
print(f"received {ping} in {pid}")

with lock:
print(f"ponging from {pid}")

conn.send("pong")
conn.close()

except EOFError:
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
build:
python_version: "3.8"
predict: "predict.py:Predictor"
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import atexit
import multiprocessing
import pathlib
import signal
import subprocess
import sys
import time

from cog.types import Path
from cog import BasePredictor

from bg import ponger

def cleanup():
for tmp in pathlib.Path("./").glob("*.tmp"):
tmp.unlink(missing_ok=True)


atexit.register(cleanup)


class Predictor(BasePredictor):
"""
This predictor checks the case where a process is spawned during setup via
multiprocessing and then each prediction causes that process to write to stdout.
"""

def setup(self) -> None:
print("---> starting background process")

cleanup()

self.parent_conn, self.child_conn = multiprocessing.Pipe()
self.lock = multiprocessing.Lock()
self.bg = multiprocessing.Process(
target=ponger, args=(self.child_conn, self.lock)
)
self.bg.start()

print(f"---> started background process pid={self.bg.pid}")

def predict(self, s: str) -> Path:
if self.bg.is_alive():
print(f"---> sending ping to background job pid={self.bg.pid}")

self.child_conn.send("ping")

print(f"---> sent ping to background job pid={self.bg.pid}")

pong = self.parent_conn.recv()

print(f"---> received {pong} from background job pid={self.bg.pid}")
else:
print(f"---> background job died status={status}")

raise SystemExit

out = Path(f"cog-test-integration-out.{time.time_ns()}.tmp")
out.write_text("hello " + s)

print(f"---> wrote output file {out}")

return out
1 change: 1 addition & 0 deletions test-integration/test_integration/test_predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ def test_predict_path_list_input(tmpdir_factory):
("simple",),
("double-fork",),
("double-fork-http",),
("multiprocessing",),
],
)
def test_predict_with_subprocess_in_setup(fixture_name):
Expand Down

0 comments on commit d6a9946

Please sign in to comment.