From 43435c348656ebb13ff8e797a11e3d24f5d2ee22 Mon Sep 17 00:00:00 2001
From: aditya-balachander
<139134092+aditya-balachander@users.noreply.github.com>
Date: Fri, 29 Sep 2023 03:27:51 +0530
Subject: [PATCH] Added xpath 'find_and_replace' functionality to transforms
(#3655)
---
.../tests/test_transforms.py | 721 ++++++++++++------
.../core/source_transforms/transforms.py | 86 ++-
docs/deploy.md | 78 +-
3 files changed, 637 insertions(+), 248 deletions(-)
diff --git a/cumulusci/core/source_transforms/tests/test_transforms.py b/cumulusci/core/source_transforms/tests/test_transforms.py
index 4fb9015d5d..99230f2fe2 100644
--- a/cumulusci/core/source_transforms/tests/test_transforms.py
+++ b/cumulusci/core/source_transforms/tests/test_transforms.py
@@ -7,6 +7,7 @@
from zipfile import ZipFile
import pytest
+from lxml import etree as ET
from pydantic import ValidationError
from cumulusci.core.exceptions import CumulusCIException, TaskOptionsError
@@ -376,304 +377,546 @@ def test_bundle_static_resources(task_context):
assert compare_spec == zf
-def test_find_replace_static(task_context):
+def create_builder(task_context, zip_content, patterns):
builder = MetadataPackageZipBuilder.from_zipfile(
- ZipFileSpec(
- {
- Path("Foo.cls"): "System.debug('blah');",
- }
- ).as_zipfile(),
+ ZipFileSpec(zip_content).as_zipfile(),
context=task_context,
transforms=[
FindReplaceTransform(
- FindReplaceTransformOptions.parse_obj(
- {"patterns": [{"find": "bl", "replace": "ye"}]}
- )
+ FindReplaceTransformOptions.parse_obj({"patterns": patterns})
)
],
)
- assert (
- ZipFileSpec(
- {
- Path("Foo.cls"): "System.debug('yeah');",
- }
- )
- == builder.zf
- )
+ return builder
+
+
+def zip_assert(builder, modified_zip_content):
+ assert ZipFileSpec(modified_zip_content) == builder.zf
+
+
+def test_find_replace_static(task_context):
+ zip_content = {
+ Path("Foo.cls"): "System.debug('blah');",
+ }
+ patterns = [{"find": "bl", "replace": "ye"}]
+ builder = create_builder(task_context, zip_content, patterns)
+
+ modified_zip_content = {
+ Path("Foo.cls"): "System.debug('yeah');",
+ }
+ zip_assert(builder, modified_zip_content)
+
+
+def test_xpath_replace_static(task_context):
+ zip_content = {
+ Path(
+ "Foo.xml"
+ ): "blahblah",
+ }
+ patterns = [{"xpath": "/root/element1", "replace": "yeah"}]
+ builder = create_builder(task_context, zip_content, patterns)
+
+ modified_zip_content = {
+ Path(
+ "Foo.xml"
+ ): "yeahblah",
+ }
+ zip_assert(builder, modified_zip_content)
+
+
+def test_find_replace_xmlFile(task_context):
+ zip_content = {
+ Path(
+ "Foo.xml"
+ ): "blahblah",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ patterns = [{"find": "bl", "replace": "ye"}]
+ builder = create_builder(task_context, zip_content, patterns)
+
+ modified_zip_content = {
+ Path(
+ "Foo.xml"
+ ): "yeahyeah",
+ Path("Bar.cls"): "System.debug('yeah');",
+ }
+ zip_assert(builder, modified_zip_content)
def test_find_replace_environ(task_context):
with mock.patch.dict(os.environ, {"INSERT_TEXT": "ye"}):
- builder = MetadataPackageZipBuilder.from_zipfile(
- ZipFileSpec(
- {
- Path("Foo.cls"): "System.debug('blah');",
- }
- ).as_zipfile(),
- context=task_context,
- transforms=[
- FindReplaceTransform(
- FindReplaceTransformOptions.parse_obj(
- {"patterns": [{"find": "bl", "replace_env": "INSERT_TEXT"}]}
- )
- )
- ],
- )
+ zip_content = {
+ Path("Foo.cls"): "System.debug('blah');",
+ }
+ patterns = [{"find": "bl", "replace_env": "INSERT_TEXT"}]
+ builder = create_builder(task_context, zip_content, patterns)
- assert (
- ZipFileSpec(
- {
- Path("Foo.cls"): "System.debug('yeah');",
- }
- )
- == builder.zf
- )
+ modified_zip_content = {
+ Path("Foo.cls"): "System.debug('yeah');",
+ }
+ zip_assert(builder, modified_zip_content)
+
+
+def test_xpath_replace_environ(task_context):
+ with mock.patch.dict(os.environ, {"INSERT_TEXT": "yeah"}):
+ zip_content = {
+ Path(
+ "Foo.xml"
+ ): "blahblah",
+ }
+ patterns = [{"xpath": "/root/element1", "replace_env": "INSERT_TEXT"}]
+ builder = create_builder(task_context, zip_content, patterns)
+
+ modified_zip_content = {
+ Path(
+ "Foo.xml"
+ ): "yeahblah",
+ }
+ zip_assert(builder, modified_zip_content)
def test_find_replace_environ__not_found(task_context):
assert "INSERT_TEXT" not in os.environ
with pytest.raises(TaskOptionsError):
- MetadataPackageZipBuilder.from_zipfile(
- ZipFileSpec(
- {
- Path("Foo.cls"): "System.debug('blah');",
- }
- ).as_zipfile(),
- context=task_context,
- transforms=[
- FindReplaceTransform(
- FindReplaceTransformOptions.parse_obj(
- {"patterns": [{"find": "bl", "replace_env": "INSERT_TEXT"}]}
- )
- )
- ],
- )
+ zip_content = {
+ Path("Foo.cls"): "System.debug('blah');",
+ }
+ patterns = [{"find": "bl", "replace_env": "INSERT_TEXT"}]
+ create_builder(task_context, zip_content, patterns)
-def test_find_replace_filtered(task_context):
- builder = MetadataPackageZipBuilder.from_zipfile(
- ZipFileSpec(
- {
- Path("classes") / "Foo.cls": "System.debug('blah');",
- Path("Bar.cls"): "System.debug('blah');",
- }
- ).as_zipfile(),
- context=task_context,
- transforms=[
- FindReplaceTransform(
- FindReplaceTransformOptions.parse_obj(
- {
- "patterns": [
- {"find": "bl", "replace": "ye", "paths": ["classes"]}
- ]
- }
- )
- )
- ],
- )
+def test_xpath_replace_environ__not_found(task_context):
+ assert "INSERT_TEXT" not in os.environ
+ with pytest.raises(TaskOptionsError):
+ zip_content = {
+ Path(
+ "Foo.xml"
+ ): "blahblah",
+ }
+ patterns = [{"xpath": "/root/element1", "replace_env": "INSERT_TEXT"}]
+ create_builder(task_context, zip_content, patterns)
- assert (
- ZipFileSpec(
- {
- Path("classes") / "Foo.cls": "System.debug('yeah');",
- Path("Bar.cls"): "System.debug('blah');",
- }
- )
- == builder.zf
- )
+def test_find_replace_filtered(task_context):
+ zip_content = {
+ Path("classes") / "Foo.cls": "System.debug('blah');",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ patterns = [{"find": "bl", "replace": "ye", "paths": ["classes"]}]
+ builder = create_builder(task_context, zip_content, patterns)
+
+ modified_zip_content = {
+ Path("classes") / "Foo.cls": "System.debug('yeah');",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ zip_assert(builder, modified_zip_content)
+
+
+def test_xpath_replace_filtered(task_context):
+ zip_content = {
+ Path("classes")
+ / "Foo.xml": "blahblah",
+ Path(
+ "Bar.cls"
+ ): "blahblah",
+ }
+ patterns = [{"xpath": "/root/element1", "replace": "yeah", "paths": ["classes"]}]
+ builder = create_builder(task_context, zip_content, patterns)
+
+ modified_zip_content = {
+ Path("classes")
+ / "Foo.xml": "yeahblah",
+ Path(
+ "Bar.cls"
+ ): "blahblah",
+ }
+ zip_assert(builder, modified_zip_content)
-def test_find_replace_multiple(task_context):
- builder = MetadataPackageZipBuilder.from_zipfile(
- ZipFileSpec(
- {
- Path("classes") / "Foo.cls": "System.debug('blah');",
- Path("Bar.cls"): "System.debug('blah');",
- }
- ).as_zipfile(),
- context=task_context,
- transforms=[
- FindReplaceTransform(
- FindReplaceTransformOptions.parse_obj(
- {
- "patterns": [
- {"find": "bl", "replace": "ye", "paths": ["classes"]},
- {"find": "ye", "replace": "ha"},
- ]
- }
- )
- )
- ],
- )
- assert (
- ZipFileSpec(
- {
- Path("classes") / "Foo.cls": "System.debug('haah');",
- Path("Bar.cls"): "System.debug('blah');",
- }
- )
- == builder.zf
- )
+def test_find_replace_multiple(task_context):
+ zip_content = {
+ Path("classes") / "Foo.cls": "System.debug('blah');",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ patterns = [
+ {"find": "bl", "replace": "ye", "paths": ["classes"]},
+ {"find": "ye", "replace": "ha"},
+ ]
+ builder = create_builder(task_context, zip_content, patterns)
+
+ modified_zip_content = {
+ Path("classes") / "Foo.cls": "System.debug('haah');",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ zip_assert(builder, modified_zip_content)
+
+
+def test_xpath_replace_multiple(task_context):
+ zip_content = {
+ Path("classes")
+ / "Foo.xml": "blahblah",
+ Path(
+ "Bar.cls"
+ ): "blahblah",
+ }
+ patterns = [
+ {"xpath": "/root/element1", "replace": "yeah", "paths": ["classes"]},
+ {"xpath": "/root/element1", "replace": "haah", "paths": ["classes"]},
+ {"xpath": "/root/element3", "replace": "yeah", "paths": ["classes"]},
+ ]
+ builder = create_builder(task_context, zip_content, patterns)
+
+ modified_zip_content = {
+ Path("classes")
+ / "Foo.xml": "haahblah",
+ Path(
+ "Bar.cls"
+ ): "blahblah",
+ }
+ zip_assert(builder, modified_zip_content)
def test_find_replace_current_user(task_context):
- options = FindReplaceTransformOptions.parse_obj(
- {
- "patterns": [
- {"find": "%%%CURRENT_USER%%%", "inject_username": True},
- ]
- }
- )
- builder = MetadataPackageZipBuilder.from_zipfile(
- ZipFileSpec(
- {
- Path("classes") / "Foo.cls": "System.debug('%%%CURRENT_USER%%%');",
- Path("Bar.cls"): "System.debug('blah');",
- }
- ).as_zipfile(),
- context=task_context,
- transforms=[FindReplaceTransform(options)],
- )
-
+ patterns = [
+ {"find": "%%%CURRENT_USER%%%", "inject_username": True},
+ ]
+ zip_content = {
+ Path("classes") / "Foo.cls": "System.debug('%%%CURRENT_USER%%%');",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ builder = create_builder(task_context, zip_content, patterns)
expected_username = task_context.org_config.username
- assert (
- ZipFileSpec(
- {
- Path("classes") / "Foo.cls": f"System.debug('{expected_username}');",
- Path("Bar.cls"): "System.debug('blah');",
- }
- )
- == builder.zf
- )
+ modified_zip_content = {
+ Path("classes") / "Foo.cls": f"System.debug('{expected_username}');",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ zip_assert(builder, modified_zip_content)
+
+
+def test_xpath_replace_current_user(task_context):
+ patterns = [
+ {"xpath": "/root/element1", "inject_username": True},
+ ]
+ zip_content = {
+ Path("classes")
+ / "Foo.cls": "blahblah",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ builder = create_builder(task_context, zip_content, patterns)
+ expected_username = task_context.org_config.username
+ modified_zip_content = {
+ Path("classes")
+ / "Foo.cls": f"{expected_username}blah",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ zip_assert(builder, modified_zip_content)
def test_find_replace_org_url(task_context):
- options = FindReplaceTransformOptions.parse_obj(
+ patterns = [
{
- "patterns": [
- {
- "find": "{url}",
- "inject_org_url": True,
- },
- ]
- }
- )
- builder = MetadataPackageZipBuilder.from_zipfile(
- ZipFileSpec(
- {
- Path("classes") / "Foo.cls": "System.debug('{url}');",
- Path("Bar.cls"): "System.debug('blah');",
- }
- ).as_zipfile(),
- context=task_context,
- transforms=[FindReplaceTransform(options)],
- )
+ "find": "{url}",
+ "inject_org_url": True,
+ },
+ ]
+ zip_content = {
+ Path("classes") / "Foo.cls": "System.debug('{url}');",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ builder = create_builder(task_context, zip_content, patterns)
+ instance_url = task_context.org_config.instance_url
+ modified_zip_content = {
+ Path("classes") / "Foo.cls": f"System.debug('{instance_url}');",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ zip_assert(builder, modified_zip_content)
+
+def test_xpath_replace_org_url(task_context):
+ patterns = [
+ {
+ "xpath": "/root/element1",
+ "inject_org_url": True,
+ },
+ ]
+ zip_content = {
+ Path("classes")
+ / "Foo.cls": "blahblah",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ builder = create_builder(task_context, zip_content, patterns)
instance_url = task_context.org_config.instance_url
- assert (
- ZipFileSpec(
- {
- Path("classes") / "Foo.cls": f"System.debug('{instance_url}');",
- Path("Bar.cls"): "System.debug('blah');",
- }
- )
- == builder.zf
- )
+ modified_zip_content = {
+ Path("classes")
+ / "Foo.cls": f"{instance_url}blah",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ zip_assert(builder, modified_zip_content)
@pytest.mark.parametrize("api", [FindReplaceIdAPI.REST, FindReplaceIdAPI.TOOLING])
-def test_find_replace_id(api):
+def test_xpath_replace_id(api):
context = mock.Mock()
result = {"totalSize": 1, "records": [{"Id": "00D"}]}
context.org_config.salesforce_client.query.return_value = result
context.org_config.tooling.query.return_value = result
- options = FindReplaceTransformOptions.parse_obj(
+ patterns = [
{
- "patterns": [
- {
- "find": "00Y",
- "replace_record_id_query": "SELECT Id FROM Account WHERE name='Initech Corp.'",
- "api": api,
- },
- ]
- }
- )
- builder = MetadataPackageZipBuilder.from_zipfile(
- ZipFileSpec(
- {
- Path("classes") / "Foo.cls": "System.debug('00Y');",
- Path("Bar.cls"): "System.debug('blah');",
- }
- ).as_zipfile(),
- context=context,
- transforms=[FindReplaceTransform(options)],
- )
+ "xpath": "/root/element1",
+ "replace_record_id_query": "SELECT Id FROM Account WHERE name='Initech Corp.'",
+ "api": api,
+ },
+ ]
+ zip_content = {
+ Path("classes")
+ / "Foo.cls": "blahblah",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ builder = create_builder(context, zip_content, patterns)
+ modified_zip_content = {
+ Path("classes")
+ / "Foo.cls": "00Dblah",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ zip_assert(builder, modified_zip_content)
- assert (
- ZipFileSpec(
- {
- Path("classes") / "Foo.cls": "System.debug('00D');",
- Path("Bar.cls"): "System.debug('blah');",
- }
- )
- == builder.zf
- )
+
+@pytest.mark.parametrize("api", [FindReplaceIdAPI.REST, FindReplaceIdAPI.TOOLING])
+def test_find_replace_id(api):
+ context = mock.Mock()
+ result = {"totalSize": 1, "records": [{"Id": "00D"}]}
+ context.org_config.salesforce_client.query.return_value = result
+ context.org_config.tooling.query.return_value = result
+ patterns = [
+ {
+ "find": "00Y",
+ "replace_record_id_query": "SELECT Id FROM Account WHERE name='Initech Corp.'",
+ "api": api,
+ },
+ ]
+ zip_content = {
+ Path("classes") / "Foo.cls": "System.debug('00Y');",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ builder = create_builder(context, zip_content, patterns)
+ modified_zip_content = {
+ Path("classes") / "Foo.cls": "System.debug('00D');",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ zip_assert(builder, modified_zip_content)
def test_find_replace_id__bad_query_result():
context = mock.Mock()
result = {"totalSize": 0}
context.org_config.salesforce_client.query.return_value = result
- options = FindReplaceTransformOptions.parse_obj(
+ patterns = [
{
- "patterns": [
- {
- "find": "00Y",
- "replace_record_id_query": "SELECT Id FROM Account WHERE name='Initech Corp.'",
- },
- ]
- }
- )
+ "find": "00Y",
+ "replace_record_id_query": "SELECT Id FROM Account WHERE name='Initech Corp.'",
+ },
+ ]
+ zip_content = {
+ Path("classes") / "Foo.cls": "System.debug('00Y');",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
with pytest.raises(CumulusCIException):
- MetadataPackageZipBuilder.from_zipfile(
- ZipFileSpec(
- {
- Path("classes") / "Foo.cls": "System.debug('00Y');",
- Path("Bar.cls"): "System.debug('blah');",
- }
- ).as_zipfile(),
- context=context,
- transforms=[FindReplaceTransform(options)],
- )
+ create_builder(context, zip_content, patterns)
+
+
+def test_xpath_replace_id__bad_query_result():
+ context = mock.Mock()
+ result = {"totalSize": 0}
+ context.org_config.salesforce_client.query.return_value = result
+ patterns = [
+ {
+ "xpath": "/root/element1",
+ "replace_record_id_query": "SELECT Id FROM Account WHERE name='Initech Corp.'",
+ },
+ ]
+ zip_content = {
+ Path("classes")
+ / "Foo.cls": "blahblah",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ with pytest.raises(CumulusCIException):
+ create_builder(context, zip_content, patterns)
def test_find_replace_id__no_id_returned():
context = mock.Mock()
result = {"totalSize": 1, "records": [{"name": "foo"}]}
context.org_config.salesforce_client.query.return_value = result
- options = FindReplaceTransformOptions.parse_obj(
+ patterns = [
{
- "patterns": [
- {
- "find": "00Y",
- "replace_record_id_query": "SELECT Id FROM Account WHERE name='Initech Corp.'",
- },
- ]
- }
- )
+ "find": "00Y",
+ "replace_record_id_query": "SELECT Id FROM Account WHERE name='Initech Corp.'",
+ },
+ ]
+ zip_content = {
+ Path("classes") / "Foo.cls": "System.debug('00Y');",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
with pytest.raises(CumulusCIException):
- MetadataPackageZipBuilder.from_zipfile(
- ZipFileSpec(
- {
- Path("classes") / "Foo.cls": "System.debug('00Y');",
- Path("Bar.cls"): "System.debug('blah');",
- }
- ).as_zipfile(),
- context=context,
- transforms=[FindReplaceTransform(options)],
- )
+ create_builder(context, zip_content, patterns)
+
+
+def test_xpath_replace_id__no_id_returned():
+ context = mock.Mock()
+ result = {"totalSize": 1, "records": [{"name": "foo"}]}
+ context.org_config.salesforce_client.query.return_value = result
+ patterns = [
+ {
+ "xpath": "/root/element1",
+ "replace_record_id_query": "SELECT Id FROM Account WHERE name='Initech Corp.'",
+ },
+ ]
+ zip_content = {
+ Path("classes")
+ / "Foo.cls": "blahblah",
+ Path("Bar.cls"): "System.debug('blah');",
+ }
+ with pytest.raises(CumulusCIException):
+ create_builder(context, zip_content, patterns)
+
+
+def test_find_xpath_both_none(task_context):
+ zip_content = {
+ Path("Foo.cls"): "System.debug('blah');",
+ }
+ patterns = [{"replace": "ye"}]
+ with pytest.raises(ValidationError) as e:
+ create_builder(task_context, zip_content, patterns)
+ assert "Input is not valid. Please pass either find or xpath paramter." in str(e)
+
+
+def test_find_xpath_both_empty(task_context):
+ zip_content = {
+ Path("Foo.cls"): "System.debug('blah');",
+ }
+ patterns = [{"find": "", "xpath": "", "replace": "ye"}]
+ with pytest.raises(ValidationError) as e:
+ create_builder(task_context, zip_content, patterns)
+ assert "Input is not valid. Please pass either find or xpath paramter." in str(e)
+
+
+def test_find_xpath_both(task_context):
+ zip_content = {
+ Path("Foo.cls"): "System.debug('blah');",
+ }
+ patterns = [{"find": "bl", "xpath": "bl", "replace": "ye"}]
+ with pytest.raises(ValidationError) as e:
+ create_builder(task_context, zip_content, patterns)
+ assert (
+ "Input is not valid. Please pass either find or xpath paramter not both."
+ in str(e)
+ )
+
+
+def test_invalid_xpath(task_context):
+ zip_content = {
+ Path(
+ "Foo.xml"
+ ): "blahblah",
+ }
+ patterns = [{"xpath": '/root/element1[tezxt()=="blah"]', "replace": "yeah"}]
+ with pytest.raises(ET.XPathError):
+ create_builder(task_context, zip_content, patterns)
+
+
+def test_xpath_replace_with_index(task_context):
+ zip_content = {
+ Path(
+ "Foo.xml"
+ ): ' Everyday Italian Giada De Laurentiis 2005 30.00 Harry Potter J K. Rowling 2005 29.99 XQuery Kick Start James McGovern Per Bothner Kurt Cagle James Linn Vaidyanathan Nagarajan 2003 49.99 Learning XML Erik T. Ray 2003 39.95 ',
+ }
+ patterns = [{"xpath": "/bookstore/book[2]/title", "replace": "New Title"}]
+ builder = create_builder(task_context, zip_content, patterns)
+
+ modified_zip_content = {
+ Path(
+ "Foo.xml"
+ ): ' Everyday Italian Giada De Laurentiis 2005 30.00 New Title J K. Rowling 2005 29.99 XQuery Kick Start James McGovern Per Bothner Kurt Cagle James Linn Vaidyanathan Nagarajan 2003 49.99 Learning XML Erik T. Ray 2003 39.95 ',
+ }
+ zip_assert(builder, modified_zip_content)
+
+
+def test_xpath_replace_with_text(task_context):
+ zip_content = {
+ Path(
+ "Foo.xml"
+ ): ' Everyday Italian Giada De Laurentiis 2005 30.00 Harry Potter J K. Rowling 2005 29.99 XQuery Kick Start James McGovern Per Bothner Kurt Cagle James Linn Vaidyanathan Nagarajan 2003 49.99 Learning XML Erik T. Ray 2003 39.95 ',
+ }
+ patterns = [
+ {
+ "xpath": "/bookstore/book/title[text()='Learning XML']",
+ "replace": "Updated Text",
+ }
+ ]
+ builder = create_builder(task_context, zip_content, patterns)
+
+ modified_zip_content = {
+ Path(
+ "Foo.xml"
+ ): ' Everyday Italian Giada De Laurentiis 2005 30.00 Harry Potter J K. Rowling 2005 29.99 XQuery Kick Start James McGovern Per Bothner Kurt Cagle James Linn Vaidyanathan Nagarajan 2003 49.99 Updated Text Erik T. Ray 2003 39.95 ',
+ }
+ zip_assert(builder, modified_zip_content)
+
+
+def test_xpath_replace_with_exp_and_index(task_context):
+ zip_content = {
+ Path(
+ "Foo.xml"
+ ): ' Everyday Italian Giada De Laurentiis 2005 30.00 Harry Potter J K. Rowling 2005 29.99 XQuery Kick Start James McGovern Per Bothner Kurt Cagle James Linn Vaidyanathan Nagarajan 2003 49.99 Learning XML Erik T. Ray 2003 39.95 ',
+ }
+ patterns = [
+ {"xpath": "/bookstore/book[price>40]/author[2]", "replace": "Rich Author"}
+ ]
+ builder = create_builder(task_context, zip_content, patterns)
+
+ modified_zip_content = {
+ Path(
+ "Foo.xml"
+ ): ' Everyday Italian Giada De Laurentiis 2005 30.00 Harry Potter J K. Rowling 2005 29.99 XQuery Kick Start James McGovern Rich Author Kurt Cagle James Linn Vaidyanathan Nagarajan 2003 49.99 Learning XML Erik T. Ray 2003 39.95 ',
+ }
+ zip_assert(builder, modified_zip_content)
+
+
+def test_xpath_replace_with_exp_and_index2(task_context):
+ zip_content = {
+ Path(
+ "Foo.xml"
+ ): ' Everyday Italian Giada De Laurentiis 2005 30.00 Harry Potter J K. Rowling 2005 29.99 XQuery Kick Start James McGovern Per Bothner Kurt Cagle James Linn Vaidyanathan Nagarajan 2003 49.99 Learning XML Erik T. Ray 2003 39.95 ',
+ }
+ patterns = [
+ {"xpath": "/bookstore/book[price<40]/author[1]", "replace": "Rich Author"}
+ ]
+ builder = create_builder(task_context, zip_content, patterns)
+
+ modified_zip_content = {
+ Path(
+ "Foo.xml"
+ ): ' Everyday Italian Rich Author 2005 30.00 Harry Potter Rich Author 2005 29.99 XQuery Kick Start James McGovern Per Bothner Kurt Cagle James Linn Vaidyanathan Nagarajan 2003 49.99 Learning XML Rich Author 2003 39.95 ',
+ }
+ zip_assert(builder, modified_zip_content)
+
+
+def test_xpath_replace_with_exp(task_context):
+ zip_content = {
+ Path(
+ "Foo.xml"
+ ): ' Everyday Italian Giada De Laurentiis 2005 30.00 Harry Potter J K. Rowling 2005 29.99 XQuery Kick Start James McGovern Per Bothner Kurt Cagle James Linn Vaidyanathan Nagarajan 2003 49.99 Learning XML Erik T. Ray 2003 39.95 ',
+ }
+ patterns = [{"xpath": "/bookstore/book[price>40]/author", "replace": "Rich Author"}]
+ builder = create_builder(task_context, zip_content, patterns)
+
+ modified_zip_content = {
+ Path(
+ "Foo.xml"
+ ): ' Everyday Italian Giada De Laurentiis 2005 30.00 Harry Potter J K. Rowling 2005 29.99 XQuery Kick Start Rich Author Rich Author Rich Author Rich Author Rich Author 2003 49.99 Learning XML Erik T. Ray 2003 39.95 ',
+ }
+ zip_assert(builder, modified_zip_content)
def test_source_transform_parsing():
diff --git a/cumulusci/core/source_transforms/transforms.py b/cumulusci/core/source_transforms/transforms.py
index 146802bb50..ab311b1456 100644
--- a/cumulusci/core/source_transforms/transforms.py
+++ b/cumulusci/core/source_transforms/transforms.py
@@ -2,12 +2,14 @@
import functools
import io
import os
+import re
import shutil
import typing as T
import zipfile
from pathlib import Path
from zipfile import ZipFile
+from lxml import etree as ET
from pydantic import BaseModel, root_validator
from cumulusci.core.dependencies.utils import TaskContext
@@ -288,9 +290,29 @@ def process(self, zf: ZipFile, context: TaskContext) -> ZipFile:
class FindReplaceBaseSpec(BaseModel, abc.ABC):
- find: str
+ find: T.Optional[str]
+ xpath: T.Optional[str]
paths: T.Optional[T.List[Path]] = None
+ @root_validator
+ def validate_find_xpath(cls, values):
+ findVal = values.get("find")
+ xpathVal = values.get("xpath")
+ if (findVal == "" or findVal is None) and (xpathVal is None or xpathVal == ""):
+ raise ValueError(
+ "Input is not valid. Please pass either find or xpath paramter."
+ )
+ if (
+ findVal != ""
+ and findVal is not None
+ and xpathVal != ""
+ and xpathVal is not None
+ ):
+ raise ValueError(
+ "Input is not valid. Please pass either find or xpath paramter not both."
+ )
+ return values
+
@abc.abstractmethod
def get_replace_string(self, context: TaskContext) -> str:
...
@@ -394,15 +416,71 @@ def __init__(self, options: FindReplaceTransformOptions):
self.options = options
def process(self, zf: ZipFile, context: TaskContext) -> ZipFile:
+ # To handle xpath with namespaces, without
+ def transform_xpath(expression):
+ predicate_pattern = re.compile(r"\[.*?\]")
+ parts = expression.split("/")
+ transformed_parts = []
+
+ for part in parts:
+ if part:
+ predicates = predicate_pattern.findall(part)
+ tag = predicate_pattern.sub("", part)
+ transformed_part = '/*[local-name()="' + tag + '"]'
+ for predicate in predicates:
+ transformed_part += predicate
+ transformed_parts.append(transformed_part)
+ transformed_expression = "".join(transformed_parts)
+
+ return transformed_expression
+
def process_file(filename: str, content: str) -> T.Tuple[str, str]:
path = Path(filename)
for spec in self.options.patterns:
if not spec.paths or any(
parent in path.parents for parent in spec.paths
):
- content = content.replace(
- spec.find, spec.get_replace_string(context)
- )
+ try:
+ # See if the content is an xml file
+ content_bytes = content.encode("utf-8")
+ root = ET.fromstring(content_bytes)
+
+ # See if content has an xml declaration
+ has_xml_declaration = content.strip().startswith("