Skip to content

Commit

Permalink
Add integration tests around stream redirection (#2027)
Browse files Browse the repository at this point in the history
* Add integration tests around stream redirection

to ensure we don't have regressions like the one seen in `v0.11.4`.

The tests here are somewhat contrived, but based on how some known
models work that failed when stream redirection didn't work as expected.

* Allow up to 10% of requests to be 409 (busy)

* Look for available port on loopback only

* Add more docstrings and comments about failure mode(s) under test
  • Loading branch information
meatballhat authored Oct 28, 2024
1 parent 8333a83 commit 5c1908f
Show file tree
Hide file tree
Showing 14 changed files with 384 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
build:
python_version: "3.8"
python_packages:
- requests
predict: "predict.py:Predictor"
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import os
import signal
import time
from random import randint
from wsgiref.simple_server import make_server


def main():
    """Fork once, then run a different loop in each of the two processes.

    The child process serves the WSGI ``app`` on 127.0.0.1:7777 until the
    server stops. The parent process never returns: it prints a marker line
    containing its own pid every ten seconds.
    """
    forked_pid = os.fork()
    own_pid = os.getpid()

    if forked_pid == 0:
        # os.fork() returns 0 in the child, which becomes the HTTP server.
        httpd = make_server("127.0.0.1", 7777, app)
        httpd.serve_forever()
        return

    # Parent: keep writing to stdout so the stream stays exercised.
    while True:
        print(f"===> PARENT ({own_pid})")

        time.sleep(10)


def app(environ, start_response):
    """WSGI application that answers ``/ping`` and rejects everything else.

    Every call first prints a marker line with the serving process's pid.

    Args:
        environ: WSGI environment; only ``PATH_INFO`` is read.
        start_response: WSGI callable invoked with the status line and headers.

    Returns:
        For ``/ping``: a list of between 102 and 132 ``b"PONG\\n"`` chunks
        (the count is randomised per request). For any other path: a single
        ``b"NO\\n"`` chunk, sent with a 404 status.
    """
    print(f"---> CHILD ({os.getpid()})")

    text_plain = [("content-type", "text/plain")]

    if environ["PATH_INFO"] != "/ping":
        start_response("404 Not Found", text_plain)
        return [b"NO\n"]

    start_response("200 OK", text_plain)
    chunk_count = 100 + randint(2, 32)
    return [b"PONG\n"] * chunk_count


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import signal
import subprocess
import sys

import requests

from cog import BasePredictor


class Predictor(BasePredictor):
    """
    This predictor checks the case where a process is spawned during setup and then each
    prediction depends on being able to communicate with that process. In the event that
    stream redirection is not working correctly, the forked process will not be able to
    write to stdout/stderr and will likely exit. Any state other than "running" is
    considered an error condition and raises SystemExit to interrupt any more prediction
    serving.

    This variant runs a forked python HTTP server via a shell wrapper to which a request
    is made during each call to `predict`.
    """

    def setup(self) -> None:
        """Start the shell wrapper for the background HTTP server and keep its handle."""
        print("---> starting background process")

        self.bg = subprocess.Popen(["bash", "run-pong.sh"])

        print(f"---> started background process pid={self.bg.pid}")

    def predict(self, s: str) -> str:
        """Ping the background server and return a greeting for ``s``.

        Args:
            s: Text to append to the ``"hello "`` prefix.

        Returns:
            ``"hello " + s``.

        Raises:
            SystemExit: If the background process is no longer running.
            requests.exceptions.RequestException: If the ping request fails or does
                not complete within the timeout.
        """
        status = self.bg.poll()

        print(f"---> background job status={status}")

        # poll() returns None only while the process is still running.
        if status is not None:
            raise SystemExit

        print(f"---> sending request to background job pid={self.bg.pid}")

        # Without a timeout requests waits indefinitely, so a wedged background server
        # would hang the prediction instead of surfacing as an error.
        print(requests.get("http://127.0.0.1:7777/ping", timeout=10))

        print(f"---> sent request to background job pid={self.bg.pid}")

        return "hello " + s
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env bash
# Shell wrapper around pong.py: start it as a background job of this shell, then
# block in `wait` until that job exits, so this bash process stays alive as its parent.
python ./pong.py &
wait
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
build:
python_version: "3.8"
predict: "predict.py:Predictor"
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import os
import signal
import time


def main():
    """Fork once and loop forever in both processes.

    The child polls for a ``.inbox`` file. When one appears it reads the
    content, removes the file, and publishes ``"hello " + content`` as
    ``.outbox``. Both processes also print a heartbeat line to stdout every
    ten seconds so their stdout keeps being exercised. This function never
    returns.
    """
    child_pid = os.fork()
    is_child = child_pid == 0

    pid = os.getpid()

    # Schedule heartbeats against a monotonic deadline. Comparing the float
    # wall-clock time modulo 10 to exactly 0 practically never matches, so
    # that form would never print anything.
    heartbeat_interval = 10
    next_heartbeat = time.monotonic() + heartbeat_interval

    while True:
        if is_child and os.path.exists(".inbox"):
            with open(".inbox", "r") as inbox:
                print(f"---> CHILD ({pid}) reading request")

                s = inbox.read()

            os.unlink(".inbox")

            # Write to a scratch file and rename it into place so a reader
            # polling for ".outbox" can never observe a partial response.
            with open(".outbox.tmp", "w") as outbox:
                print(f"---> CHILD ({pid}) sending response")

                outbox.write("hello " + s)

            os.replace(".outbox.tmp", ".outbox")

        if time.monotonic() >= next_heartbeat:
            next_heartbeat += heartbeat_interval

            if is_child:
                print(f"---> CHILD ({pid}) " + ("here " * 20))
            else:
                print(f"===> PARENT ({pid})")

        time.sleep(0.01)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import os.path
import signal
import subprocess
import sys
import time

from cog import BasePredictor


class Predictor(BasePredictor):
    """
    This predictor checks the case where a process is spawned during setup and then each
    prediction depends on being able to communicate with that process. In the event that
    stream redirection is not working correctly, the forked process will not be able to
    write to stdout/stderr and will likely exit. Any state other than "running" is
    considered an error condition and raises SystemExit to interrupt any more prediction
    serving.

    This variant runs a forked python process via a shell wrapper to which a "message" is
    sent via file for each call to `predict`.
    """

    def setup(self) -> None:
        """Start the shell wrapper for the background process and keep its handle."""
        print("---> starting background process")

        self.bg = subprocess.Popen(["bash", "run-forker.sh"])

        print(f"---> started background process pid={self.bg.pid}")

    def predict(self, s: str) -> str:
        """Send ``s`` to the background process via ``.inbox`` and relay its reply.

        Args:
            s: Message text written to the ``.inbox`` file.

        Returns:
            The full content of the ``.outbox`` file produced in response.

        Raises:
            SystemExit: If the background process is no longer running.
            TimeoutError: If no ``.outbox`` file appears within 5 seconds.
        """
        status = self.bg.poll()

        print(f"---> background job status={status}")

        # poll() returns None only while the process is still running.
        if status is not None:
            raise SystemExit

        print(f"---> sending message to background job pid={self.bg.pid}")

        # Write to a scratch file and rename it into place: the reader polls for the
        # existence of ".inbox", so writing it in place would let it read (and delete)
        # the file before the message content has been written.
        with open(".inbox.tmp", "w") as inbox:
            inbox.write(s)

        os.replace(".inbox.tmp", ".inbox")

        print(f"---> sent message to background job pid={self.bg.pid}")

        # A monotonic clock keeps the deadline immune to wall-clock adjustments.
        deadline = time.monotonic() + 5

        print(f"---> waiting for outbox message from background job pid={self.bg.pid}")

        while not os.path.exists(".outbox"):
            if time.monotonic() > deadline:
                raise TimeoutError(
                    f"no .outbox message from background job pid={self.bg.pid} within 5s"
                )

            time.sleep(0.01)

        try:
            with open(".outbox", "r") as outbox:
                print(f"---> relaying message from background job pid={self.bg.pid}")

                return outbox.read()

        finally:
            # Always consume the response so the next prediction waits for a fresh one.
            os.unlink(".outbox")
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env bash
# Shell wrapper around forker.py: start it as a background job of this shell, then
# block in `wait` until that job exits, so this bash process stays alive as its parent.
python ./forker.py &
wait
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/usr/bin/env bash
set -euo pipefail

# SIGUSR1 handler. Any SIGUSR1 sent during `predict` makes this process write a decent
# amount of text (100 lines) to stdout. If stream redirection is not working correctly,
# this process will likely be in a defunct state before the first SIGUSR1 can be sent.
_pong() {
  local i
  for ((i = 1; i <= 100; i++)); do
    echo "${0} (${$}) PONG (${i}/100)"
  done
}

trap _pong USR1

# Simulated setup period: 100 short writes intended to fill up any stdout buffer.
for i in {1..100}; do
  echo "${0} ($$) SETTING UP (${i}/100)"
  sleep 0.01
done

# Steady state: poll roughly every 0.1s and, whenever the epoch second is a multiple of
# ten, write one line to stdout to prove the file descriptor is still usable. The extra
# one-second sleep keeps the same second from being reported more than once.
while :; do
  now="$(date +%s)"

  if ((now % 10 == 0)); then
    echo "${0} (${$}) STILL HERE"
    sleep 1
  fi

  sleep 0.1
done
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
build:
python_version: "3.8"
predict: "predict.py:Predictor"
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import signal
import subprocess
import sys

from cog import BasePredictor


class Predictor(BasePredictor):
    """
    Predictor whose setup spawns a background process that every prediction then pokes,
    causing that process to write to stdout. If stream redirection is broken, the spawned
    process cannot write to stdout/stderr and will likely exit. Any state other than
    "running" is treated as an error and raises SystemExit so that no further predictions
    are served.

    This variant runs a simple subprocess to which SIGUSR1 is sent during each call to
    `predict`.
    """

    def setup(self) -> None:
        """Launch `child.sh` under bash and remember the process handle."""
        print("---> starting background process")

        command = ["bash", "child.sh"]
        self.bg = subprocess.Popen(command)

        print(f"---> started background process pid={self.bg.pid}")

    def predict(self, s: str) -> str:
        """Signal the background process with SIGUSR1 and return ``"hello " + s``.

        Raises:
            SystemExit: If the background process has already exited.
        """
        status = self.bg.poll()

        # Guard: anything but None means the process is no longer running.
        if status is not None:
            print(f"---> background job died status={status}")

            raise SystemExit

        print(f"---> sending signal to background job pid={self.bg.pid}")

        self.bg.send_signal(signal.SIGUSR1)

        print(f"---> sent signal to background job pid={self.bg.pid}")

        return "hello " + s
35 changes: 35 additions & 0 deletions test-integration/test_integration/test_predict.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import os
import pathlib
import shutil
import subprocess
import sys
from pathlib import Path

import httpx
import pytest

from .util import cog_server_http_run

DEFAULT_TIMEOUT = 60


Expand Down Expand Up @@ -288,3 +293,33 @@ def test_predict_path_list_input(tmpdir_factory):
)
assert "test1" in result.stdout
assert "test2" in result.stdout


@pytest.mark.parametrize(
    "fixture_name",
    ["simple", "double-fork", "double-fork-http"],
)
def test_predict_with_subprocess_in_setup(fixture_name):
    """Send 100 predictions to a project whose setup spawns a background process.

    Every response must be 200, except that fewer than 10 of the requests may
    come back as 409 (busy).
    """
    fixtures_root = Path(__file__).parent / "fixtures"
    project_dir = fixtures_root / f"setup-subprocess-{fixture_name}-project"

    with cog_server_http_run(project_dir) as addr:
        busy_count = 0

        for i in range(100):
            payload = {"input": {"s": f"friendo{i}"}}
            response = httpx.post(f"{addr}/predictions", json=payload)

            if response.status_code == 409:
                # The server was still busy with the previous prediction.
                busy_count += 1
            else:
                assert response.status_code == 200, str(response)

        assert busy_count < 10
Loading

0 comments on commit 5c1908f

Please sign in to comment.