Skip to content

Commit

Permalink
creates pydantic model and updates deployment orm model
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanluciano committed Sep 9, 2024
1 parent 746bbe6 commit 346bdfb
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 4 deletions.
5 changes: 5 additions & 0 deletions src/prefect/server/database/migrations/MIGRATION-NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ Each time a database migration is written, an entry is included here with:

This gives us a history of changes and will create merge conflicts if two migrations are made at once, flagging situations where a branch needs to be updated before merging.

# Expands `Deployments.concurrency_limit` to use either int or `ConcurrencyOptions`

SQLite: `78bf051472eb`
Postgres: `3b4bc664921d`

# Add `concurrency_limit` to `Deployments`
SQLite: `f93e1439f022`
Postgres:`97429116795e`
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""change deployment concurrency limit type
Revision ID: 3b4bc664921d
Revises: 97429116795e
Create Date: 2024-09-06 17:08:43.236854
"""
from alembic import op
import sqlalchemy as sa
import prefect
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '3b4bc664921d'
down_revision = '97429116795e'
branch_labels = None
depends_on = None




def upgrade():
op.alter_column('deployment', 'concurrency_limit',
existing_type=sa.INTEGER(),
type_=prefect.server.utilities.database.Pydantic(),
existing_nullable=True)

def downgrade():
op.alter_column('deployment', 'concurrency_limit',
existing_type=prefect.server.utilities.database.Pydantic(),
type_=sa.INTEGER(),
existing_nullable=True)
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""change deployment concurrency limit type
Revision ID: 78bf051472eb
Revises: f93e1439f022
Create Date: 2024-09-06 16:32:02.584968
"""
from alembic import op
import sqlalchemy as sa
import prefect
from sqlalchemy.dialects import sqlite
from prefect.server.schemas.core import ConcurrencyOptions

# revision identifiers, used by Alembic.
revision = '78bf051472eb'
down_revision = 'f93e1439f022'
branch_labels = None
depends_on = None




def upgrade():
with op.batch_alter_table('deployment', schema=None) as batch_op:
batch_op.alter_column('concurrency_limit',
existing_type=sa.INTEGER(),
type_=prefect.server.utilities.database.Pydantic(ConcurrencyOptions),
existing_nullable=True)

def downgrade():
with op.batch_alter_table('deployment', schema=None) as batch_op:
batch_op.alter_column('concurrency_limit',
existing_type=prefect.server.utilities.database.Pydantic(ConcurrencyOptions),
type_=sa.INTEGER(),
existing_nullable=True)
6 changes: 4 additions & 2 deletions src/prefect/server/database/orm_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,8 +875,10 @@ def job_variables(self):
order_by=sa.desc(sa.text("updated")),
)

concurrency_limit: Mapped[Union[int, None]] = mapped_column(
sa.Integer, default=None, nullable=True
concurrency_limit: Mapped[Union[int, schemas.core.ConcurrencyOptions, None]] = mapped_column(
Pydantic(schemas.core.ConcurrencyOptions),
default=None,
nullable=True,
)
tags: Mapped[List[str]] = mapped_column(
JSON, server_default="[]", default=list, nullable=False
Expand Down
22 changes: 20 additions & 2 deletions src/prefect/server/schemas/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@
PositiveInteger,
StrictVariableValue,
)
from prefect.utilities.collections import dict_to_flatdict, flatdict_to_dict, listrepr
from prefect.utilities.collections import (
AutoEnum,
dict_to_flatdict,
flatdict_to_dict,
listrepr,
)
from prefect.utilities.names import generate_slug, obfuscate

if TYPE_CHECKING:
Expand Down Expand Up @@ -523,6 +528,19 @@ def validate_max_scheduled_runs(cls, v):
return validate_schedule_max_scheduled_runs(
v, PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS.value()
)

class ConcurrencyCollisionStrategy(AutoEnum):
"""
Enumeration of concurrency collision strategies.
"""
ENQUEUE = AutoEnum.auto()
CANCEL = AutoEnum.auto()
class ConcurrencyOptions(BaseModel):
"""
Options for configuring deployment concurrency limits.
"""
concurrency: int
collision_strategy: ConcurrencyCollisionStrategy


class Deployment(ORMBaseModel):
Expand All @@ -546,7 +564,7 @@ class Deployment(ORMBaseModel):
schedules: List[DeploymentSchedule] = Field(
default_factory=list, description="A list of schedules for the deployment."
)
concurrency_limit: Optional[PositiveInteger] = Field(
concurrency_limit: Optional[Union[NonNegativeInteger, ConcurrencyOptions]] = Field(
default=None, description="The concurrency limit for the deployment."
)
job_variables: Dict[str, Any] = Field(
Expand Down
15 changes: 15 additions & 0 deletions tests/server/models/test_deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,21 @@ async def test_create_deployment_with_updated_by(self, session, flow):
)
assert updated_deployment.updated_by.type == new_updated_by.type

async def test_create_deployment_with_concurrency_limit(self, session, flow):
concurrency_options = schemas.core.ConcurrencyOptions(
concurrency=10, collision_strategy=schemas.core.ConcurrencyCollisionStrategy.ENQUEUE
)
deployment = await models.deployments.create_deployment(
session=session,
deployment=schemas.core.Deployment(
name="My Deployment",
flow_id=flow.id,
concurrency_limit=concurrency_options,
),
)
assert deployment.concurrency_limit == concurrency_options



class TestReadDeployment:
async def test_read_deployment(self, session, flow, flow_function):
Expand Down

0 comments on commit 346bdfb

Please sign in to comment.