Gzipped storage #3148

Merged
merged 4 commits into from
Aug 17, 2023
Changes from all commits
@@ -1,14 +1,21 @@
 package com.bakdata.conquery.commands;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.List;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 import com.bakdata.conquery.io.jackson.Jackson;
 import com.bakdata.conquery.models.config.ConqueryConfig;
 import com.bakdata.conquery.models.config.XodusStoreFactory;
 import com.bakdata.conquery.util.io.ConqueryMDC;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -27,11 +34,13 @@
 import jetbrains.exodus.env.StoreConfig;
 import jetbrains.exodus.env.Transaction;
 import kotlin.jvm.functions.Function4;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import net.sourceforge.argparse4j.impl.Arguments;
 import net.sourceforge.argparse4j.inf.Namespace;
 import net.sourceforge.argparse4j.inf.Subparser;
 import org.codehaus.groovy.control.CompilerConfiguration;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Command allowing script based migration of databases. Especially useful for data that cannot be easily recreated after reimports, such as {@link com.bakdata.conquery.models.auth.entities.User}s and {@link com.bakdata.conquery.models.execution.ManagedExecution}s.
@@ -56,7 +65,6 @@
 @Slf4j
 public class MigrateCommand extends ConqueryCommand {
 
-
 	public MigrateCommand() {
 		super("migrate", "Run a migration script on a store.");
 	}
@@ -75,6 +83,18 @@ public void configure(Subparser subparser) {
 				.required(true)
 				.type(Arguments.fileType());
 
+		subparser
+				.addArgument("--in-gzip")
+				.help("If true, values are ungzipped before deserialization.")
+				.setDefault(true)
+				.type(Arguments.booleanType());
+
+		subparser
+				.addArgument("--out-gzip")
+				.help("If true, values are gzipped before writing.")
+				.setDefault(true)
+				.type(Arguments.booleanType());
+
 		subparser
 				.addArgument("--script")
 				.help("Migration Script returning a closure implementing MigrationScriptFactory. See supplementary example.groovy for details.\nSignature: String env, String store, String key, ObjectNode value -> return new Tuple(key,value)")
@@ -88,6 +108,10 @@ protected void run(io.dropwizard.setup.Environment environment, Namespace namesp
 		final File inStoreDirectory = namespace.get("in");
 		final File outStoreDirectory = namespace.get("out");
 
+		final boolean inGzip = namespace.get("in_gzip");
+		final boolean outGzip = namespace.get("out_gzip");
+
+
 		final long logsize = ((XodusStoreFactory) configuration.getStorage()).getXodus().getLogFileSize().toKilobytes();
 
 
@@ -99,11 +123,11 @@ protected void run(io.dropwizard.setup.Environment environment, Namespace namesp
 		}
 
 		// Create Groovy Shell and parse script
-		CompilerConfiguration config = new CompilerConfiguration();
+		final CompilerConfiguration config = new CompilerConfiguration();
 		config.setScriptBaseClass(MigrationScriptFactory.class.getName());
-		GroovyShell groovy = new GroovyShell(config);
+		final GroovyShell groovy = new GroovyShell(config);
 
-		MigrationScriptFactory factory = (MigrationScriptFactory) groovy.parse(In.file((File) namespace.get("script")).readAll());
+		final MigrationScriptFactory factory = (MigrationScriptFactory) groovy.parse(In.file((File) namespace.get("script")).readAll());
 
 		final Function4<String, String, JsonNode, JsonNode, Tuple> migrator = factory.run();
 
@@ -116,25 +140,12 @@ protected void run(io.dropwizard.setup.Environment environment, Namespace namesp
 					final File environmentDirectory = new File(outStoreDirectory, xenv.getName());
 					environmentDirectory.mkdirs();
 
-					processEnvironment(xenv, logsize, environmentDirectory, migrator, mapper);
+					processEnvironment(xenv, logsize, environmentDirectory, migrator, mapper, inGzip, outGzip);
 				});
 
 	}
 
-
-	/**
-	 * Class defining the interface for the Groovy-Script.
-	 */
-	public abstract static class MigrationScriptFactory extends Script {
-
-		/**
-		 * Environment -> Store -> Key -> Value -> (Key, Value)
-		 */
-		@Override
-		public abstract Function4<String, String, JsonNode, JsonNode, Tuple> run();
-	}
-
-	private void processEnvironment(File inStoreDirectory, long logSize, File outStoreDirectory, Function4<String, String, JsonNode, JsonNode, Tuple> migrator, ObjectMapper mapper) {
+	private void processEnvironment(File inStoreDirectory, long logSize, File outStoreDirectory, Function4<String, String, JsonNode, JsonNode, Tuple> migrator, ObjectMapper mapper, boolean inGzip, boolean outGzip) {
 		final jetbrains.exodus.env.Environment inEnvironment = Environments.newInstance(
 				inStoreDirectory,
 				new EnvironmentConfig().setLogFileSize(logSize)
@@ -175,7 +186,7 @@ private void processEnvironment(File inStoreDirectory, long logSize, File outSto
 					continue;
 				}
 
-				doMigrate(inStore, outStore, migrator, mapper);
+				migrateStore(inStore, outStore, migrator, mapper, inGzip, outGzip);
 				log.info("Done writing {}.", store);
 			}
 
@@ -191,7 +202,7 @@ private void processEnvironment(File inStoreDirectory, long logSize, File outSto
 		inEnvironment.close();
 	}
 
-	private void doMigrate(Store inStore, Store outStore, Function4<String, String, JsonNode, JsonNode, Tuple> migrator, ObjectMapper mapper) {
+	private void migrateStore(Store inStore, Store outStore, Function4<String, String, JsonNode, JsonNode, Tuple> migrator, ObjectMapper mapper, boolean inGzip, boolean outGzip) {
 
 		final Environment inEnvironment = inStore.getEnvironment();
 		final Environment outEnvironment = outStore.getEnvironment();
@@ -211,13 +222,12 @@ private void doMigrate(Store inStore, Store outStore, Function4<String, String,
 			while (cursor.getNext()) {
 
 				// Everything is mapped with Smile so even the keys.
-				final JsonNode key = mapper.readTree(cursor.getKey().getBytesUnsafe());
+				final JsonNode key = read(mapper, cursor.getKey(), inGzip);
 
-				final JsonNode node = mapper.readTree(cursor.getValue().getBytesUnsafe());
+				final JsonNode value = read(mapper, cursor.getValue(), inGzip);
 
 				// Apply the migrator, it will return new key and value
-				final Tuple<?> migrated =
-						migrator.invoke(inEnvironment.getLocation(), inStore.getName(), key, node);
+				final Tuple<?> migrated = migrator.invoke(inEnvironment.getLocation(), inStore.getName(), key, value);
 
 				// => Effectively delete the object
 				if (migrated == null) {
@@ -226,18 +236,18 @@ private void doMigrate(Store inStore, Store outStore, Function4<String, String,
 				}
 
 				// Serialize the values and write them into new Store.
-				final ByteIterable keyIter = new ArrayByteIterable(mapper.writeValueAsBytes(migrated.get(0)));
+				final byte[] keyWritten = write(mapper, ((JsonNode) migrated.get(0)), outGzip);
 
-				final ByteIterable valueIter = new ArrayByteIterable(mapper.writeValueAsBytes(migrated.get(1)));
+				final byte[] valueWritten = write(mapper, ((JsonNode) migrated.get(1)), outGzip);
 
 				if (log.isTraceEnabled()) {
-					log.trace("Mapped `{}` to \n{}", new String(keyIter.getBytesUnsafe()), new String(valueIter.getBytesUnsafe()));
+					log.trace("Mapped `{}` to \n{}", new String(keyWritten), new String(valueWritten));
 				}
-				else {
-					log.debug("Mapped `{}`", new String(keyIter.getBytesUnsafe()));
+				else if (log.isDebugEnabled()) {
+					log.debug("Mapped `{}`", new String(keyWritten));
 				}
 
-				outStore.put(writeTx, keyIter, valueIter);
+				outStore.put(writeTx, new ArrayByteIterable(keyWritten), new ArrayByteIterable(valueWritten));
 
 				if (++processed % (1 + (count / 10)) == 0) {
 					log.info("Processed {} / {} ({}%)", processed, count, Math.round(100f * (float) processed / (float) count));
@@ -261,4 +271,37 @@ private void doMigrate(Store inStore, Store outStore, Function4<String, String,
 		}
 	}
 
+	private JsonNode read(ObjectMapper mapper, ByteIterable cursor, boolean gzip) throws IOException {
+		InputStream inputStream = new ByteArrayInputStream(cursor.getBytesUnsafe());
+
+		if (gzip) {
+			inputStream = new GZIPInputStream(inputStream);
+		}
+
+		return mapper.readTree(inputStream);
+	}
+
+	@SneakyThrows
+	@NotNull
+	private byte[] write(ObjectMapper mapper, JsonNode value, boolean gzip) throws JsonProcessingException {
+		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+		try (OutputStream outputStream = gzip ? new GZIPOutputStream(baos) : baos) {
+			mapper.writeValue(outputStream, value);
+		}
+
+		return baos.toByteArray();
+	}
+
+	/**
+	 * Class defining the interface for the Groovy-Script.
+	 */
+	public abstract static class MigrationScriptFactory extends Script {
+
+		/**
+		 * Environment -> Store -> Key -> Value -> (Key, Value)
+		 */
+		@Override
+		public abstract Function4<String, String, JsonNode, JsonNode, Tuple> run();
+	}
 }
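Below is a minimal, self-contained sketch of the gzip round trip that the new read/write helpers implement. It substitutes a plain JSON ObjectMapper for Conquery's Smile-configured mapper and drops the Xodus ByteIterable plumbing (both are simplifying assumptions); it runs standalone with only jackson-databind on the classpath:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class GzipRoundTripSketch {

	public static void main(String[] args) throws IOException {
		final ObjectMapper mapper = new ObjectMapper();
		final JsonNode value = mapper.readTree("{\"user\":\"example\"}");

		// Write path: serialize, optionally wrapping the target in a GZIPOutputStream.
		final byte[] stored = write(mapper, value, true);

		// Read path: optionally wrap the source in a GZIPInputStream, then deserialize.
		final JsonNode restored = read(mapper, stored, true);

		System.out.println(restored.equals(value)); // prints: true
	}

	static byte[] write(ObjectMapper mapper, JsonNode value, boolean gzip) throws IOException {
		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
		// try-with-resources closes the GZIPOutputStream, finishing the gzip trailer
		// before the buffer is read back out.
		try (OutputStream out = gzip ? new GZIPOutputStream(baos) : baos) {
			mapper.writeValue(out, value);
		}
		return baos.toByteArray();
	}

	static JsonNode read(ObjectMapper mapper, byte[] stored, boolean gzip) throws IOException {
		InputStream in = new ByteArrayInputStream(stored);
		if (gzip) {
			in = new GZIPInputStream(in);
		}
		return mapper.readTree(in);
	}
}

Closing the GZIPOutputStream before reading the buffer is the important detail: it flushes the deflater and writes the gzip trailer, which is also why the PR's write helper wraps its stream in try-with-resources rather than calling toByteArray() inside the block.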