Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create proper enums for job statuses #412

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions INSTALL.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mkdir samples
vim .env
docker compose up --scale daemon=3 # this will take a while
docker compose exec web python3 -m mquery.db
docker compose exec web python3 -m alembic upgrade head
Comment on lines 23 to +24
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this. We want to reduce the number of additional commands to zero (#402), not add more of them.

Copy link
Member

@msm-cert msm-cert Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't want to fix this in this PR and want to work separately in #402, it's OK to leave it as it is now (i.e. don't change the documentation in this PR. It works for fresh installations, just doesn't run migrations automatically).

If you want, I'll add some hints to #402 how to approach it

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this. We want to reduce the number of additional commands to zero (#402), not add more of them.

All because of alembic, which does not automatically recognize the modification from varchar to enum. I am able to create an enum type in postgres from the model level, but alembic does not replace it (it still remains VARCHAR) - that's why my manual modifications in migration files.

I found a library https://pypi.org/project/alembic-postgresql-enum/, quite new, I have to test it, but the question is whether to enter something like that at all. Or maybe look for solutions in sqlalchemy dialects for postgresql

```

- Good for testing mquery and production deployments on a single server
Expand All @@ -39,6 +40,7 @@ cd mquery
vim .env
docker compose -f docker-compose.dev.yml up # this will take a while
docker compose exec dev-web python3 -m mquery.db
docker compose exec dev-web python3 -m alembic upgrade head
```

- Good for development - all file changes will be picked up automatically.
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ cd mquery
vim .env # optional - change samples and index directory locations
docker compose up --scale daemon=3 # building the images will take a while
docker compose exec web python3 -m mquery.db
docker compose exec web python3 -m alembic upgrade head
```

The web interface should be available at `http://localhost`.
Expand Down
20 changes: 13 additions & 7 deletions src/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from .models.agentgroup import AgentGroup
from .models.configentry import ConfigEntry
from .models.job import Job
from .models.job import Job, JobStatus
from .models.jobagent import JobAgent
from .models.match import Match
from .schema import MatchesSchema, ConfigSchema
Expand Down Expand Up @@ -65,7 +65,11 @@ def cancel_job(self, job: JobId, error=None) -> None:
session.execute(
update(Job)
.where(Job.id == job)
.values(status="cancelled", finished=int(time()), error=error)
.values(
status=JobStatus.cancelled,
finished=int(time()),
error=error,
)
)
session.commit()

Expand All @@ -87,7 +91,7 @@ def get_valid_jobs(self, username_filter: Optional[str]) -> List[Job]:
with self.session() as session:
query = (
select(Job)
.where(Job.status != "removed")
.where(Job.status != JobStatus.removed)
.order_by(col(Job.submitted).desc())
)
if username_filter:
Expand All @@ -98,7 +102,9 @@ def remove_query(self, job: JobId) -> None:
"""Sets the job status to removed."""
with self.session() as session:
session.execute(
update(Job).where(Job.id == job).values(status="removed")
update(Job)
.where(Job.id == job)
.values(status=JobStatus.removed)
)
session.commit()

Expand Down Expand Up @@ -147,7 +153,7 @@ def agent_finish_job(self, job: Job) -> None:
session.execute(
update(Job)
.where(Job.internal_id == job.internal_id)
.values(finished=int(time()), status="done")
.values(finished=int(time()), status=JobStatus.done)
)
session.commit()

Expand Down Expand Up @@ -218,7 +224,7 @@ def init_job_datasets(self, job: JobId, num_datasets: int) -> None:
.values(
total_datasets=num_datasets,
datasets_left=num_datasets,
status="processing",
status=JobStatus.processing,
)
)
session.commit()
Expand Down Expand Up @@ -251,7 +257,7 @@ def create_search_task(
with self.session() as session:
obj = Job(
id=job,
status="new",
status=JobStatus.new,
rule_name=rule_name,
rule_author=rule_author,
raw_yara=raw_yara,
Expand Down
28 changes: 28 additions & 0 deletions src/migrations/versions/84796bdc2f77_add_jobstatus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""add_jobstatus
Revision ID: 84796bdc2f77
Revises: dbb81bd4d47f
Create Date: 2024-10-01 08:09:42.808911
"""
from alembic import op


# revision identifiers, used by Alembic.
revision = "84796bdc2f77"
down_revision = "dbb81bd4d47f"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.execute(
"CREATE TYPE jobstatus AS ENUM ('done', 'new', 'cancelled', 'removed', 'processing');"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to know every possible type for this migration. Just change every value out of scope to cancelledand then convert to enum.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(But good point that we need to handle this correctly)

)
op.execute(
"ALTER TABLE job ALTER COLUMN status TYPE jobstatus USING status::text::jobstatus;"
)


def downgrade() -> None:
op.execute(
"ALTER TABLE job ALTER COLUMN status TYPE VARCHAR USING status::text;"
)
162 changes: 94 additions & 68 deletions src/migrations/versions/cbbba858deb0_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,84 +5,110 @@
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import inspect

import sqlmodel

from config import app_config

revision = "cbbba858deb0"
down_revision = None
branch_labels = None
depends_on = None

DATABASE_URL = app_config.database.url
engine = sa.create_engine(DATABASE_URL)


def upgrade() -> None:
op.create_table(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? We should not change older migrations in any case. (Please also don't reformat them, to keep unnecessary git changes to minimum)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I understand. I did this because the alembic upgrade head command in an existing environment causes errors that the table exists. So I check beforehand if the table exists.

"agentgroup",
sa.Column("plugins_spec", sa.JSON(), nullable=True),
sa.Column("active_plugins", sa.ARRAY(sa.String()), nullable=True),
sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column(
"ursadb_url", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column("id", sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
op.create_table(
"configentry",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"plugin", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column("key", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column("value", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
op.create_table(
"job",
sa.Column("taints", sa.ARRAY(sa.String()), nullable=True),
sa.Column("id", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column(
"status", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column("error", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column(
"rule_name", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column(
"rule_author", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column(
"raw_yara", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column("submitted", sa.Integer(), nullable=False),
sa.Column("finished", sa.Integer(), nullable=True),
sa.Column("files_limit", sa.Integer(), nullable=False),
sa.Column(
"reference", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column("files_processed", sa.Integer(), nullable=False),
sa.Column("files_matched", sa.Integer(), nullable=False),
sa.Column("files_in_progress", sa.Integer(), nullable=False),
sa.Column("total_files", sa.Integer(), nullable=False),
sa.Column("files_errored", sa.Integer(), nullable=False),
sa.Column("datasets_left", sa.Integer(), nullable=False),
sa.Column("total_datasets", sa.Integer(), nullable=False),
sa.Column("agents_left", sa.Integer(), nullable=False),
sa.Column("internal_id", sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint("internal_id"),
)
op.create_table(
"match",
sa.Column("meta", sa.JSON(), nullable=True),
sa.Column("matches", sa.ARRAY(sa.String()), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("job_id", sa.Integer(), nullable=False),
sa.Column("file", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.ForeignKeyConstraint(
["job_id"],
["job.internal_id"],
),
sa.PrimaryKeyConstraint("id"),
)
if not inspect(engine).has_table("agentgroup"):
op.create_table(
"agentgroup",
sa.Column("plugins_spec", sa.JSON(), nullable=True),
sa.Column("active_plugins", sa.ARRAY(sa.String()), nullable=True),
sa.Column(
"name", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column(
"ursadb_url",
sqlmodel.sql.sqltypes.AutoString(),
nullable=False,
),
sa.Column("id", sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
if not inspect(engine).has_table("configentry"):
op.create_table(
"configentry",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"plugin", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column(
"key", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column(
"value", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.PrimaryKeyConstraint("id"),
)
if not inspect(engine).has_table("job"):
op.create_table(
"job",
sa.Column("taints", sa.ARRAY(sa.String()), nullable=True),
sa.Column(
"id", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column(
"status", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column(
"error", sqlmodel.sql.sqltypes.AutoString(), nullable=True
),
sa.Column(
"rule_name", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column(
"rule_author",
sqlmodel.sql.sqltypes.AutoString(),
nullable=False,
),
sa.Column(
"raw_yara", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column("submitted", sa.Integer(), nullable=False),
sa.Column("finished", sa.Integer(), nullable=True),
sa.Column("files_limit", sa.Integer(), nullable=False),
sa.Column(
"reference", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column("files_processed", sa.Integer(), nullable=False),
sa.Column("files_matched", sa.Integer(), nullable=False),
sa.Column("files_in_progress", sa.Integer(), nullable=False),
sa.Column("total_files", sa.Integer(), nullable=False),
sa.Column("files_errored", sa.Integer(), nullable=False),
sa.Column("datasets_left", sa.Integer(), nullable=False),
sa.Column("total_datasets", sa.Integer(), nullable=False),
sa.Column("agents_left", sa.Integer(), nullable=False),
sa.Column("internal_id", sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint("internal_id"),
)
if not inspect(engine).has_table("match"):
op.create_table(
"match",
sa.Column("meta", sa.JSON(), nullable=True),
sa.Column("matches", sa.ARRAY(sa.String()), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("job_id", sa.Integer(), nullable=False),
sa.Column(
"file", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.ForeignKeyConstraint(
["job_id"],
["job.internal_id"],
),
sa.PrimaryKeyConstraint("id"),
)


def downgrade() -> None:
Expand Down
41 changes: 24 additions & 17 deletions src/migrations/versions/dbb81bd4d47f_add_jobagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,41 @@
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import inspect


from config import app_config

# revision identifiers, used by Alembic.
revision = "dbb81bd4d47f"
down_revision = "cbbba858deb0"
branch_labels = None
depends_on = None

DATABASE_URL = app_config.database.url
engine = sa.create_engine(DATABASE_URL)


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"jobagent",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similarly, please don't change and reformat this file

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

sa.Column("id", sa.Integer(), nullable=False),
sa.Column("task_in_progress", sa.Integer(), nullable=False),
sa.Column("job_id", sa.Integer(), nullable=False),
sa.Column("agent_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["agent_id"],
["agentgroup.id"],
),
sa.ForeignKeyConstraint(
["job_id"],
["job.internal_id"],
),
sa.PrimaryKeyConstraint("id"),
)
# ### end Alembic commands ###
if not inspect(engine).has_table("jobagent"):
op.create_table(
"jobagent",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("task_in_progress", sa.Integer(), nullable=False),
sa.Column("job_id", sa.Integer(), nullable=False),
sa.Column("agent_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["agent_id"],
["agentgroup.id"],
),
sa.ForeignKeyConstraint(
["job_id"],
["job.internal_id"],
),
sa.PrimaryKeyConstraint("id"),
)
# ### end Alembic commands ###


def downgrade() -> None:
Expand Down
16 changes: 15 additions & 1 deletion src/models/job.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import enum

from sqlalchemy import Enum as PgEnum
from sqlmodel import SQLModel, Field, ARRAY, String, Column, Relationship
from typing import Optional, List, Union, TYPE_CHECKING

Expand All @@ -6,11 +9,19 @@
from ..models.jobagent import JobAgent


class JobStatus(str, enum.Enum):
done = "done"
new = "new"
cancelled = "cancelled"
removed = "removed"
processing = "processing"


class JobBase(SQLModel):
"""Base class for entities related to mquery jobs."""

id: str
status: str
status: JobStatus = Field(sa_type=PgEnum(JobStatus, name="jobstatus")) # type: ignore
error: Optional[str]
rule_name: str
rule_author: str
Expand All @@ -29,6 +40,9 @@ class JobBase(SQLModel):
total_datasets: int
agents_left: int

class Config:
arbitrary_types_allowed = True


class Job(JobBase, table=True):
"""Job object in the database. Internal ID is an implementation detail."""
Expand Down
Loading
Loading