Skip to content

Commit

Permalink
Merge pull request #141 from lucafaggianelli/dev
Browse files Browse the repository at this point in the history
Fix logs
  • Loading branch information
lucafaggianelli authored Jun 20, 2023
2 parents e7ef348 + 6226a61 commit decc7f8
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 12 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/docs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ on:
push:
branches:
- main
- docs
paths:
- docs/**

permissions:
contents: write
Expand All @@ -15,7 +16,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: 3.x
python-version: '3.11'
- uses: actions/cache@v3
with:
key: ${{ github.ref }}
Expand Down
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@
"tag:yaml.org,2002:python/name:materialx.emoji.to_svg",
"tag:yaml.org,2002:python/name:materialx.emoji.twemoji",
"tag:yaml.org,2002:python/name:pymdownx.superfences.fence_code_format"
]
],
"python.analysis.typeCheckingMode": "basic"
}
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Add next fire time to pipelines and triggers (#27)
- show run time in runs list (#129)

### Changed
- Save times with UTC timezone (#132)

### Fixed
- change taskrun duration from positive to non-negative (#128)
- specify button type when in form to avoid submit (#133)
Expand Down
4 changes: 2 additions & 2 deletions frontend/src/components/Tasks.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const RunsTasksList: React.FC<Props> = ({ pipeline, run }) => {
<List>
{pipeline.tasks.map((task, i) => (
<ListItem key={task.id} className="space-x-4">
{run.tasks_run[i] ? (
{run.tasks_run && run.tasks_run[i] ? (
<Icon
variant="light"
icon={STATUS_ICONS[run.tasks_run[i].status]}
Expand Down Expand Up @@ -69,7 +69,7 @@ const RunsTasksList: React.FC<Props> = ({ pipeline, run }) => {
)}
</div>

{run.tasks_run[i]?.has_output && (
{run.tasks_run && run.tasks_run[i]?.has_output && (
<Button
variant="light"
color="indigo"
Expand Down
17 changes: 15 additions & 2 deletions src/plombery/database/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import List
import datetime

from fastapi.encoders import jsonable_encoder
from pydantic import parse_obj_as
Expand Down Expand Up @@ -50,16 +51,28 @@ def process_result_value(self, value, dialect):
return parse_obj_as(self.pydantic_type, value) if value else None


class AwareDateTime(sa.types.TypeDecorator):
"""
Results returned as timezone-aware datetimes (UTC timezone),
not naive ones.
"""

impl = DateTime

def process_result_value(self, value, dialect):
return value.replace(tzinfo=datetime.timezone.utc)


class PipelineRun(Base):
__tablename__ = "pipeline_runs"

id = Column(Integer, primary_key=True, index=True)
pipeline_id = Column(String, index=True)
trigger_id = Column(String)
status = Column(String)
start_time = Column(DateTime)
start_time = Column(AwareDateTime)
duration = Column(Integer, default=0)
tasks_run = Column(PydanticType(List[TaskRun]))
tasks_run = Column(PydanticType(List[TaskRun]), default=list)


Base.metadata.create_all(bind=engine)
Expand Down
4 changes: 4 additions & 0 deletions src/plombery/logger/formatter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import json
from time import gmtime


DEFAULT_LOG_ATTRIBUTES = {
Expand Down Expand Up @@ -34,6 +35,9 @@ def __init__(
self.default_msec_format = msec_format
self.datefmt = None

# Store log timestamp in UTC time
self.converter = gmtime

def usesTime(self) -> bool:
"""
Overwritten to look for the attribute in the format dict values instead of the fmt string.
Expand Down
14 changes: 9 additions & 5 deletions src/plombery/orchestrator/executor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Callable, Coroutine, List
import asyncio
from datetime import datetime
from datetime import datetime, timezone
import inspect

from pydantic import BaseModel
Expand All @@ -22,6 +22,10 @@
from plombery.schemas import PipelineRunStatus, TaskRun


def utcnow():
return datetime.now(tz=timezone.utc)


def _run_all_tasks(coros: List[Coroutine]):
tasks = set()

Expand All @@ -35,7 +39,7 @@ def _run_all_tasks(coros: List[Coroutine]):
def _on_pipeline_start(pipeline: Pipeline, trigger: Trigger = None):
pipeline_run = create_pipeline_run(
PipelineRunCreate(
start_time=datetime.now(),
start_time=utcnow(),
pipeline_id=pipeline.id,
trigger_id=trigger.id if trigger else MANUAL_TRIGGER_ID,
status="running",
Expand All @@ -48,7 +52,7 @@ def _on_pipeline_start(pipeline: Pipeline, trigger: Trigger = None):


def _on_pipeline_executed(pipeline_run: PipelineRun, status: PipelineRunStatus):
update_pipeline_run(pipeline_run, datetime.now(), status)
update_pipeline_run(pipeline_run, utcnow(), status)

_send_pipeline_event(pipeline_run)

Expand Down Expand Up @@ -106,7 +110,7 @@ async def run(pipeline: Pipeline, trigger: Trigger = None, params: dict = None):
task_run = TaskRun(task_id=task.id)

try:
task_start_time = datetime.now()
task_start_time = utcnow()
flowing_data = await _execute_task(task, flowing_data, params)
task_run.status = PipelineRunStatus.COMPLETED
except Exception as e:
Expand All @@ -115,7 +119,7 @@ async def run(pipeline: Pipeline, trigger: Trigger = None, params: dict = None):
task_run.status = PipelineRunStatus.FAILED
finally:
task_run.duration = (
datetime.now() - task_start_time
utcnow() - task_start_time
).total_seconds() * 1000

task_run.has_output = store_task_output(
Expand Down

0 comments on commit decc7f8

Please sign in to comment.