Skip to content

Commit

Permalink
Merge pull request #3185 from ingef/feature/parallelize-dict-handling…
Browse files Browse the repository at this point in the history
…-ImportJob

apply all Dictionary operations in ImportJob with Stream.parallel
  • Loading branch information
awildturtok authored Oct 5, 2023
2 parents 6289a75 + c116d3a commit 253780d
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.bakdata.conquery.models.dictionary;


import java.util.stream.IntStream;

import com.bakdata.conquery.models.events.stores.root.IntegerStore;
import com.bakdata.conquery.models.events.stores.root.StringStore;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
Expand Down Expand Up @@ -40,17 +42,17 @@ public static DictionaryMapping createAndImport(Dictionary from, Dictionary into

int newIds = 0;

Int2IntMap source2Target = new Int2IntOpenHashMap(from.size());
final Int2IntMap source2Target = new Int2IntOpenHashMap(from.size());

source2Target.defaultReturnValue(-1);

Int2IntMap target2Source = new Int2IntOpenHashMap(from.size());
final Int2IntMap target2Source = new Int2IntOpenHashMap(from.size());

target2Source.defaultReturnValue(-1);

for (int id = 0; id < from.size(); id++) {

byte[] value = from.getElement(id);
final byte[] value = from.getElement(id);
int targetId = into.getId(value);

//if id was unknown until now
Expand Down Expand Up @@ -92,22 +94,21 @@ public IntCollection target() {
* Mutably applies mapping to store.
*/
public void applyToStore(StringStore from, IntegerStore to) {
for (int event = 0; event < from.getLines(); event++) {
if (!from.has(event)) {
to.setNull(event);
continue;
}

final int string = from.getString(event);

int value = source2Target(string);

if (value == -1) {
throw new IllegalStateException(String.format("Missing mapping for %s", string));
}

to.setInteger(event, value);
}
IntStream.range(0, from.getLines())
.parallel()
.forEach(event -> {
if (!from.has(event)) {
to.setNull(event);
return;
}
final int string = from.getString(event);
final int value = source2Target(string);

if (value == -1) {
throw new IllegalStateException(String.format("Missing mapping for %s", string));
}
to.setInteger(event, value);
});
}

}
123 changes: 63 additions & 60 deletions backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import javax.ws.rs.BadRequestException;
Expand Down Expand Up @@ -78,6 +79,8 @@ public class ImportJob extends Job {
private final PreprocessedData container;
private final ConqueryConfig config;

private final IdMutex<DictionaryId> sharedDictionaryLocks;


private static final int NUMBER_OF_STEPS = /* directly in execute = */4;

Expand Down Expand Up @@ -148,7 +151,8 @@ else if (processedImport != null) {
header,
dictionaries,
container,
config
config,
sharedDictionaryLocks
);
}
}
Expand Down Expand Up @@ -197,63 +201,65 @@ private static Map<DictionaryId, Dictionary> createLocalIdReplacements(Map<Strin
* Create mappings for shared dictionaries dict.
* This is not synchronized because the method is called within the job execution.
*/
private static Map<String, DictionaryMapping> importDictionaries(DistributedNamespace namespace, Map<String, Dictionary> dicts, Column[] columns, String importName, Table table) {
private static Map<String, DictionaryMapping> importDictionaries(DistributedNamespace namespace, Map<String, Dictionary> dicts, Column[] columns, String importName, Table table, IdMutex<DictionaryId> sharedDictionaryLocks) {

// Empty Maps are Coalesced to null by Jackson
if (dicts == null) {
return Collections.emptyMap();
}

final Map<String, DictionaryMapping> out = new HashMap<>();
final Map<String, DictionaryMapping> out = new ConcurrentHashMap<>();

log.debug("BEGIN importing {} Dictionaries", dicts.size());

for (Column column : columns) {
// Might not have an underlying Dictionary (eg Singleton, direct-Number)
// but could also be an error :/ Most likely the former
// It's a shared dictionary
// This should never fail, because the dictionary is pre-created in the replacement generation step

if (column.getType() != MajorTypeId.STRING) {
continue;
}
Arrays.stream(columns)
.parallel()
.filter(column -> column.getType() == MajorTypeId.STRING)
.filter(col -> col.getSharedDictionary() == null)
.map(col -> dicts.get(col.getName()))
.filter(Objects::nonNull)
.forEach(dictionary -> {
// Normal Dictionary -> no merge necessary, just distribute
distributeDictionary(namespace, dictionary);
});

// Might not have an underlying Dictionary (eg Singleton, direct-Number)
// but could also be an error :/ Most likely the former
final Dictionary importDictionary = dicts.get(column.getName());
Arrays.stream(columns)
.parallel()
.filter(column -> column.getType() == MajorTypeId.STRING)
.filter(col -> col.getSharedDictionary() != null)
.filter(col -> dicts.containsKey(col.getName()))
.forEach(column -> {
final Dictionary importDictionary = dicts.get(column.getName());

if (importDictionary == null) {
log.trace("No Dictionary for {}", column);
continue;
}
final String sharedDictionaryName = column.getSharedDictionary();
log.debug("Column[{}.{}.{}] part of shared Dictionary[{}]", table.getId(), importName, column.getName(), sharedDictionaryName);

final DictionaryId dictionaryId = new DictionaryId(namespace.getDataset().getId(), sharedDictionaryName);
final DictionaryMapping mapping;

if (column.getSharedDictionary() == null) {
// Normal Dictionary -> no merge necessary, just distribute
distributeDictionary(namespace, importDictionary);
}
else {
// It's a shared dictionary

final String sharedDictionaryName = column.getSharedDictionary();
// We have to lock here, as sibling columns might both use the same shared-dictionary
try (IdMutex.Locked lock = sharedDictionaryLocks.acquire(dictionaryId)) {
final Dictionary sharedDictionary = namespace.getStorage().getDictionary(dictionaryId);

log.debug("Column[{}.{}.{}] part of shared Dictionary[{}]", table.getId(), importName, column.getName(), sharedDictionaryName);
ResourceUtil.throwNotFoundIfNull(dictionaryId, sharedDictionary);
log.trace("Merging into shared Dictionary[{}]", sharedDictionary);

final DictionaryId dictionaryId = new DictionaryId(namespace.getDataset().getId(), sharedDictionaryName);
final Dictionary sharedDictionary = namespace.getStorage().getDictionary(dictionaryId);
mapping = DictionaryMapping.createAndImport(importDictionary, sharedDictionary);
}

// This should never fail, because the dictionary is pre-created in the replacement generation step
ResourceUtil.throwNotFoundIfNull(dictionaryId, sharedDictionary);
if (mapping.getNumberOfNewIds() != 0) {
distributeDictionary(namespace, mapping.getTargetDictionary());
}
out.put(column.getName(), mapping);
});

log.trace("Merging into shared Dictionary[{}]", sharedDictionary);


DictionaryMapping mapping = DictionaryMapping.createAndImport(importDictionary, sharedDictionary);

if (mapping.getNumberOfNewIds() != 0) {
distributeDictionary(namespace, mapping.getTargetDictionary());
}

out.put(column.getName(), mapping);
}
}

return out;
}

Expand Down Expand Up @@ -285,7 +291,7 @@ public void execute() throws JSONException, InterruptedException, IOException {
log.info("Importing Dictionaries");

Map<String, DictionaryMapping> sharedDictionaryMappings =
importDictionaries(namespace, dictionaries.getDictionaries(), table.getColumns(), header.getName(), table);
importDictionaries(namespace, dictionaries.getDictionaries(), table.getColumns(), header.getName(), table, sharedDictionaryLocks);

log.info("Remapping Dictionaries {}", sharedDictionaryMappings.values());

Expand Down Expand Up @@ -482,33 +488,30 @@ private void remapToSharedDictionary(Map<String, DictionaryMapping> mappings, Ma

final ProgressReporter subJob = getProgressReporter().subJob(mappings.size());

for (Map.Entry<String, DictionaryMapping> entry : mappings.entrySet()) {
final String columnName = entry.getKey();
final DictionaryMapping mapping = entry.getValue();
// we need to find a new Type for the index-Column as it's going to be remapped and might change in size
mappings.entrySet().parallelStream()
.forEach(entry -> {
final String columnName = entry.getKey();
final DictionaryMapping mapping = entry.getValue();

final StringStore stringStore = (StringStore) values.get(columnName);
final StringStore stringStore = (StringStore) values.get(columnName);
log.debug("Remapping Column[{}] = {} with {}", columnName, stringStore, mapping);
final IntegerParser indexParser = new IntegerParser(config);
final IntSummaryStatistics statistics = mapping.target().intStream().summaryStatistics();

log.debug("Remapping Column[{}] = {} with {}", columnName, stringStore, mapping);
indexParser.setLines(stringStore.getLines());
indexParser.setMinValue(statistics.getMin());
indexParser.setMaxValue(statistics.getMax());

// we need to find a new Type for the index-Column as it's going to be remapped and might change in size
final IntegerParser indexParser = new IntegerParser(config);
final IntegerStore newType = indexParser.findBestType();

final IntSummaryStatistics statistics = mapping.target().intStream().summaryStatistics();
log.trace("Decided for {}", newType);

indexParser.setLines(stringStore.getLines());
mapping.applyToStore(stringStore, newType);
stringStore.setIndexStore(newType);

indexParser.setMinValue(statistics.getMin());
indexParser.setMaxValue(statistics.getMax());

final IntegerStore newType = indexParser.findBestType();

log.trace("Decided for {}", newType);

mapping.applyToStore(stringStore, newType);

stringStore.setIndexStore(newType);
subJob.report(1);
}
subJob.report(1);
});
}

private Import createImport(PreprocessedHeader header, Map<String, ColumnStore> stores, Column[] columns, int size) {
Expand Down

0 comments on commit 253780d

Please sign in to comment.