diff --git a/query-engine/connector-test-kit-rs/query-engine-tests/tests/writes/top_level_mutations/delete_many.rs b/query-engine/connector-test-kit-rs/query-engine-tests/tests/writes/top_level_mutations/delete_many.rs index 4011011c895..e4b264e8bb3 100644 --- a/query-engine/connector-test-kit-rs/query-engine-tests/tests/writes/top_level_mutations/delete_many.rs +++ b/query-engine/connector-test-kit-rs/query-engine-tests/tests/writes/top_level_mutations/delete_many.rs @@ -176,6 +176,32 @@ mod delete_many { Ok(()) } + // "The delete many Mutation" should "delete max the number of items specified in the limit" + #[connector_test] + async fn should_delete_max_limit_items(runner: Runner) -> TestResult<()> { + create_row(&runner, r#"{ id: 1, title: "title1" }"#).await?; + create_row(&runner, r#"{ id: 2, title: "title2" }"#).await?; + create_row(&runner, r#"{ id: 3, title: "title3" }"#).await?; + create_row(&runner, r#"{ id: 4, title: "title4" }"#).await?; + + assert_todo_count(&runner, 4).await?; + + insta::assert_snapshot!( + run_query!(&runner, r#"mutation { + deleteManyTodo( + limit: 3 + ){ + count + } + }"#), + @r###"{"data":{"deleteManyTodo":{"count":3}}}"### + ); + + assert_todo_count(&runner, 1).await?; + + Ok(()) + } + fn nested_del_many() -> String { let schema = indoc! { r#"model ZChild{ diff --git a/query-engine/connectors/mongodb-query-connector/src/interface/connection.rs b/query-engine/connectors/mongodb-query-connector/src/interface/connection.rs index b5aaea79d9b..a0c49fca73a 100644 --- a/query-engine/connectors/mongodb-query-connector/src/interface/connection.rs +++ b/query-engine/connectors/mongodb-query-connector/src/interface/connection.rs @@ -162,6 +162,7 @@ impl WriteOperations for MongoDbConnection { &mut self, model: &Model, record_filter: connector_interface::RecordFilter, + limit: Option, _traceparent: Option, ) -> connector_interface::Result { catch(write::delete_records( @@ -169,6 +170,7 @@ impl WriteOperations for MongoDbConnection { &mut self.session, model, record_filter, + limit, )) .await } diff --git a/query-engine/connectors/mongodb-query-connector/src/interface/transaction.rs b/query-engine/connectors/mongodb-query-connector/src/interface/transaction.rs index 0ba1350d92f..00115b5cfba 100644 --- a/query-engine/connectors/mongodb-query-connector/src/interface/transaction.rs +++ b/query-engine/connectors/mongodb-query-connector/src/interface/transaction.rs @@ -191,6 +191,7 @@ impl WriteOperations for MongoDbTransaction<'_> { &mut self, model: &Model, record_filter: connector_interface::RecordFilter, + limit: Option, _traceparent: Option, ) -> connector_interface::Result { catch(write::delete_records( @@ -198,6 +199,7 @@ impl WriteOperations for MongoDbTransaction<'_> { &mut self.connection.session, model, record_filter, + limit, )) .await } diff --git a/query-engine/connectors/mongodb-query-connector/src/root_queries/write.rs b/query-engine/connectors/mongodb-query-connector/src/root_queries/write.rs index 2564b56e371..40d385bfad5 100644 --- a/query-engine/connectors/mongodb-query-connector/src/root_queries/write.rs +++ b/query-engine/connectors/mongodb-query-connector/src/root_queries/write.rs @@ -168,7 +168,7 @@ pub async fn update_records<'conn>( .collect::>>()? } else { let filter = MongoFilterVisitor::new(FilterPrefix::default(), false).visit(record_filter.filter)?; - find_ids(coll.clone(), session, model, filter).await? + find_ids(coll.clone(), session, model, filter, None).await? }; if ids.is_empty() { @@ -228,6 +228,7 @@ pub async fn delete_records<'conn>( session: &mut ClientSession, model: &Model, record_filter: RecordFilter, + limit: Option, ) -> crate::Result { let coll = database.collection::(model.db_name()); let id_field = pick_singular_id(model); @@ -235,6 +236,7 @@ pub async fn delete_records<'conn>( let ids = if let Some(selectors) = record_filter.selectors { selectors .into_iter() + .take(limit.unwrap_or(std::i64::MAX) as usize) .map(|p| { (&id_field, p.values().next().unwrap()) .into_bson() @@ -243,7 +245,7 @@ pub async fn delete_records<'conn>( .collect::>>()? } else { let filter = MongoFilterVisitor::new(FilterPrefix::default(), false).visit(record_filter.filter)?; - find_ids(coll.clone(), session, model, filter).await? + find_ids(coll.clone(), session, model, filter, limit).await? }; if ids.is_empty() { @@ -303,6 +305,7 @@ async fn find_ids( session: &mut ClientSession, model: &Model, filter: MongoFilter, + limit: Option, ) -> crate::Result> { let id_field = model.primary_identifier(); let mut builder = MongoReadQueryBuilder::new(model.clone()); @@ -316,6 +319,8 @@ async fn find_ids( builder.query = Some(filter); }; + builder.limit = limit; + let builder = builder.with_model_projection(id_field)?; let query = builder.build()?; let docs = query.execute(collection, session).await?; diff --git a/query-engine/connectors/query-connector/src/interface.rs b/query-engine/connectors/query-connector/src/interface.rs index c196c76bc30..bfc6c5d725a 100644 --- a/query-engine/connectors/query-connector/src/interface.rs +++ b/query-engine/connectors/query-connector/src/interface.rs @@ -326,6 +326,7 @@ pub trait WriteOperations { &mut self, model: &Model, record_filter: RecordFilter, + limit: Option, traceparent: Option, ) -> crate::Result; diff --git a/query-engine/connectors/sql-query-connector/src/database/connection.rs b/query-engine/connectors/sql-query-connector/src/database/connection.rs index af0a15192c1..e47fb9ce019 100644 --- a/query-engine/connectors/sql-query-connector/src/database/connection.rs +++ b/query-engine/connectors/sql-query-connector/src/database/connection.rs @@ -272,12 +272,13 @@ where &mut self, model: &Model, record_filter: RecordFilter, + limit: Option, traceparent: Option, ) -> connector::Result { let ctx = Context::new(&self.connection_info, traceparent); catch( &self.connection_info, - write::delete_records(&self.inner, model, record_filter, &ctx), + write::delete_records(&self.inner, model, record_filter, limit, &ctx), ) .await } diff --git a/query-engine/connectors/sql-query-connector/src/database/operations/write.rs b/query-engine/connectors/sql-query-connector/src/database/operations/write.rs index b71fc782671..37fba8854da 100644 --- a/query-engine/connectors/sql-query-connector/src/database/operations/write.rs +++ b/query-engine/connectors/sql-query-connector/src/database/operations/write.rs @@ -445,6 +445,7 @@ pub(crate) async fn delete_records( conn: &dyn Queryable, model: &Model, record_filter: RecordFilter, + limit: Option, ctx: &Context<'_>, ) -> crate::Result { let filter_condition = FilterBuilder::without_top_level_joins().visit_filter(record_filter.clone().filter, ctx); @@ -453,14 +454,22 @@ pub(crate) async fn delete_records( let row_count = if record_filter.has_selectors() { let ids: Vec<_> = record_filter.selectors.as_ref().unwrap().iter().collect(); let mut row_count = 0; + let mut remaining_limit = limit; - for delete in write::delete_many_from_ids_and_filter(model, ids.as_slice(), filter_condition, ctx) { + for delete in + write::delete_many_from_ids_and_filter(model, ids.as_slice(), filter_condition, remaining_limit, ctx) + { row_count += conn.execute(delete).await?; + if let Some(limit) = remaining_limit { + remaining_limit = Some(limit - row_count as i64); + if remaining_limit.unwrap() <= 0 { + break; + } + } } - row_count } else { - conn.execute(write::delete_many_from_filter(model, filter_condition, ctx)) + conn.execute(write::delete_many_from_filter(model, filter_condition, limit, ctx)) .await? }; diff --git a/query-engine/connectors/sql-query-connector/src/database/transaction.rs b/query-engine/connectors/sql-query-connector/src/database/transaction.rs index 9a3429ab6e2..cdf5a466237 100644 --- a/query-engine/connectors/sql-query-connector/src/database/transaction.rs +++ b/query-engine/connectors/sql-query-connector/src/database/transaction.rs @@ -280,11 +280,12 @@ impl WriteOperations for SqlConnectorTransaction<'_> { &mut self, model: &Model, record_filter: RecordFilter, + limit: Option, traceparent: Option, ) -> connector::Result { catch(&self.connection_info, async { let ctx = Context::new(&self.connection_info, traceparent); - write::delete_records(self.inner.as_queryable(), model, record_filter, &ctx).await + write::delete_records(self.inner.as_queryable(), model, record_filter, limit, &ctx).await }) .await } diff --git a/query-engine/connectors/sql-query-connector/src/query_builder/write.rs b/query-engine/connectors/sql-query-connector/src/query_builder/write.rs index aa010044b0b..0ec506e1b6f 100644 --- a/query-engine/connectors/sql-query-connector/src/query_builder/write.rs +++ b/query-engine/connectors/sql-query-connector/src/query_builder/write.rs @@ -226,10 +226,32 @@ pub(crate) fn delete_returning( pub(crate) fn delete_many_from_filter( model: &Model, filter_condition: ConditionTree<'static>, + limit: Option, ctx: &Context<'_>, ) -> Query<'static> { + let condition = if let Some(limit) = limit { + let columns = model + .primary_identifier() + .as_scalar_fields() + .expect("primary identifier must contain scalar fields") + .into_iter() + .map(|f| f.as_column(ctx)) + .collect::>(); + + ConditionTree::from( + Row::from(columns.clone()).in_selection( + Select::from_table(model.as_table(ctx)) + .columns(columns) + .so_that(filter_condition) + .limit(limit as usize), + ), + ) + } else { + filter_condition + }; + Delete::from_table(model.as_table(ctx)) - .so_that(filter_condition) + .so_that(condition) .add_traceparent(ctx.traceparent) .into() } @@ -238,6 +260,7 @@ pub(crate) fn delete_many_from_ids_and_filter( model: &Model, ids: &[&SelectionResult], filter_condition: ConditionTree<'static>, + limit: Option, ctx: &Context<'_>, ) -> Vec> { let columns: Vec<_> = ModelProjection::from(model.primary_identifier()) @@ -245,7 +268,7 @@ pub(crate) fn delete_many_from_ids_and_filter( .collect(); super::chunked_conditions(&columns, ids, ctx, |conditions| { - delete_many_from_filter(model, conditions.and(filter_condition.clone()), ctx) + delete_many_from_filter(model, conditions.and(filter_condition.clone()), limit, ctx) }) } diff --git a/query-engine/core/src/interpreter/query_interpreters/write.rs b/query-engine/core/src/interpreter/query_interpreters/write.rs index 065f5e6eebe..53de1a18b40 100644 --- a/query-engine/core/src/interpreter/query_interpreters/write.rs +++ b/query-engine/core/src/interpreter/query_interpreters/write.rs @@ -295,7 +295,7 @@ async fn delete_one( Ok(QueryResult::RecordSelection(Some(Box::new(selection)))) } else { - let result = tx.delete_records(&q.model, filter, traceparent).await?; + let result = tx.delete_records(&q.model, filter, None, traceparent).await?; Ok(QueryResult::Count(result)) } } @@ -337,7 +337,7 @@ async fn delete_many( q: DeleteManyRecords, traceparent: Option, ) -> InterpretationResult { - let res = tx.delete_records(&q.model, q.record_filter, traceparent).await?; + let res = tx.delete_records(&q.model, q.record_filter, q.limit, traceparent).await?; Ok(QueryResult::Count(res)) } diff --git a/query-engine/core/src/query_ast/write.rs b/query-engine/core/src/query_ast/write.rs index 9d30e3ff7b5..51ddf05724f 100644 --- a/query-engine/core/src/query_ast/write.rs +++ b/query-engine/core/src/query_ast/write.rs @@ -397,6 +397,7 @@ pub struct DeleteRecordFields { pub struct DeleteManyRecords { pub model: Model, pub record_filter: RecordFilter, + pub limit: Option, } #[derive(Debug, Clone)] diff --git a/query-engine/core/src/query_graph_builder/write/delete.rs b/query-engine/core/src/query_graph_builder/write/delete.rs index 57cb90db527..f760f9c82ea 100644 --- a/query-engine/core/src/query_graph_builder/write/delete.rs +++ b/query-engine/core/src/query_graph_builder/write/delete.rs @@ -1,11 +1,12 @@ use super::*; +use crate::query_document::ParsedInputValue; use crate::{ query_ast::*, query_graph::{Node, QueryGraph, QueryGraphDependency}, ArgumentListLookup, FilteredQuery, ParsedField, }; use psl::datamodel_connector::ConnectorCapability; -use query_structure::{Filter, Model}; +use query_structure::{Filter, Model, PrismaValue}; use schema::{constants::args, QuerySchema}; use std::convert::TryInto; @@ -110,12 +111,18 @@ pub fn delete_many_records( Some(where_arg) => extract_filter(where_arg.value.try_into()?, &model)?, None => Filter::empty(), }; + let limit = field.arguments.lookup(args::LIMIT) + .and_then(|limit_arg| match limit_arg.value { + ParsedInputValue::Single(PrismaValue::Int(i)) => Some(i), + _ => None, + }); let model_id = model.primary_identifier(); let record_filter = filter.clone().into(); let delete_many = WriteQuery::DeleteManyRecords(DeleteManyRecords { model: model.clone(), record_filter, + limit, }); let delete_many_node = graph.create_node(Query::Write(delete_many)); diff --git a/query-engine/core/src/query_graph_builder/write/nested/delete_nested.rs b/query-engine/core/src/query_graph_builder/write/nested/delete_nested.rs index a2c391fb7ec..5044451193f 100644 --- a/query-engine/core/src/query_graph_builder/write/nested/delete_nested.rs +++ b/query-engine/core/src/query_graph_builder/write/nested/delete_nested.rs @@ -44,6 +44,7 @@ pub fn nested_delete( let delete_many = WriteQuery::DeleteManyRecords(DeleteManyRecords { model: child_model.clone(), record_filter: or_filter.clone().into(), + limit: None, }); let delete_many_node = graph.create_node(Query::Write(delete_many)); @@ -157,6 +158,7 @@ pub fn nested_delete_many( let delete_many = WriteQuery::DeleteManyRecords(DeleteManyRecords { model: child_model.clone(), record_filter: RecordFilter::empty(), + limit: None, }); let delete_many_node = graph.create_node(Query::Write(delete_many)); diff --git a/query-engine/core/src/query_graph_builder/write/utils.rs b/query-engine/core/src/query_graph_builder/write/utils.rs index d9f89feade6..d73f5ae4a0b 100644 --- a/query-engine/core/src/query_graph_builder/write/utils.rs +++ b/query-engine/core/src/query_graph_builder/write/utils.rs @@ -521,6 +521,7 @@ pub fn emulate_on_delete_cascade( let delete_query = WriteQuery::DeleteManyRecords(DeleteManyRecords { model: dependent_model.clone(), record_filter: RecordFilter::empty(), + limit: None, }); let delete_dependents_node = graph.create_node(Query::Write(delete_query)); diff --git a/query-engine/schema/src/build/input_types/fields/arguments.rs b/query-engine/schema/src/build/input_types/fields/arguments.rs index e47c3c5f086..7e6f51b05a6 100644 --- a/query-engine/schema/src/build/input_types/fields/arguments.rs +++ b/query-engine/schema/src/build/input_types/fields/arguments.rs @@ -85,7 +85,10 @@ pub(crate) fn update_many_arguments(ctx: &QuerySchema, model: Model) -> Vec Vec> { let where_arg = where_argument(ctx, &model); - vec![where_arg] + vec![ + where_arg, + input_field(args::LIMIT, vec![InputType::int()], None).optional(), + ] } /// Builds "many records where" arguments based on the given model and field. diff --git a/query-engine/schema/src/constants.rs b/query-engine/schema/src/constants.rs index 7911492ae94..47adbb82235 100644 --- a/query-engine/schema/src/constants.rs +++ b/query-engine/schema/src/constants.rs @@ -23,6 +23,9 @@ pub mod args { // createMany-specific args pub const SKIP_DUPLICATES: &str = "skipDuplicates"; + + // deleteMany-specific args + pub const LIMIT: &str = "limit"; } pub mod operations {