Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support kafka sql snapshotting #4665

Merged
merged 13 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ jobs:
- name: Run Integration Tests - migration - Kafkasql
run: ./mvnw -T 1.5C verify -am --no-transfer-progress -Pintegration-tests -Pmigration -Dregistry-kafkasql-image=ttl.sh/${{ github.sha }}/apicurio/apicurio-registry:1d -Premote-kafka -pl integration-tests -Dmaven.javadoc.skip=true

- name: Run Integration Tests - snapshotting - Kafkasql
run: ./mvnw -T 1.5C verify -am --no-transfer-progress -Pintegration-tests -Pkafkasql-snapshotting -Dregistry-kafkasql-image=ttl.sh/${{ github.sha }}/apicurio/apicurio-registry:1d -Premote-kafka -pl integration-tests -Dmaven.javadoc.skip=true

- name: Collect logs
if: failure()
run: sh ./.github/scripts/collect_logs.sh
Expand Down
3 changes: 0 additions & 3 deletions .github/workflows/verify.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

env:
TESTCONTAINERS_RYUK_DISABLED: true

jobs:
build-verify:
name: Verify Application Build
Expand Down
1 change: 0 additions & 1 deletion app/.gitignore

This file was deleted.

8 changes: 2 additions & 6 deletions app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,6 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-jwt</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
Expand Down Expand Up @@ -294,8 +290,8 @@
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>redpanda</artifactId>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void run() {

private void refresh() {
Instant now = Instant.now();
if (lastRefresh != null) {
if (lastRefresh != null && this.delegate != null && this.delegate.isReady()) {
List<DynamicConfigPropertyDto> staleConfigProperties = this.getStaleConfigProperties(lastRefresh);
if (!staleConfigProperties.isEmpty()) {
invalidateCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ public List<ArtifactTypeInfo> listArtifactTypes() {

}

@Override
@Authorized(style=AuthorizedStyle.None, level=AuthorizedLevel.Admin)
public void triggerSnapshot() {
storage.triggerSnapshotCreation();
}

/**
* @see io.apicurio.registry.rest.v3.AdminResource#listGlobalRules()
*/
Expand Down
50 changes: 18 additions & 32 deletions app/src/main/java/io/apicurio/registry/storage/RegistryStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,8 @@
import io.apicurio.registry.model.GA;
import io.apicurio.registry.model.GAV;
import io.apicurio.registry.model.VersionId;
import io.apicurio.registry.storage.dto.ArtifactMetaDataDto;
import io.apicurio.registry.storage.dto.ArtifactReferenceDto;
import io.apicurio.registry.storage.dto.ArtifactSearchResultsDto;
import io.apicurio.registry.storage.dto.ArtifactVersionMetaDataDto;
import io.apicurio.registry.storage.dto.CommentDto;
import io.apicurio.registry.storage.dto.ContentWrapperDto;
import io.apicurio.registry.storage.dto.DownloadContextDto;
import io.apicurio.registry.storage.dto.EditableArtifactMetaDataDto;
import io.apicurio.registry.storage.dto.EditableGroupMetaDataDto;
import io.apicurio.registry.storage.dto.EditableVersionMetaDataDto;
import io.apicurio.registry.storage.dto.GroupMetaDataDto;
import io.apicurio.registry.storage.dto.GroupSearchResultsDto;
import io.apicurio.registry.storage.dto.OrderBy;
import io.apicurio.registry.storage.dto.OrderDirection;
import io.apicurio.registry.storage.dto.RoleMappingDto;
import io.apicurio.registry.storage.dto.RoleMappingSearchResultsDto;
import io.apicurio.registry.storage.dto.RuleConfigurationDto;
import io.apicurio.registry.storage.dto.SearchFilter;
import io.apicurio.registry.storage.dto.StoredArtifactVersionDto;
import io.apicurio.registry.storage.dto.VersionSearchResultsDto;
import io.apicurio.registry.storage.error.ArtifactAlreadyExistsException;
import io.apicurio.registry.storage.error.ArtifactNotFoundException;
import io.apicurio.registry.storage.error.ContentNotFoundException;
import io.apicurio.registry.storage.error.GroupAlreadyExistsException;
import io.apicurio.registry.storage.error.GroupNotFoundException;
import io.apicurio.registry.storage.error.RegistryStorageException;
import io.apicurio.registry.storage.error.RuleAlreadyExistsException;
import io.apicurio.registry.storage.error.RuleNotFoundException;
import io.apicurio.registry.storage.error.VersionAlreadyExistsException;
import io.apicurio.registry.storage.error.VersionNotFoundException;
import io.apicurio.registry.storage.dto.*;
import io.apicurio.registry.storage.error.*;
import io.apicurio.registry.storage.impexp.EntityInputStream;
import io.apicurio.registry.types.RuleType;
import io.apicurio.registry.utils.impexp.ArtifactBranchEntity;
Expand Down Expand Up @@ -398,7 +370,7 @@ void updateArtifactRule(String groupId, String artifactId, RuleType rule, RuleCo
/**
* Gets the stored meta-data for a single version of an artifact. This will return all meta-data for the
* version, including any user edited meta-data along with anything generated by the artifactStore.
*
*
* @param globalId
* @throws VersionNotFoundException
* @throws RegistryStorageException
Expand Down Expand Up @@ -580,7 +552,7 @@ void updateArtifactRule(String groupId, String artifactId, RuleType rule, RuleCo
* @param limit the result size limit
*/
RoleMappingSearchResultsDto searchRoleMappings(int offset, int limit) throws RegistryStorageException;

/**
* Gets the details of a single role mapping.
*
Expand Down Expand Up @@ -864,6 +836,20 @@ void updateArtifactRule(String groupId, String artifactId, RuleType rule, RuleCo
*/
void deleteArtifactBranch(GA ga, BranchId branchId);

/**
* Triggers a snapshot creation of the internal database.
*
* @throws RegistryStorageException
*/
String triggerSnapshotCreation() throws RegistryStorageException;

/**
* Creates the snapshot of the internal database based on configuration.
*
* @param snapshotLocation
* @throws RegistryStorageException
*/
String createSnapshot(String snapshotLocation) throws RegistryStorageException;

enum ArtifactRetrievalBehavior {
DEFAULT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,4 +414,16 @@ public void deleteArtifactBranch(GA ga, BranchId branchId) {
checkReadOnly();
delegate.deleteArtifactBranch(ga, branchId);
}

@Override
public String triggerSnapshotCreation() throws RegistryStorageException {
checkReadOnly();
return delegate.triggerSnapshotCreation();
}

@Override
public String createSnapshot(String snapshotLocation) throws RegistryStorageException {
checkReadOnly();
return delegate.createSnapshot(snapshotLocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,4 +326,14 @@ public void createOrReplaceArtifactBranch(GA ga, BranchId branchId, List<Version
public void deleteArtifactBranch(GA ga, BranchId branchId) {
delegate.deleteArtifactBranch(ga, branchId);
}

@Override
public String triggerSnapshotCreation() throws RegistryStorageException {
return delegate.triggerSnapshotCreation();
}

@Override
public String createSnapshot(String snapshotLocation) throws RegistryStorageException {
return delegate.createSnapshot(snapshotLocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -498,4 +498,14 @@ public GAV getArtifactBranchTip(GA ga, BranchId branchId, ArtifactRetrievalBehav
public List<GAV> getArtifactBranch(GA ga, BranchId branchId, ArtifactRetrievalBehavior behavior) {
return proxy(storage -> storage.getArtifactBranch(ga, branchId, behavior));
}

@Override
public String triggerSnapshotCreation() throws RegistryStorageException {
return proxy((RegistryStorage::triggerSnapshotCreation));
}

@Override
public String createSnapshot(String snapshotLocation) throws RegistryStorageException {
return proxy((storage -> storage.createSnapshot(snapshotLocation)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
import java.util.Properties;

public interface KafkaSqlConfiguration {

String bootstrapServers();
String topic();
String snapshotsTopic();
String snapshotEvery();
String snapshotLocation();
Properties topicProperties();
boolean isTopicAutoCreate();
Integer pollTimeout();
Integer responseTimeout();
Properties producerProperties();
Properties consumerProperties();
Properties adminProperties();

}
Loading
Loading