Skip to content

Commit

Permalink
Handle change streams (#69)
Browse files Browse the repository at this point in the history
Does not cope with changing change streams while underlying columns are dropped.
  • Loading branch information
nielm authored Jan 29, 2024
1 parent e3953b9 commit 6c05a66
Show file tree
Hide file tree
Showing 11 changed files with 385 additions and 222 deletions.
19 changes: 19 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,19 @@
<spotless.version>2.43.0</spotless.version>
<exec-maven.version>3.1.1</exec-maven.version>
<build-helper-maven-plugin.version>3.5.0</build-helper-maven-plugin.version>
<auto-value.version>1.10.4</auto-value.version>
</properties>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
<version>${auto-value.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down Expand Up @@ -101,6 +107,19 @@
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<version>${auto-value.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
Expand Down
441 changes: 241 additions & 200 deletions src/main/java/com/google/cloud/solutions/spannerddl/diff/DdlDiff.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.solutions.spannerddl.parser;

import com.google.cloud.solutions.spannerddl.diff.ASTTreeUtils;

public class ASTchange_stream_for_clause extends SimpleNode {
public ASTchange_stream_for_clause(int id) {
super(id);
}

public ASTchange_stream_for_clause(DdlParser p, int id) {
super(p, id);
}

@Override
public String toString() {
ASTchange_stream_tracked_tables tables =
ASTTreeUtils.getOptionalChildByType(children, ASTchange_stream_tracked_tables.class);
if (tables != null) {
return "FOR " + ASTTreeUtils.tokensToString(tables, false);
}
return "FOR ALL";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,44 @@
*/
package com.google.cloud.solutions.spannerddl.parser;

import com.google.cloud.solutions.spannerddl.diff.ASTTreeUtils;
import com.google.common.base.Joiner;

/**
* @link
* https://cloud.google.com/spanner/docs/reference/standard-sql/data-definition-language#create-change-stream
*/
public class ASTcreate_change_stream_statement extends SimpleNode {
public ASTcreate_change_stream_statement(int id) {

super(id);
throw new UnsupportedOperationException("Not Implemented");
}

public ASTcreate_change_stream_statement(DdlParser p, int id) {
super(p, id);
throw new UnsupportedOperationException("Not Implemented");
}

public String getName() {
return ASTTreeUtils.getChildByType(children, ASTname.class).toString();
}

public ASTchange_stream_for_clause getForClause() {
return ASTTreeUtils.getOptionalChildByType(children, ASTchange_stream_for_clause.class);
}

public ASToptions_clause getOptionsClause() {
return ASTTreeUtils.getOptionalChildByType(children, ASToptions_clause.class);
}

@Override
public String toString() {
return Joiner.on(" ")
.skipNulls()
.join("CREATE CHANGE STREAM", getName(), getForClause(), getOptionsClause());
}

@Override
public boolean equals(Object other) {
return (other instanceof ASTcreate_change_stream_statement)
&& this.toString().equals(other.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void compareDddTextFiles() throws IOException {
// build an expectedResults without any column or table drops.
List<String> expectedDiffNoDrops =
expectedDiff.stream()
.filter(statement -> !statement.matches(".*DROP (TABLE|COLUMN).*"))
.filter(statement -> !statement.matches(".*DROP (TABLE|COLUMN|CHANGE STREAM).*"))
.collect(Collectors.toCollection(LinkedList::new));

// remove any drop indexes from the expectedResults if they do not have an equivalent
Expand Down Expand Up @@ -109,9 +109,11 @@ public void compareDddTextFiles() throws IOException {
.isEqualTo(expectedDiffNoDrops);
}
} catch (DdlDiffException e) {
fail("DdlDiffException when processing segment " + segmentName + ": " + e);
fail("DdlDiffException when processing segment:\n'" + segmentName + "''\n" + e.getMessage());
} catch (Exception e) {
throw new Error("Unexpected exception when processing segment " + segmentName + ": " + e, e);
throw new Error(
"Unexpected exception when processing segment \n'" + segmentName + "'\n" + e.getMessage(),
e);
}

if (originalSegmentIt.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ public void parseAlterDatabaseDifferentDbNames() throws ParseException {

getDiffCheckDdlDiffException(
"ALTER DATABASE dbname SET OPTIONS(hello='world');"
+ "ALTER DATABASE otherdbname SET OPTIONS(hello='world');",
+ "ALTER DATABASE otherdbname SET OPTIONS(goodbye='world');",
"",
true,
"Multiple database IDs defined");
Expand Down
12 changes: 2 additions & 10 deletions src/test/resources/ddlParserUnsupported.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ CREATE OR REPLACE VIEW test1 SQL SECURITY INVOKER AS SELECT * from test2

CREATE VIEW test1 SQL SECURITY INVOKER AS SELECT * from test2

== Test 5

Create change stream test1 for test2

== Test 6

drop change stream test1
Expand All @@ -48,15 +44,11 @@ drop index test1

drop table test1

== Test 9 // TODO Change streams

CREATE CHANGE STREAM change_stream_name FOR ALL

== Test 9a - alter change stream not supported
== Test 9 - alter change stream not supported

ALTER CHANGE STREAM change_stream_name DROP FOR ALL

== Test 9b - drop change stream not supported
== Test 9a - drop change stream not supported

DROP CHANGE STREAM change_stream_name

Expand Down
8 changes: 8 additions & 0 deletions src/test/resources/ddlParserValidation.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,12 @@ CREATE TABLE test_table (

CREATE TABLE test_table ( intcol INT64 NOT NULL HIDDEN ) PRIMARY KEY (intcol ASC)

== Test 12 change stream for all

CREATE CHANGE STREAM change_stream_name FOR ALL OPTIONS (retention_period='1d',value_capture_type='OLD_AND_NEW_VALUES')

== Test 12b change stream for certain cols

CREATE CHANGE STREAM change_stream_name FOR table1, table2 ( ), table3 ( col1, col2 ) OPTIONS (retention_period='7d',value_capture_type='NEW_ROW')

==
20 changes: 16 additions & 4 deletions src/test/resources/expectedDdlDiff.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,18 @@ ALTER TABLE test1 DROP CONSTRAINT fk_in_alter

DROP INDEX index2
DROP INDEX index1
ALTER TABLE test2 DROP CONSTRAINT ch_in_test2
ALTER TABLE test2 DROP CONSTRAINT fk_in_test2
ALTER TABLE test1 DROP CONSTRAINT ch_in_test1
ALTER TABLE test2 DROP CONSTRAINT ch_in_test2
ALTER TABLE test1 DROP CONSTRAINT fk_in_test1
ALTER TABLE test1 DROP CONSTRAINT ch_in_test1
DROP TABLE test2
ALTER TABLE test1 ADD COLUMN col3 INT64
CREATE TABLE test3 ( col1 INT64 ) PRIMARY KEY (col1 ASC)
CREATE INDEX index1 ON test1 ( col3 ASC )
ALTER TABLE test3 ADD CONSTRAINT ch_in_test3 CHECK (col1 = col3 and col1 > 100 and col2 < -50)
ALTER TABLE test3 ADD CONSTRAINT fk_in_test3 FOREIGN KEY ( col3 ) REFERENCES othertable ( othercol ) ON DELETE NO ACTION
ALTER TABLE test1 ADD CONSTRAINT ch_in_test1 CHECK (col1 = col3 and col1 > 100 and col2 < -50)
ALTER TABLE test3 ADD CONSTRAINT ch_in_test3 CHECK (col1 = col3 and col1 > 100 and col2 < -50)
ALTER TABLE test1 ADD CONSTRAINT fk_in_test1 FOREIGN KEY ( col3 ) REFERENCES othertable ( othercol ) ON DELETE NO ACTION
ALTER TABLE test1 ADD CONSTRAINT ch_in_test1 CHECK (col1 = col3 and col1 > 100 and col2 < -50)

== TEST 16 add check constraint via alter statement

Expand Down Expand Up @@ -252,5 +252,17 @@ ALTER TABLE test1 ADD CONSTRAINT fk_in_table FOREIGN KEY ( col2 ) REFERENCES oth
DROP INDEX test4
CREATE INDEX test4 ON test1 ( col1 ASC ) STORING ( col2 )

== TEST 48 change streams create modify delete in correct order wrt tables

DROP CHANGE STREAM toBeDeleted
DROP TABLE myToBeDeletedTable
CREATE TABLE myCreatedTable ( mycol INT64 ) PRIMARY KEY (mycol ASC)
CREATE CHANGE STREAM toCreate FOR mytable4 OPTIONS (retention_period='36h')
CREATE CHANGE STREAM toCreateAll FOR ALL
ALTER CHANGE STREAM toBeChanged SET FOR myTable2 ( col1, col3, col4 ), mytable3 ( )
ALTER CHANGE STREAM toBeChanged SET OPTIONS (retention_period='48h')
ALTER CHANGE STREAM toBeChangedOnlyTable SET FOR myTable1, myTable2 ( col1 )
ALTER CHANGE STREAM toBeChangedOnlyOptions SET OPTIONS (retention_period='48h')

==

11 changes: 10 additions & 1 deletion src/test/resources/newDdl.txt
Original file line number Diff line number Diff line change
Expand Up @@ -427,5 +427,14 @@ primary key (col1);

Create index IF NOT EXISTS test4 on test1 ( col1 ) STORING ( col2 )

==
== TEST 48 change streams create modify delete in correct order wrt tables

Create table myCreatedTable (mycol int64) primary key (mycol);
create change stream toremain for all options (retention_period = '36h');
create change stream toBeChanged for myTable2 ( col1, col3, col4), mytable3 () options (retention_period = '48h');
create change stream toCreate for mytable4 options (retention_period = '36h');
create change stream toCreateAll for all;
create change stream toBeChangedOnlyTable for myTable1, myTable2 ( col1) options (retention_period = '36h');
create change stream toBeChangedOnlyOptions for myTable1, myTable2 ( col1, col3) options (retention_period = '48h');

==
13 changes: 13 additions & 0 deletions src/test/resources/originalDdl.txt
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,17 @@ create table test1 (
constraint ch_in_test1 check (col1=col2 and col1 > 100 and col2 < -50)
)
primary key (col1);

create table test2 (
col1 int64,
col2 int64 NOT NULL,
constraint fk_in_test2 foreign key (col2) references othertable(othercol),
constraint ch_in_test2 check (col1=col2 and col1 > 100 and col2 < -50)
)
primary key (col1);

create index index1 on test1 (col1);

create index index2 on test2 (col1);


Expand Down Expand Up @@ -423,5 +426,15 @@ primary key (col1);

Create index IF NOT EXISTS test4 on test1 ( col1 )

== TEST 48 change streams create modify delete in correct order wrt tables

Create table myToBeDeletedTable (mycol int64) primary key (mycol);
create change stream toremain for all options (retention_period = '36h');
create change stream toBeDeleted for myTable options (retention_period = '36h');
create change stream toBeChanged for myTable1, myTable2 ( col1, col3) options (retention_period = '36h');
create change stream toBeChangedOnlyTable for myTable1, myTable2 ( col1, col3) options (retention_period = '36h');
create change stream toBeChangedOnlyOptions for myTable1, myTable2 ( col1, col3) options (retention_period = '36h');

==


0 comments on commit 6c05a66

Please sign in to comment.