From 41313cda40718968cf1abf4f7ff1c170751b6e4c Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 1 Oct 2024 14:03:12 +0200 Subject: [PATCH 1/6] Job status added --- src/db.py | 14 ++++++------- .../versions/84796bdc2f77_add_jobstatus.py | 21 +++++++++++++++++++ src/models/job.py | 16 +++++++++++++- src/tasks.py | 8 +++---- 4 files changed, 47 insertions(+), 12 deletions(-) create mode 100644 src/migrations/versions/84796bdc2f77_add_jobstatus.py diff --git a/src/db.py b/src/db.py index 47036d0a..02d40bcf 100644 --- a/src/db.py +++ b/src/db.py @@ -19,7 +19,7 @@ from .models.agentgroup import AgentGroup from .models.configentry import ConfigEntry -from .models.job import Job +from .models.job import Job, JobStatus from .models.jobagent import JobAgent from .models.match import Match from .schema import MatchesSchema, ConfigSchema @@ -65,7 +65,7 @@ def cancel_job(self, job: JobId, error=None) -> None: session.execute( update(Job) .where(Job.id == job) - .values(status="cancelled", finished=int(time()), error=error) + .values(status=JobStatus.cancelled, finished=int(time()), error=error) ) session.commit() @@ -87,7 +87,7 @@ def get_valid_jobs(self, username_filter: Optional[str]) -> List[Job]: with self.session() as session: query = ( select(Job) - .where(Job.status != "removed") + .where(Job.status != JobStatus.removed) .order_by(col(Job.submitted).desc()) ) if username_filter: @@ -98,7 +98,7 @@ def remove_query(self, job: JobId) -> None: """Sets the job status to removed.""" with self.session() as session: session.execute( - update(Job).where(Job.id == job).values(status="removed") + update(Job).where(Job.id == job).values(status=JobStatus.removed) ) session.commit() @@ -147,7 +147,7 @@ def agent_finish_job(self, job: Job) -> None: session.execute( update(Job) .where(Job.internal_id == job.internal_id) - .values(finished=int(time()), status="done") + .values(finished=int(time()), status=JobStatus.done) ) session.commit() @@ -218,7 +218,7 @@ def init_job_datasets(self, job: JobId, num_datasets: int) -> None: .values( total_datasets=num_datasets, datasets_left=num_datasets, - status="processing", + status=JobStatus.processing, ) ) session.commit() @@ -251,7 +251,7 @@ def create_search_task( with self.session() as session: obj = Job( id=job, - status="new", + status=JobStatus.new, rule_name=rule_name, rule_author=rule_author, raw_yara=raw_yara, diff --git a/src/migrations/versions/84796bdc2f77_add_jobstatus.py b/src/migrations/versions/84796bdc2f77_add_jobstatus.py new file mode 100644 index 00000000..aa9e86c0 --- /dev/null +++ b/src/migrations/versions/84796bdc2f77_add_jobstatus.py @@ -0,0 +1,21 @@ +"""add_jobstatus +Revision ID: 84796bdc2f77 +Revises: dbb81bd4d47f +Create Date: 2024-10-01 08:09:42.808911 +""" +from alembic import op + + +# revision identifiers, used by Alembic. +revision = '84796bdc2f77' +down_revision = 'dbb81bd4d47f' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute('ALTER TABLE job ALTER COLUMN status TYPE jobstatus USING status::jobstatus;') + + +def downgrade() -> None: + op.execute('ALTER TABLE job ALTER COLUMN status TYPE VARCHAR USING status::text;') diff --git a/src/models/job.py b/src/models/job.py index 17db42ad..5c7a4c0c 100644 --- a/src/models/job.py +++ b/src/models/job.py @@ -1,3 +1,6 @@ +import enum + +from sqlalchemy import Enum as PgEnum from sqlmodel import SQLModel, Field, ARRAY, String, Column, Relationship from typing import Optional, List, Union, TYPE_CHECKING @@ -6,11 +9,19 @@ from ..models.jobagent import JobAgent +class JobStatus(str, enum.Enum): + done = "done" + new = "new" + cancelled = "cancelled" + removed = "removed" + processing = "processing" + + class JobBase(SQLModel): """Base class for entities related to mquery jobs.""" id: str - status: str + status: JobStatus = Field(sa_column=PgEnum(JobStatus, name="jobstatus", create_type=True)) error: Optional[str] rule_name: str rule_author: str @@ -29,6 +40,9 @@ class JobBase(SQLModel): total_datasets: int agents_left: int + class Config: + arbitrary_types_allowed = True + class Job(JobBase, table=True): """Job object in the database. Internal ID is an implementation detail.""" diff --git a/src/tasks.py b/src/tasks.py index edb9a426..d7443f41 100644 --- a/src/tasks.py +++ b/src/tasks.py @@ -9,7 +9,7 @@ from .util import make_sha256_tag from .config import app_config from .plugins import PluginManager -from .models.job import Job +from .models.job import Job, JobStatus from .models.match import Match from .lib.yaraparse import parse_yara, combine_rules from .lib.ursadb import Json, UrsaDb @@ -182,7 +182,7 @@ def start_search(job_id: JobId) -> None: """ with job_context(job_id) as agent: job = agent.db.get_job(job_id) - if job.status == "cancelled": + if job.status == JobStatus.cancelled: logging.info("Job was cancelled, returning...") return @@ -232,7 +232,7 @@ def query_ursadb(job_id: JobId, dataset_id: str, ursadb_query: str) -> None: """Queries ursadb and creates yara scans tasks with file batches.""" with job_context(job_id) as agent: job = agent.db.get_job(job_id) - if job.status == "cancelled": + if job.status == JobStatus.cancelled: logging.info("Job was cancelled, returning...") return @@ -271,7 +271,7 @@ def run_yara_batch(job_id: JobId, iterator: str, batch_size: int) -> None: """Actually scans files, and updates a database with the results.""" with job_context(job_id) as agent: job = agent.db.get_job(job_id) - if job.status == "cancelled": + if job.status == JobStatus.cancelled: logging.info("Job was cancelled, returning...") return From f4bae197cd592d74a8240b1012775793c287dd6b Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 1 Oct 2024 18:14:05 +0200 Subject: [PATCH 2/6] Job status with alembic migrations --- src/db.py | 10 +- .../versions/84796bdc2f77_add_jobstatus.py | 15 +- src/migrations/versions/cbbba858deb0_init.py | 162 ++++++++++-------- .../versions/dbb81bd4d47f_add_jobagent.py | 41 +++-- src/models/job.py | 2 +- 5 files changed, 138 insertions(+), 92 deletions(-) diff --git a/src/db.py b/src/db.py index 02d40bcf..0952e407 100644 --- a/src/db.py +++ b/src/db.py @@ -65,7 +65,11 @@ def cancel_job(self, job: JobId, error=None) -> None: session.execute( update(Job) .where(Job.id == job) - .values(status=JobStatus.cancelled, finished=int(time()), error=error) + .values( + status=JobStatus.cancelled, + finished=int(time()), + error=error, + ) ) session.commit() @@ -98,7 +102,9 @@ def remove_query(self, job: JobId) -> None: """Sets the job status to removed.""" with self.session() as session: session.execute( - update(Job).where(Job.id == job).values(status=JobStatus.removed) + update(Job) + .where(Job.id == job) + .values(status=JobStatus.removed) ) session.commit() diff --git a/src/migrations/versions/84796bdc2f77_add_jobstatus.py b/src/migrations/versions/84796bdc2f77_add_jobstatus.py index aa9e86c0..1f60a001 100644 --- a/src/migrations/versions/84796bdc2f77_add_jobstatus.py +++ b/src/migrations/versions/84796bdc2f77_add_jobstatus.py @@ -7,15 +7,22 @@ # revision identifiers, used by Alembic. -revision = '84796bdc2f77' -down_revision = 'dbb81bd4d47f' +revision = "84796bdc2f77" +down_revision = "dbb81bd4d47f" branch_labels = None depends_on = None def upgrade() -> None: - op.execute('ALTER TABLE job ALTER COLUMN status TYPE jobstatus USING status::jobstatus;') + op.execute( + "CREATE TYPE jobstatus AS ENUM ('done', 'new', 'cancelled', 'removed', 'processing');" + ) + op.execute( + "ALTER TABLE job ALTER COLUMN status TYPE jobstatus USING status::text::jobstatus;" + ) def downgrade() -> None: - op.execute('ALTER TABLE job ALTER COLUMN status TYPE VARCHAR USING status::text;') + op.execute( + "ALTER TABLE job ALTER COLUMN status TYPE VARCHAR USING status::text;" + ) diff --git a/src/migrations/versions/cbbba858deb0_init.py b/src/migrations/versions/cbbba858deb0_init.py index 808bfd8f..f69614e7 100644 --- a/src/migrations/versions/cbbba858deb0_init.py +++ b/src/migrations/versions/cbbba858deb0_init.py @@ -5,84 +5,110 @@ """ from alembic import op import sqlalchemy as sa +from sqlalchemy import inspect + import sqlmodel +from config import app_config revision = "cbbba858deb0" down_revision = None branch_labels = None depends_on = None +DATABASE_URL = app_config.database.url +engine = sa.create_engine(DATABASE_URL) + def upgrade() -> None: - op.create_table( - "agentgroup", - sa.Column("plugins_spec", sa.JSON(), nullable=True), - sa.Column("active_plugins", sa.ARRAY(sa.String()), nullable=True), - sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False), - sa.Column( - "ursadb_url", sqlmodel.sql.sqltypes.AutoString(), nullable=False - ), - sa.Column("id", sa.Integer(), nullable=False), - sa.PrimaryKeyConstraint("id"), - ) - op.create_table( - "configentry", - sa.Column("id", sa.Integer(), nullable=False), - sa.Column( - "plugin", sqlmodel.sql.sqltypes.AutoString(), nullable=False - ), - sa.Column("key", sqlmodel.sql.sqltypes.AutoString(), nullable=False), - sa.Column("value", sqlmodel.sql.sqltypes.AutoString(), nullable=False), - sa.PrimaryKeyConstraint("id"), - ) - op.create_table( - "job", - sa.Column("taints", sa.ARRAY(sa.String()), nullable=True), - sa.Column("id", sqlmodel.sql.sqltypes.AutoString(), nullable=False), - sa.Column( - "status", sqlmodel.sql.sqltypes.AutoString(), nullable=False - ), - sa.Column("error", sqlmodel.sql.sqltypes.AutoString(), nullable=True), - sa.Column( - "rule_name", sqlmodel.sql.sqltypes.AutoString(), nullable=False - ), - sa.Column( - "rule_author", sqlmodel.sql.sqltypes.AutoString(), nullable=False - ), - sa.Column( - "raw_yara", sqlmodel.sql.sqltypes.AutoString(), nullable=False - ), - sa.Column("submitted", sa.Integer(), nullable=False), - sa.Column("finished", sa.Integer(), nullable=True), - sa.Column("files_limit", sa.Integer(), nullable=False), - sa.Column( - "reference", sqlmodel.sql.sqltypes.AutoString(), nullable=False - ), - sa.Column("files_processed", sa.Integer(), nullable=False), - sa.Column("files_matched", sa.Integer(), nullable=False), - sa.Column("files_in_progress", sa.Integer(), nullable=False), - sa.Column("total_files", sa.Integer(), nullable=False), - sa.Column("files_errored", sa.Integer(), nullable=False), - sa.Column("datasets_left", sa.Integer(), nullable=False), - sa.Column("total_datasets", sa.Integer(), nullable=False), - sa.Column("agents_left", sa.Integer(), nullable=False), - sa.Column("internal_id", sa.Integer(), nullable=False), - sa.PrimaryKeyConstraint("internal_id"), - ) - op.create_table( - "match", - sa.Column("meta", sa.JSON(), nullable=True), - sa.Column("matches", sa.ARRAY(sa.String()), nullable=True), - sa.Column("id", sa.Integer(), nullable=False), - sa.Column("job_id", sa.Integer(), nullable=False), - sa.Column("file", sqlmodel.sql.sqltypes.AutoString(), nullable=False), - sa.ForeignKeyConstraint( - ["job_id"], - ["job.internal_id"], - ), - sa.PrimaryKeyConstraint("id"), - ) + if not inspect(engine).has_table("agentgroup"): + op.create_table( + "agentgroup", + sa.Column("plugins_spec", sa.JSON(), nullable=True), + sa.Column("active_plugins", sa.ARRAY(sa.String()), nullable=True), + sa.Column( + "name", sqlmodel.sql.sqltypes.AutoString(), nullable=False + ), + sa.Column( + "ursadb_url", + sqlmodel.sql.sqltypes.AutoString(), + nullable=False, + ), + sa.Column("id", sa.Integer(), nullable=False), + sa.PrimaryKeyConstraint("id"), + ) + if not inspect(engine).has_table("configentry"): + op.create_table( + "configentry", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column( + "plugin", sqlmodel.sql.sqltypes.AutoString(), nullable=False + ), + sa.Column( + "key", sqlmodel.sql.sqltypes.AutoString(), nullable=False + ), + sa.Column( + "value", sqlmodel.sql.sqltypes.AutoString(), nullable=False + ), + sa.PrimaryKeyConstraint("id"), + ) + if not inspect(engine).has_table("job"): + op.create_table( + "job", + sa.Column("taints", sa.ARRAY(sa.String()), nullable=True), + sa.Column( + "id", sqlmodel.sql.sqltypes.AutoString(), nullable=False + ), + sa.Column( + "status", sqlmodel.sql.sqltypes.AutoString(), nullable=False + ), + sa.Column( + "error", sqlmodel.sql.sqltypes.AutoString(), nullable=True + ), + sa.Column( + "rule_name", sqlmodel.sql.sqltypes.AutoString(), nullable=False + ), + sa.Column( + "rule_author", + sqlmodel.sql.sqltypes.AutoString(), + nullable=False, + ), + sa.Column( + "raw_yara", sqlmodel.sql.sqltypes.AutoString(), nullable=False + ), + sa.Column("submitted", sa.Integer(), nullable=False), + sa.Column("finished", sa.Integer(), nullable=True), + sa.Column("files_limit", sa.Integer(), nullable=False), + sa.Column( + "reference", sqlmodel.sql.sqltypes.AutoString(), nullable=False + ), + sa.Column("files_processed", sa.Integer(), nullable=False), + sa.Column("files_matched", sa.Integer(), nullable=False), + sa.Column("files_in_progress", sa.Integer(), nullable=False), + sa.Column("total_files", sa.Integer(), nullable=False), + sa.Column("files_errored", sa.Integer(), nullable=False), + sa.Column("datasets_left", sa.Integer(), nullable=False), + sa.Column("total_datasets", sa.Integer(), nullable=False), + sa.Column("agents_left", sa.Integer(), nullable=False), + sa.Column("internal_id", sa.Integer(), nullable=False), + sa.PrimaryKeyConstraint("internal_id"), + ) + if not inspect(engine).has_table("match"): + op.create_table( + "match", + sa.Column("meta", sa.JSON(), nullable=True), + sa.Column("matches", sa.ARRAY(sa.String()), nullable=True), + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("job_id", sa.Integer(), nullable=False), + sa.Column( + "file", sqlmodel.sql.sqltypes.AutoString(), nullable=False + ), + sa.ForeignKeyConstraint( + ["job_id"], + ["job.internal_id"], + ), + sa.PrimaryKeyConstraint("id"), + ) def downgrade() -> None: diff --git a/src/migrations/versions/dbb81bd4d47f_add_jobagent.py b/src/migrations/versions/dbb81bd4d47f_add_jobagent.py index 3afb5166..ae7071ef 100644 --- a/src/migrations/versions/dbb81bd4d47f_add_jobagent.py +++ b/src/migrations/versions/dbb81bd4d47f_add_jobagent.py @@ -5,34 +5,41 @@ """ from alembic import op import sqlalchemy as sa +from sqlalchemy import inspect +from config import app_config + # revision identifiers, used by Alembic. revision = "dbb81bd4d47f" down_revision = "cbbba858deb0" branch_labels = None depends_on = None +DATABASE_URL = app_config.database.url +engine = sa.create_engine(DATABASE_URL) + def upgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### - op.create_table( - "jobagent", - sa.Column("id", sa.Integer(), nullable=False), - sa.Column("task_in_progress", sa.Integer(), nullable=False), - sa.Column("job_id", sa.Integer(), nullable=False), - sa.Column("agent_id", sa.Integer(), nullable=False), - sa.ForeignKeyConstraint( - ["agent_id"], - ["agentgroup.id"], - ), - sa.ForeignKeyConstraint( - ["job_id"], - ["job.internal_id"], - ), - sa.PrimaryKeyConstraint("id"), - ) - # ### end Alembic commands ### + if not inspect(engine).has_table("jobagent"): + op.create_table( + "jobagent", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("task_in_progress", sa.Integer(), nullable=False), + sa.Column("job_id", sa.Integer(), nullable=False), + sa.Column("agent_id", sa.Integer(), nullable=False), + sa.ForeignKeyConstraint( + ["agent_id"], + ["agentgroup.id"], + ), + sa.ForeignKeyConstraint( + ["job_id"], + ["job.internal_id"], + ), + sa.PrimaryKeyConstraint("id"), + ) + # ### end Alembic commands ### def downgrade() -> None: diff --git a/src/models/job.py b/src/models/job.py index 5c7a4c0c..563d83b5 100644 --- a/src/models/job.py +++ b/src/models/job.py @@ -21,7 +21,7 @@ class JobBase(SQLModel): """Base class for entities related to mquery jobs.""" id: str - status: JobStatus = Field(sa_column=PgEnum(JobStatus, name="jobstatus", create_type=True)) + status: JobStatus = Field(sa_type=PgEnum(JobStatus, name="jobstatus")) error: Optional[str] rule_name: str rule_author: str From d028b53e3ca5db967f62e068af5f8c112b1db74b Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 1 Oct 2024 18:20:36 +0200 Subject: [PATCH 3/6] In INSTALL.md and README.md alembic command added. --- INSTALL.md | 2 ++ README.md | 1 + 2 files changed, 3 insertions(+) diff --git a/INSTALL.md b/INSTALL.md index e813b9a4..41bb9f46 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -21,6 +21,7 @@ mkdir samples vim .env docker compose up --scale daemon=3 # this will take a while docker compose exec web python3 -m mquery.db +docker compose exec web python3 -m alembic upgrade head ``` - Good for testing mquery and production deployments on a single server @@ -39,6 +40,7 @@ cd mquery vim .env docker compose -f docker-compose.dev.yml up # this will take a while docker compose exec dev-web python3 -m mquery.db +docker compose exec dev-web python3 -m alembic upgrade head ``` - Good for development - all file changes will be picked up automatically. diff --git a/README.md b/README.md index 0803755a..f2eb3b38 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ cd mquery vim .env # optional - change samples and index directory locations docker compose up --scale daemon=3 # building the images will take a while docker compose exec web python3 -m mquery.db +docker compose exec web python3 -m alembic upgrade head ``` The web interface should be available at `http://localhost`. From 1498a44520374a408cd444c7dcb7fb6a0132bcd5 Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 1 Oct 2024 18:41:58 +0200 Subject: [PATCH 4/6] ignore enum type error --- src/models/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/models/job.py b/src/models/job.py index 563d83b5..a515803a 100644 --- a/src/models/job.py +++ b/src/models/job.py @@ -21,7 +21,7 @@ class JobBase(SQLModel): """Base class for entities related to mquery jobs.""" id: str - status: JobStatus = Field(sa_type=PgEnum(JobStatus, name="jobstatus")) + status: JobStatus = Field(sa_type=PgEnum(JobStatus, name="jobstatus")) # type: ignore error: Optional[str] rule_name: str rule_author: str From 98621c79355cf8ce83b6e17d29380f1a4cf48bdb Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 1 Oct 2024 22:53:41 +0200 Subject: [PATCH 5/6] Creating jobstatus enum by job.py site --- src/migrations/versions/84796bdc2f77_add_jobstatus.py | 3 --- src/models/job.py | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/migrations/versions/84796bdc2f77_add_jobstatus.py b/src/migrations/versions/84796bdc2f77_add_jobstatus.py index 1f60a001..e15f4670 100644 --- a/src/migrations/versions/84796bdc2f77_add_jobstatus.py +++ b/src/migrations/versions/84796bdc2f77_add_jobstatus.py @@ -14,9 +14,6 @@ def upgrade() -> None: - op.execute( - "CREATE TYPE jobstatus AS ENUM ('done', 'new', 'cancelled', 'removed', 'processing');" - ) op.execute( "ALTER TABLE job ALTER COLUMN status TYPE jobstatus USING status::text::jobstatus;" ) diff --git a/src/models/job.py b/src/models/job.py index a515803a..3c0a0e11 100644 --- a/src/models/job.py +++ b/src/models/job.py @@ -21,7 +21,7 @@ class JobBase(SQLModel): """Base class for entities related to mquery jobs.""" id: str - status: JobStatus = Field(sa_type=PgEnum(JobStatus, name="jobstatus")) # type: ignore + status: JobStatus = Field(sa_type=PgEnum(JobStatus, name="jobstatus", create_type=True)) # type: ignore error: Optional[str] rule_name: str rule_author: str From e0ce4f226bbf6f05070a0e5ab7fe986aca9da4ce Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Wed, 2 Oct 2024 09:13:45 +0200 Subject: [PATCH 6/6] Creating jobstatus enum in migration file --- src/migrations/versions/84796bdc2f77_add_jobstatus.py | 3 +++ src/models/job.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/migrations/versions/84796bdc2f77_add_jobstatus.py b/src/migrations/versions/84796bdc2f77_add_jobstatus.py index e15f4670..1f60a001 100644 --- a/src/migrations/versions/84796bdc2f77_add_jobstatus.py +++ b/src/migrations/versions/84796bdc2f77_add_jobstatus.py @@ -14,6 +14,9 @@ def upgrade() -> None: + op.execute( + "CREATE TYPE jobstatus AS ENUM ('done', 'new', 'cancelled', 'removed', 'processing');" + ) op.execute( "ALTER TABLE job ALTER COLUMN status TYPE jobstatus USING status::text::jobstatus;" ) diff --git a/src/models/job.py b/src/models/job.py index 3c0a0e11..a515803a 100644 --- a/src/models/job.py +++ b/src/models/job.py @@ -21,7 +21,7 @@ class JobBase(SQLModel): """Base class for entities related to mquery jobs.""" id: str - status: JobStatus = Field(sa_type=PgEnum(JobStatus, name="jobstatus", create_type=True)) # type: ignore + status: JobStatus = Field(sa_type=PgEnum(JobStatus, name="jobstatus")) # type: ignore error: Optional[str] rule_name: str rule_author: str