diff --git a/pipelinewise/cli/commands.py b/pipelinewise/cli/commands.py index ed90bf6bd..72d017e39 100644 --- a/pipelinewise/cli/commands.py +++ b/pipelinewise/cli/commands.py @@ -461,23 +461,28 @@ def run_command(command: str, log_file: str = None, line_callback: callable = No log_file_success = log_file_with_status(log_file, STATUS_SUCCESS) # Start command - with Popen(shlex.split(piped_command), stdout=PIPE, stderr=STDOUT) as proc: - with open(log_file_running, 'a+', encoding='utf-8') as logfile: - stdout = '' - while True: - line = proc.stdout.readline() - if line: - decoded_line = line.decode('utf-8') - - if line_callback is not None: - decoded_line = line_callback(decoded_line) - - stdout += decoded_line - - logfile.write(decoded_line) - logfile.flush() - if proc.poll() is not None: - break + try: + with Popen(shlex.split(piped_command), stdout=PIPE, stderr=STDOUT) as proc: + with open(log_file_running, 'a+', encoding='utf-8') as logfile: + stdout = '' + while True: + line = proc.stdout.readline() + if line: + decoded_line = line.decode('utf-8') + + if line_callback is not None: + decoded_line = line_callback(decoded_line) + + stdout += decoded_line + + logfile.write(decoded_line) + logfile.flush() + if proc.poll() is not None: + break + except Exception: + # If the subprocess failed for any reason then make sure to rename the logfile. + os.rename(log_file_running, log_file_failed) + raise proc_rc = proc.poll() if proc_rc != 0: diff --git a/pipelinewise/cli/pipelinewise.py b/pipelinewise/cli/pipelinewise.py index 2e015ea6b..b121d5b07 100644 --- a/pipelinewise/cli/pipelinewise.py +++ b/pipelinewise/cli/pipelinewise.py @@ -1293,49 +1293,53 @@ def stop_tap(self, sig=None, frame=None): a SIGTERM to the process. """ self.logger.info('Trying to stop tap gracefully...') + + # Get PID from pidfile. pidfile_path = self.tap['files']['pidfile'] try: with open(pidfile_path, encoding='utf-8') as pidf: pid = int(pidf.read()) - pgid = os.getpgid(pid) - parent = psutil.Process(pid) - - # Terminate all the processes in the current process' process group. - for child in parent.children(recursive=True): - if os.getpgid(child.pid) == pgid: - self.logger.info('Sending SIGTERM to child pid %s...', child.pid) - child.terminate() - try: - child.wait(timeout=5) - except psutil.TimeoutExpired: - child.kill() + except FileNotFoundError: + self.logger.error( + 'No pidfile found at %s. Tap does not seem to be running.', pidfile_path + ) + sys.exit(1) + # Terminate child processes + try: + pgid = os.getpgid(pid) + parent = psutil.Process(pid) + + # Terminate all the processes in the current process' process group. + for child in parent.children(recursive=True): + if os.getpgid(child.pid) == pgid: + self.logger.info('Sending SIGTERM to child pid %s...', child.pid) + child.terminate() + try: + child.wait(timeout=1) + except psutil.TimeoutExpired: + child.kill() except ProcessLookupError: self.logger.error( 'Pid %s not found. Is the tap running on this machine? ' 'Stopping taps remotely is not supported.', pid, ) - sys.exit(1) - - except FileNotFoundError: - self.logger.error( - 'No pidfile found at %s. Tap does not seem to be running.', pidfile_path - ) - sys.exit(1) - - # Remove pidfile. - os.remove(pidfile_path) - - # Rename log files from running to terminated status - if self.tap_run_log_file: - tap_run_log_file_running = f'{self.tap_run_log_file}.running' - tap_run_log_file_terminated = f'{self.tap_run_log_file}.terminated' + finally: + # Remove PID file + os.remove(pidfile_path) - if os.path.isfile(tap_run_log_file_running): - os.rename(tap_run_log_file_running, tap_run_log_file_terminated) + # Rename log files from running to terminated status + if self.tap_run_log_file: + tap_run_log_file_running = f'{self.tap_run_log_file}.running' + tap_run_log_file_terminated = f'{self.tap_run_log_file}.terminated' - sys.exit(1) + try: + os.rename(tap_run_log_file_running, tap_run_log_file_terminated) + except FileNotFoundError: + self.logger.warning( + 'No logfile found at %s.', tap_run_log_file_running + ) # pylint: disable=too-many-locals def sync_tables(self): diff --git a/setup.py b/setup.py index 1012c1aa9..13e354cd5 100644 --- a/setup.py +++ b/setup.py @@ -87,3 +87,5 @@ ] }, include_package_data=True) + +# Force container rebuild. diff --git a/tests/units/cli/test_cli.py b/tests/units/cli/test_cli.py index 3ad74e8c3..13d3ca10a 100644 --- a/tests/units/cli/test_cli.py +++ b/tests/units/cli/test_cli.py @@ -624,8 +624,7 @@ def test_command_stop_tap(self): time.sleep(5) # Send the stop_tap command - with pytest.raises(SystemExit): - pipelinewise.stop_tap() + pipelinewise.stop_tap() # Should not have any remaining Pipelinewise related linux process for proc in psutil.process_iter(['cmdline']):