Skip to content

Commit

Permalink
update the log details
Browse files Browse the repository at this point in the history
  • Loading branch information
abhijeetSaroha committed Jan 17, 2025
1 parent ffe4b5b commit 385282e
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 49 deletions.
11 changes: 3 additions & 8 deletions src/makim/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def _call_shell_remote(
ssh.close()
except paramiko.AuthenticationException:
MakimLogs.raise_error(
f"Authentication failed for host {host_config['host']}",
f'Authentication failed for host {host_config["host"]}',
MakimError.SSH_AUTHENTICATION_FAILED,
)
except paramiko.SSHException as ssh_exception:
Expand Down Expand Up @@ -700,11 +700,7 @@ def _run_hooks(self, args: dict[str, Any], hook_type: str) -> None:

# update the arguments
for arg_name, arg_value in hook_data.get('args', {}).items():
unescaped_value = (
str(arg_value)
if isinstance(arg_value, str)
else str(arg_value)
)
unescaped_value = str(arg_value)

args_hook[f'--{arg_name}'] = yaml.safe_load(
TEMPLATE.from_string(unescaped_value).render(
Expand Down Expand Up @@ -867,8 +863,7 @@ def run(self, args: dict[str, Any]) -> None:
self.task_data['if']
):
return warnings.warn(
f'{args["task"]} not executed. '
'Condition (if) not satisfied.'
f'{args["task"]} not executed. Condition (if) not satisfied.'
)

self._run_hooks(args, 'pre-run')
Expand Down
90 changes: 49 additions & 41 deletions src/makim/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def log_execution(
event: str,
result: Optional[str] = None,
error: Optional[str] = None,
task: Optional[str] = None,
) -> None:
"""Log execution details.
Expand All @@ -47,6 +48,7 @@ def log_execution(
event: Event ('scheduled', 'execution_completed', 'execution_failed')
result: Output from task execution
error: Error message if execution failed
task: Associated task name
"""
if Config.job_history_path is None:
raise RuntimeError('Job history path not initialized')
Expand All @@ -61,32 +63,46 @@ def log_execution(

current_time = datetime.now().isoformat()

# Create the scheduler entry if it doesn't exist
# Initialize list for scheduler if it doesn't exist
if name not in history:
history[name] = {
'scheduled_timestamp': current_time,
'event': 'scheduled',
'execution_timestamp': None,
'output': None,
'error': None,
'task': None, # Add task field to track the associated task
}

entry = history[name]
history[name] = []

# Create new execution entry
execution_entry = {
'scheduled_timestamp': current_time,
'event': event,
'execution_timestamp': None,
'output': None,
'error': None,
'task': task,
}

if event == 'scheduled':
# Only update scheduling information
entry.update({'scheduled_timestamp': current_time, 'event': event})
else: # execution_completed or execution_failed
# Preserve the scheduled timestamp and update execution details
entry.update(
# Update execution details based on event type
if event != 'scheduled':
execution_entry.update(
{
'execution_timestamp': current_time,
'event': event,
'output': result,
'error': error,
}
)
# Update the last scheduled timestamp
if history[name]:
last_scheduled = next(
(
entry
for entry in reversed(history[name])
if entry['event'] == 'scheduled'
),
None,
)
if last_scheduled:
execution_entry['scheduled_timestamp'] = last_scheduled[
'scheduled_timestamp'
]

# Append new execution to the history
history[name].append(execution_entry)

# Save updated history
with open(Config.job_history_path, 'w') as f:
Expand All @@ -96,26 +112,19 @@ def log_execution(
print(f'Failed to log execution: {e}')


def run_makim_task(task: str, args: Optional[Dict[str, Any]] = None) -> None:
"""Standalone function to execute a Makim task."""
def run_makim_task(
task: str, scheduler_name: str, args: Optional[Dict[str, Any]] = None
) -> None:
"""Execute a Makim task.
Args:
task: The task to execute
scheduler_name: The name of the scheduler that triggered this task
args: Optional arguments for the task
"""
if Config.config_file is None or Config.job_history_path is None:
raise RuntimeError('Global configuration not initialized')

# Extract scheduler name from the task path
scheduler_name = None
if Config.job_history_path.exists():
with open(Config.job_history_path, 'r') as f:
history = json.load(f)
# Find the scheduler entry that matches this task
for name, entry in history.items():
if entry.get('task') == task:
scheduler_name = name
break

if not scheduler_name:
# Fallback to task name if scheduler name not found
scheduler_name = task.split('.')[-1]

# Build base command with known safe values
cmd = ['makim', '--file', Config.config_file, task]

Expand All @@ -131,14 +140,13 @@ def run_makim_task(task: str, args: Optional[Dict[str, Any]] = None) -> None:
safe_cmd = _sanitize_command(cmd)

try:
# nosec B603 - we've sanitized the input and are not using shell=True
result = subprocess.run(
safe_cmd,
capture_output=True,
text=True,
check=True,
)
# Log successful execution
# Log successful execution using the scheduler name
log_execution(
scheduler_name, 'execution_completed', result=result.stdout
)
Expand All @@ -147,7 +155,7 @@ def run_makim_task(task: str, args: Optional[Dict[str, Any]] = None) -> None:
error_msg = (
f'Job execution failed:\nSTDERR: {e.stderr}\nSTDOUT: {e.stdout}'
)
# Log failed execution
# Log failed execution using the scheduler name
log_execution(scheduler_name, 'execution_failed', error=error_msg)
raise

Expand Down Expand Up @@ -240,14 +248,14 @@ def add_job(
# Create trigger from schedule
trigger = CronTrigger.from_crontab(schedule)

# Log the scheduling event first
log_execution(name, 'scheduled')
# Initialize the job entry with task information
log_execution(name, 'scheduled', task=task) # Added task parameter

# Add the job using the module-level function
self.scheduler.add_job(
func='makim.scheduler:run_makim_task',
trigger=trigger,
args=[task],
args=[task, name], # Pass scheduler_name as an argument
kwargs={'args': args or {}},
id=name,
name=name,
Expand Down

0 comments on commit 385282e

Please sign in to comment.