Skip to content

Commit

Permalink
Functionality for Select+Insert
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya-balachander committed Nov 18, 2024
1 parent 730ba6c commit 138241e
Show file tree
Hide file tree
Showing 12 changed files with 960 additions and 166 deletions.
24 changes: 18 additions & 6 deletions cumulusci/tasks/bulkdata/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,14 @@ def _execute_step(

def process_lookup_fields(self, mapping, fields, polymorphic_fields):
"""Modify fields and priority fields based on lookup and polymorphic checks."""
# Store the lookups and their original order for re-insertion at the end
original_lookups = [name for name in fields if name in mapping.lookups]
max_insert_index = -1
for name, lookup in mapping.lookups.items():
if name in fields:
# Get the index of the lookup field before removing it
insert_index = fields.index(name)
max_insert_index = max(max_insert_index, insert_index)
# Remove the lookup field from fields
fields.remove(name)

Expand Down Expand Up @@ -351,14 +355,15 @@ def process_lookup_fields(self, mapping, fields, polymorphic_fields):
None,
)
if lookup_mapping_step:
lookup_fields = lookup_mapping_step.get_load_field_list()
lookup_fields = lookup_mapping_step.fields.keys()
# Insert fields in the format {relationship_name}.{ref_type}.{lookup_field}
for field in lookup_fields:
fields.insert(
insert_index,
f"{relationship_name}.{lookup_mapping_step.sf_object}.{field}",
)
insert_index += 1
max_insert_index = max(max_insert_index, insert_index)
if lookup_in_priority_fields:
mapping.select_options.priority_fields[
f"{relationship_name}.{lookup_mapping_step.sf_object}.{field}"
Expand All @@ -383,17 +388,24 @@ def process_lookup_fields(self, mapping, fields, polymorphic_fields):

if lookup_mapping_step:
relationship_name = polymorphic_fields[name]["relationshipName"]
lookup_fields = lookup_mapping_step.get_load_field_list()
lookup_fields = lookup_mapping_step.fields.keys()

# Insert the new fields at the same position as the removed lookup field
for field in lookup_fields:
fields.insert(insert_index, f"{relationship_name}.{field}")
insert_index += 1
max_insert_index = max(max_insert_index, insert_index)
if lookup_in_priority_fields:
mapping.select_options.priority_fields[
f"{relationship_name}.{field}"
] = f"{relationship_name}.{field}"

# Append the original lookups at the end in the same order
for name in original_lookups:
if name not in fields:
fields.insert(max_insert_index, name)
max_insert_index += 1

def configure_step(self, mapping):
"""Create a step appropriate to the action"""
bulk_mode = mapping.bulk_mode or self.bulk_mode or "Parallel"
Expand Down Expand Up @@ -479,6 +491,7 @@ def configure_step(self, mapping):
selection_filter=mapping.select_options.filter,
selection_priority_fields=mapping.select_options.priority_fields,
content_type=content_type,
threshold=mapping.select_options.threshold,
)
return step, query

Expand Down Expand Up @@ -588,10 +601,9 @@ def _query_db(self, mapping):
mapping, self.mapping, self.metadata, model, self._old_format
)
)
else:
transformers.append(
AddLookupsToQuery(mapping, self.metadata, model, self._old_format)
)
transformers.append(
AddLookupsToQuery(mapping, self.metadata, model, self._old_format)
)

transformers.extend([cls(mapping, self.metadata, model) for cls in classes])

Expand Down
1 change: 1 addition & 0 deletions cumulusci/tasks/bulkdata/mapping_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class MappingLookup(CCIDictModel):
join_field: Optional[str] = None
after: Optional[str] = None
aliased_table: Optional[Any] = None
parent_tables: Optional[Any] = None
name: Optional[str] = None # populated by parent

def get_lookup_key_field(self, model=None):
Expand Down
20 changes: 10 additions & 10 deletions cumulusci/tasks/bulkdata/query_transformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ def columns_to_add(self):
columns = []
for lookup in self.lookups:
tables = lookup.table if isinstance(lookup.table, list) else [lookup.table]
lookup.aliased_table = [
lookup.parent_tables = [
aliased(
self.metadata.tables[table], name=f"{lookup.name}_{table}_alias"
)
for table in tables
]

for aliased_table, table_name in zip(lookup.aliased_table, tables):
for parent_table, table_name in zip(lookup.parent_tables, tables):
# Find the mapping step for this polymorphic type
lookup_mapping_step = next(
(
Expand All @@ -124,24 +124,24 @@ def columns_to_add(self):
None,
)
if lookup_mapping_step:
load_fields = lookup_mapping_step.get_load_field_list()
load_fields = lookup_mapping_step.fields.keys()
for field in load_fields:
if field in lookup_mapping_step.fields:
matching_column = next(
(
col
for col in aliased_table.columns
for col in parent_table.columns
if col.name == lookup_mapping_step.fields[field]
)
)
columns.append(
matching_column.label(f"{aliased_table.name}_{field}")
matching_column.label(f"{parent_table.name}_{field}")
)
else:
# Append an empty string if the field is not present
columns.append(
literal_column("''").label(
f"{aliased_table.name}_{field}"
f"{parent_table.name}_{field}"
)
)
return columns
Expand All @@ -150,15 +150,15 @@ def columns_to_add(self):
def outerjoins_to_add(self):
"""Add outer joins for each lookup table directly, including handling for polymorphic lookups."""

def join_for_lookup(lookup, aliased_table):
def join_for_lookup(lookup, parent_table):
key_field = lookup.get_lookup_key_field(self.model)
value_column = getattr(self.model, key_field)
return (aliased_table, aliased_table.columns.id == value_column)
return (parent_table, parent_table.columns.id == value_column)

joins = []
for lookup in self.lookups:
for aliased_table in lookup.aliased_table:
joins.append(join_for_lookup(lookup, aliased_table))
for parent_table in lookup.parent_tables:
joins.append(join_for_lookup(lookup, parent_table))
return joins


Expand Down
Loading

0 comments on commit 138241e

Please sign in to comment.