Skip to content

Commit

Permalink
Merge pull request #28 from thread/rename-logfile
Browse files Browse the repository at this point in the history
Reduce likelihood of deadlocked processes
  • Loading branch information
judahrand authored May 27, 2022
2 parents d5579ac + 563b89f commit 835fe6f
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 49 deletions.
39 changes: 22 additions & 17 deletions pipelinewise/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,23 +461,28 @@ def run_command(command: str, log_file: str = None, line_callback: callable = No
log_file_success = log_file_with_status(log_file, STATUS_SUCCESS)

# Start command
with Popen(shlex.split(piped_command), stdout=PIPE, stderr=STDOUT) as proc:
with open(log_file_running, 'a+', encoding='utf-8') as logfile:
stdout = ''
while True:
line = proc.stdout.readline()
if line:
decoded_line = line.decode('utf-8')

if line_callback is not None:
decoded_line = line_callback(decoded_line)

stdout += decoded_line

logfile.write(decoded_line)
logfile.flush()
if proc.poll() is not None:
break
try:
with Popen(shlex.split(piped_command), stdout=PIPE, stderr=STDOUT) as proc:
with open(log_file_running, 'a+', encoding='utf-8') as logfile:
stdout = ''
while True:
line = proc.stdout.readline()
if line:
decoded_line = line.decode('utf-8')

if line_callback is not None:
decoded_line = line_callback(decoded_line)

stdout += decoded_line

logfile.write(decoded_line)
logfile.flush()
if proc.poll() is not None:
break
except Exception:
# If the subprocess failed for any reason then make sure to rename the logfile.
os.rename(log_file_running, log_file_failed)
raise

proc_rc = proc.poll()
if proc_rc != 0:
Expand Down
64 changes: 34 additions & 30 deletions pipelinewise/cli/pipelinewise.py
Original file line number Diff line number Diff line change
Expand Up @@ -1293,49 +1293,53 @@ def stop_tap(self, sig=None, frame=None):
a SIGTERM to the process.
"""
self.logger.info('Trying to stop tap gracefully...')

# Get PID from pidfile.
pidfile_path = self.tap['files']['pidfile']
try:
with open(pidfile_path, encoding='utf-8') as pidf:
pid = int(pidf.read())
pgid = os.getpgid(pid)
parent = psutil.Process(pid)

# Terminate all the processes in the current process' process group.
for child in parent.children(recursive=True):
if os.getpgid(child.pid) == pgid:
self.logger.info('Sending SIGTERM to child pid %s...', child.pid)
child.terminate()
try:
child.wait(timeout=5)
except psutil.TimeoutExpired:
child.kill()
except FileNotFoundError:
self.logger.error(
'No pidfile found at %s. Tap does not seem to be running.', pidfile_path
)
sys.exit(1)

# Terminate child processes
try:
pgid = os.getpgid(pid)
parent = psutil.Process(pid)

# Terminate all the processes in the current process' process group.
for child in parent.children(recursive=True):
if os.getpgid(child.pid) == pgid:
self.logger.info('Sending SIGTERM to child pid %s...', child.pid)
child.terminate()
try:
child.wait(timeout=1)
except psutil.TimeoutExpired:
child.kill()
except ProcessLookupError:
self.logger.error(
'Pid %s not found. Is the tap running on this machine? '
'Stopping taps remotely is not supported.',
pid,
)
sys.exit(1)

except FileNotFoundError:
self.logger.error(
'No pidfile found at %s. Tap does not seem to be running.', pidfile_path
)
sys.exit(1)

# Remove pidfile.
os.remove(pidfile_path)

# Rename log files from running to terminated status
if self.tap_run_log_file:
tap_run_log_file_running = f'{self.tap_run_log_file}.running'
tap_run_log_file_terminated = f'{self.tap_run_log_file}.terminated'
finally:
# Remove PID file
os.remove(pidfile_path)

if os.path.isfile(tap_run_log_file_running):
os.rename(tap_run_log_file_running, tap_run_log_file_terminated)
# Rename log files from running to terminated status
if self.tap_run_log_file:
tap_run_log_file_running = f'{self.tap_run_log_file}.running'
tap_run_log_file_terminated = f'{self.tap_run_log_file}.terminated'

sys.exit(1)
try:
os.rename(tap_run_log_file_running, tap_run_log_file_terminated)
except FileNotFoundError:
self.logger.warning(
'No logfile found at %s.', tap_run_log_file_running
)

# pylint: disable=too-many-locals
def sync_tables(self):
Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,5 @@
]
},
include_package_data=True)

# Force container rebuild.
3 changes: 1 addition & 2 deletions tests/units/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,8 +624,7 @@ def test_command_stop_tap(self):
time.sleep(5)

# Send the stop_tap command
with pytest.raises(SystemExit):
pipelinewise.stop_tap()
pipelinewise.stop_tap()

# Should not have any remaining Pipelinewise related linux process
for proc in psutil.process_iter(['cmdline']):
Expand Down

0 comments on commit 835fe6f

Please sign in to comment.