Skip to content

Commit

Permalink
Merge pull request #162 from Pythagora-io/feature/process-awareness-a…
Browse files Browse the repository at this point in the history
…nd-termination

Process awareness and termination
  • Loading branch information
LeonOstrez authored Oct 11, 2023
2 parents a3cd6ea + e4db222 commit 14baf24
Show file tree
Hide file tree
Showing 13 changed files with 278 additions and 47 deletions.
26 changes: 23 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,34 @@ on:
pull_request:
branches:
- main
- debugging_ipc

jobs:
build:
Docker:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Build the Docker image
run: docker compose build
- name: Run the Docker image
run: docker compose up gpt-pilot -d
- name: Wait for the Docker image to start
run: docker ps
- name: Stop the Docker image
run: docker compose down

Test:
runs-on: ${{ matrix.os }}
strategy:
matrix:
# 3.10 - 04 Oct 2021
# 3.11 - 24 Oct 2022
python-version: ['3.9', '3.10', '3.11', '3.12']
os: [ubuntu-latest, macos-latest, windows-latest]
exclude:
# LINK : fatal error LNK1181: cannot open input file 'libpq.lib'
# Maybe related: https://github.com/psycopg/psycopg2/issues/1628
- os: windows-latest
python-version: '3.12'

steps:
- uses: actions/checkout@v4
Expand All @@ -42,7 +60,9 @@ jobs:
#ruff --format=github --target-version=py37 --ignore=F401,E501 .
- name: Run tests
env:
PYTHONPATH: .
run: |
pip install pytest
cd pilot
PYTHONPATH=. pytest -m "not slow and not uses_tokens and not ux_test"
pytest -m "not slow and not uses_tokens and not ux_test"
18 changes: 13 additions & 5 deletions pilot/const/function_calls.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def command_definition(description_command=f'A single command that needs to be e
description_timeout=
'Timeout in milliseconds that represent the approximate time this command takes to finish. '
'If you need to run a command that doesnt\'t finish by itself (eg. a command to run an app), '
'set the timeout to -1 and provide a process_name. '
'set the timeout to to a value long enough to determine that it has started successfully and provide a process_name. '
'If you need to create a directory that doesn\'t exist and is not the root project directory, '
'always create it by running a command `mkdir`'):
return {
Expand All @@ -59,6 +59,10 @@ def command_definition(description_command=f'A single command that needs to be e
'type': 'number',
'description': description_timeout,
},
'success_message': {
'type': 'string',
'description': 'A message to look for in the output of the command to determine if successful or not.',
},
'process_name': {
'type': 'string',
'description': 'If the process needs to continue running after the command is executed provide '
Expand Down Expand Up @@ -136,7 +140,7 @@ def command_definition(description_command=f'A single command that needs to be e
'description': 'List of smaller development steps that need to be done to complete the entire task.',
'items': {
'type': 'object',
'description': 'A smaller development step that needs to be done to complete the entire task. Remember, if you need to run a command that doesnt\'t finish by itself (eg. a command to run an app), put the timeout to 3000 milliseconds. If you need to create a directory that doesn\'t exist and is not the root project directory, always create it by running a command `mkdir`',
'description': 'A smaller development step that needs to be done to complete the entire task. Remember, if you need to run a command that doesn\'t finish by itself (eg. a command to run an app), put the timeout to 3000 milliseconds. If you need to create a directory that doesn\'t exist and is not the root project directory, always create it by running a command `mkdir`',
'properties': {
'type': {
'type': 'string',
Expand Down Expand Up @@ -179,14 +183,18 @@ def command_definition(description_command=f'A single command that needs to be e
'description': 'List of smaller development steps that need to be done to complete the entire task.',
'items': {
'type': 'object',
'description': 'A smaller development step that needs to be done to complete the entire task. Remember, if you need to run a command that doesnt\'t finish by itself (eg. a command to run an If you need to create a directory that doesn\'t exist and is not the root project directory, always create it by running a command `mkdir`',
'description': 'A smaller development step that needs to be done to complete the entire task. Remember, if you need to run a command that doesn\'t finish by itself (eg. a command to run an If you need to create a directory that doesn\'t exist and is not the root project directory, always create it by running a command `mkdir`',
'properties': {
'type': {
'type': 'string',
'enum': ['command', 'code_change', 'human_intervention'],
'enum': ['command', 'kill_process', 'code_change', 'human_intervention'],
'description': 'Type of the development step that needs to be done to complete the entire task.',
},
'command': command_definition(),
'kill_process': {
'type': 'string',
'description': 'To kill a process that was left running by a previous `command` step provide the `process_name` in this field and set `type` to "kill_process".',
},
'code_change': {
'type': 'object',
'description': 'A code change that needs to be implemented. This should be used only if the task is of a type "code_change".',
Expand Down Expand Up @@ -456,7 +464,7 @@ def command_definition(description_command=f'A single command that needs to be e
},
'path': {
'type': 'string',
'description': 'Path of the file that needs to be saved on the disk.',
'description': 'Full path of the file with the file name that needs to be saved.',
},
'content': {
'type': 'string',
Expand Down
2 changes: 2 additions & 0 deletions pilot/helpers/AgentConvo.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ def send_message(self, prompt_path=None, prompt_data=None, function_calls: Funct
development_step = save_development_step(self.agent.project, prompt_path, prompt_data, self.messages, response)

# TODO handle errors from OpenAI
# It's complicated because calling functions are expecting different types of responses - string or tuple
# https://github.com/Pythagora-io/gpt-pilot/issues/165 & #91
if response == {}:
logger.error(f'Aborting with "OpenAI API error happened"')
raise Exception("OpenAI API error happened.")
Expand Down
13 changes: 10 additions & 3 deletions pilot/helpers/agents/Developer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import platform
import uuid
from utils.style import green, red, green_bold, yellow_bold, red_bold, blue_bold, white_bold
from helpers.exceptions.TokenLimitError import TokenLimitError
Expand All @@ -11,7 +12,7 @@
from helpers.Agent import Agent
from helpers.AgentConvo import AgentConvo
from utils.utils import should_execute_step, array_of_objects_to_string, generate_app_data
from helpers.cli import run_command_until_success, execute_command_and_check_cli_response
from helpers.cli import run_command_until_success, execute_command_and_check_cli_response, running_processes
from const.function_calls import FILTER_OS_TECHNOLOGIES, EXECUTE_COMMANDS, GET_TEST_TYPE, IMPLEMENT_TASK
from database.database import save_progress, get_progress_steps, update_app_status
from utils.utils import get_os_info
Expand Down Expand Up @@ -97,10 +98,12 @@ def step_command_run(self, convo, step, i):
additional_message = 'Let\'s start with the step #0:\n\n' if i == 0 else f'So far, steps { ", ".join(f"#{j}" for j in range(i)) } are finished so let\'s do step #{i + 1} now.\n\n'

process_name = data['process_name'] if 'process_name' in data else None
success_message = data['success_message'] if 'success_message' in data else None

return run_command_until_success(convo, data['command'],
timeout=data['timeout'],
process_name=process_name,
success_message=success_message,
additional_message=additional_message)

def step_human_intervention(self, convo, step: dict):
Expand Down Expand Up @@ -171,7 +174,9 @@ def task_postprocessing(self, convo, development_task, continue_development, tas

if development_task is not None:
convo.remove_last_x_messages(2)
detailed_user_review_goal = convo.send_message('development/define_user_review_goal.prompt', {})
detailed_user_review_goal = convo.send_message('development/define_user_review_goal.prompt', {
'os': platform.system()
})
convo.remove_last_x_messages(2)

try:
Expand Down Expand Up @@ -340,7 +345,9 @@ def continue_development(self, iteration_convo, last_branch_name, continue_descr

# self.debugger.debug(iteration_convo, user_input=user_feedback)

task_steps = iteration_convo.send_message('development/parse_task.prompt', {}, IMPLEMENT_TASK)
task_steps = iteration_convo.send_message('development/parse_task.prompt', {
'running_processes': running_processes
}, IMPLEMENT_TASK)
iteration_convo.remove_last_x_messages(2)

return self.execute_task(iteration_convo, task_steps, is_root_task=True)
Expand Down
50 changes: 30 additions & 20 deletions pilot/helpers/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

interrupted = False

running_processes: Dict[str, int] = {}
"""Holds a list of process IDs, mapped to the `process_name` provided in the call to `execute_command()`."""
running_processes: Dict[str, tuple[str, int]] = {}
"""Holds a list of (command, process ID)s, mapped to the `process_name` provided in the call to `execute_command()`."""


def enqueue_output(out, q):
Expand Down Expand Up @@ -74,12 +74,12 @@ def run_command(command, root_path, q_stdout, q_stderr) -> subprocess.Popen:

def terminate_named_process(process_name: str) -> None:
if process_name in running_processes:
terminate_process(running_processes[process_name], process_name)
terminate_process(running_processes[process_name][1], process_name)


def terminate_running_processes():
for process_name in list(running_processes.keys()):
terminate_process(running_processes[process_name], process_name)
terminate_process(running_processes[process_name][1], process_name)


def terminate_process(pid: int, name=None) -> None:
Expand All @@ -100,27 +100,29 @@ def terminate_process(pid: int, name=None) -> None:
logger.error(f'Error while terminating process: {e}')

for process_name in list(running_processes.keys()):
if running_processes[process_name] == pid:
if running_processes[process_name][1] == pid:
del running_processes[process_name]


def execute_command(project, command, timeout=None, process_name: str = None, force=False):
def execute_command(project, command, timeout=None, success_message=None, process_name: str = None, force=False) \
-> (str, str, int):
"""
Execute a command and capture its output.
Args:
project: The project associated with the command.
command (str): The command to run.
timeout (int, optional): The maximum execution time in milliseconds. Default is None.
success_message: A message to look for in the output of the command to determine if successful or not.
process_name (str, optional): A name for the process.
If `timeout` is not provided, can be used to terminate the process.
force (bool, optional): Whether to execute the command without confirmation. Default is False.
Returns:
cli_response (str): The command output
or: '', 'DONE' if user answered 'no' or 'skip'
llm_response (str): The response from the agent.
TODO: this seems to be 'DONE' (no or skip) or None
llm_response (str): 'DONE' if 'no', 'skip' or `success_message` matched.
Otherwise `None` - caller should send `cli_response` to LLM
exit_code (int): The exit code of the process.
"""
if timeout is not None:
Expand Down Expand Up @@ -166,6 +168,7 @@ def execute_command(project, command, timeout=None, process_name: str = None, fo
return command_run.cli_response, None, None

return_value = None
was_success = None

q_stderr = queue.Queue()
q = queue.Queue()
Expand All @@ -174,7 +177,7 @@ def execute_command(project, command, timeout=None, process_name: str = None, fo

if process_name is not None:
terminate_named_process(process_name)
running_processes[process_name] = process.pid
running_processes[process_name] = (command, process.pid)

output = ''
stderr_output = ''
Expand All @@ -189,9 +192,9 @@ def execute_command(project, command, timeout=None, process_name: str = None, fo
try:
while True:
elapsed_time = time.time() - start_time
if timeout is not None:
# TODO: print to IPC using a different message type so VS Code can ignore it or update the previous value
print(white_bold(f'\rt: {round(elapsed_time * 1000)}ms : '), end='', flush=True)
# if timeout is not None:
# # TODO: print to IPC using a different message type so VS Code can ignore it or update the previous value
# print(white_bold(f'\rt: {round(elapsed_time * 1000)}ms : '), end='', flush=True)

# Check if process has finished
if process.poll() is not None:
Expand All @@ -207,6 +210,10 @@ def execute_command(project, command, timeout=None, process_name: str = None, fo

# If timeout is reached, kill the process
if timeout is not None and elapsed_time * 1000 > timeout:
if process_name is not None:
logger.info(f'Process "{process_name}" running after timeout as pid: {process.pid}')
break

raise TimeoutError("Command exceeded the specified timeout.")
# os.killpg(process.pid, signal.SIGKILL)
# break
Expand All @@ -220,6 +227,10 @@ def execute_command(project, command, timeout=None, process_name: str = None, fo
output += line
print(green('CLI OUTPUT:') + line, end='')
logger.info('CLI OUTPUT: ' + line)
if success_message is not None and success_message in line:
logger.info('Success message found: %s', success_message)
was_success = True
break

# Read stderr
try:
Expand All @@ -231,10 +242,6 @@ def execute_command(project, command, timeout=None, process_name: str = None, fo
stderr_output += stderr_line
print(red('CLI ERROR:') + stderr_line, end='') # Print with different color for distinction
logger.error('CLI ERROR: ' + stderr_line)

if process_name is not None:
logger.info(f'Process {process_name} running as pid: {process.pid}')
break

except (KeyboardInterrupt, TimeoutError) as e:
interrupted = True
Expand All @@ -245,11 +252,11 @@ def execute_command(project, command, timeout=None, process_name: str = None, fo
print('\nTimeout detected. Stopping command execution...')
logger.warn('Timeout detected. Stopping command execution...')

was_success = False
terminate_process(process.pid)

elapsed_time = time.time() - start_time
print(f'{command} took {round(elapsed_time * 1000)}ms to execute.')
logger.info(f'{command} took {round(elapsed_time * 1000)}ms to execute.')
logger.info(f'`{command}` took {round(elapsed_time * 1000)}ms to execute.')

# stderr_output = ''
# while not q_stderr.empty():
Expand All @@ -263,7 +270,7 @@ def execute_command(project, command, timeout=None, process_name: str = None, fo

save_command_run(project, command, return_value)

return return_value, None, process.returncode
return return_value, 'DONE' if was_success else None, process.returncode


def build_directory_tree(path, prefix="", ignore=None, is_last=False, files=None, add_descriptions=False):
Expand Down Expand Up @@ -332,6 +339,7 @@ def execute_command_and_check_cli_response(command, timeout, convo):
def run_command_until_success(convo, command,
timeout: Union[int, None],
process_name: Union[str, None] = None,
success_message=None,
additional_message=None,
force=False,
return_cli_response=False,
Expand All @@ -345,6 +353,7 @@ def run_command_until_success(convo, command,
timeout (int): The maximum execution time in milliseconds.
process_name: A name for the process.
If `timeout` is not provided, can be used to terminate the process.
success_message: A message to look for in the output of the command to determine if successful or not.
additional_message (str, optional): Additional message to include in the response.
force (bool, optional): Whether to execute the command without confirmation. Default is False.
return_cli_response (bool, optional): If True, may raise TooDeepRecursionError(cli_response)
Expand All @@ -353,11 +362,12 @@ def run_command_until_success(convo, command,
cli_response, response, exit_code = execute_command(convo.agent.project,
command,
timeout=timeout,
success_message=success_message,
process_name=process_name,
force=force)

if response is None:
logger.info(f'{command} exit code: {exit_code}')
logger.info(f'`{command}` exit code: {exit_code}')
if exit_code is None:
response = 'DONE'
else:
Expand Down
Loading

0 comments on commit 14baf24

Please sign in to comment.