Skip to content

Commit

Permalink
Updating with proposed changes from Composite Timestamp PR
Browse files Browse the repository at this point in the history
  • Loading branch information
mineralntl committed Apr 24, 2024
1 parent 3011367 commit 7b80494
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package datawave.ingest.mapreduce.job;

import static datawave.ingest.table.config.AbstractTableConfigHelper.DISABLE_VERSIONING_ITERATOR;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -27,8 +29,10 @@
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NamespaceOperations;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.constraints.DefaultKeySizeConstraint;
import org.apache.accumulo.core.iterators.Combiner;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -259,7 +263,14 @@ protected void createAndConfigureTablesIfNecessary(Set<String> tableNames, Table
// If the tables don't exist, then create them.
try {
if (!tops.exists(table)) {
tops.create(table);
boolean disableVersioning = conf != null && conf.getBoolean(table + DISABLE_VERSIONING_ITERATOR, false);
if (disableVersioning) {
tops.create(table, new NewTableConfiguration().withoutDefaultIterators());
// withoutDefaultIterators will also skip the default table constraint, so set that
tops.setProperty(table, Property.TABLE_CONSTRAINT_PREFIX + "1", DefaultKeySizeConstraint.class.getName());
} else {
tops.create(table);
}
}
} catch (TableExistsException te) {
// in this case, somebody else must have created the table after our existence check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

public abstract class AbstractTableConfigHelper implements TableConfigHelper {

private static final String DISABLE_VERSIONING_ITERATOR = ".disable.versioning.iterator";
public static final String DISABLE_VERSIONING_ITERATOR = ".disable.versioning.iterator";
protected Configuration config;

protected AbstractTableConfigHelper() {}
Expand Down Expand Up @@ -92,9 +92,9 @@ public static void setPropertyIfNecessary(String tableName, String propertyName,
* @throws TableNotFoundException
* if the table is not found
*/
protected void setAggregatorConfigurationIfNecessary(String tableName, List<CombinerConfiguration> aggregators, TableOperations tops, Logger log)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
if (areAggregatorsConfigured(tableName, aggregators, tops)) {
protected void setAggregatorConfigurationIfNecessary(String tableName, List<CombinerConfiguration> aggregators, TableOperations tops, Configuration config,
Logger log) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
if (areAggregatorsConfigured(tableName, aggregators, tops, config)) {
log.debug(tableName + " appears to have its aggregators configured already.");
return;
}
Expand Down Expand Up @@ -148,7 +148,8 @@ public static Map<String,String> generateInitialTableProperties(Configuration co
* @throws TableNotFoundException
* if the table is not found
*/
protected boolean areAggregatorsConfigured(String tableName, List<CombinerConfiguration> aggregators, TableOperations tops) throws TableNotFoundException {
protected boolean areAggregatorsConfigured(String tableName, List<CombinerConfiguration> aggregators, TableOperations tops, Configuration config)
throws TableNotFoundException {
boolean aggregatorsConfigured = false;
Map<String,String> props = generateInitialTableProperties(config, tableName);
props.putAll(generateAggTableProperties(aggregators));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ protected void configureShardTable(TableOperations tops) throws AccumuloExceptio
CombinerConfiguration tfConf = new CombinerConfiguration(new Column("tf"),
new IteratorSetting(10, "TF", datawave.ingest.table.aggregator.TextIndexAggregator.class.getName()));

setAggregatorConfigurationIfNecessary(tableName, Collections.singletonList(tfConf), tops, log);
setAggregatorConfigurationIfNecessary(tableName, Collections.singletonList(tfConf), tops, conf, log);

if (markingsSetupIteratorEnabled) {
for (IteratorScope scope : IteratorScope.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void exposeAreAggregatorsConfigured() throws AssertionError, TableNotFoun
areAggregatorsConfiguredCalled = false;
AbstractTableConfigHelperTest.GET_PROPERTIES_THROWS_ACCUMULO_EXCEPTION = false;

boolean results = this.areAggregatorsConfigured(tableName, aggregators, tops);
boolean results = this.areAggregatorsConfigured(tableName, aggregators, tops, config);

Assert.assertTrue("AreAggregatorsConfigured called", areAggregatorsConfiguredCalled);
Assert.assertFalse("AreAggregatorsConfigured returned and unexpected results", results);
Expand All @@ -277,7 +277,7 @@ public void exposeAreAggregatorsConfigured() throws AssertionError, TableNotFoun
parent.tableProperties.put(key, value);
}

results = this.areAggregatorsConfigured(tableName, aggregators, tops);
results = this.areAggregatorsConfigured(tableName, aggregators, tops, config);

Assert.assertTrue("AreAggregatorsConfigured called", areAggregatorsConfiguredCalled);
Assert.assertFalse("AreAggregatorsConfigured returned and unexpected results", results);
Expand All @@ -288,7 +288,7 @@ public void exposeAreAggregatorsConfigured() throws AssertionError, TableNotFoun

parent.tableProperties.putAll(props);

results = this.areAggregatorsConfigured(tableName, aggregators, tops);
results = this.areAggregatorsConfigured(tableName, aggregators, tops, config);

Assert.assertTrue("AreAggregatorsConfigured called", areAggregatorsConfiguredCalled);
Assert.assertTrue("AreAggregatorsConfigured returned and unexpected results", results);
Expand All @@ -301,7 +301,7 @@ public void exposeAreAggregatorsConfigured() throws AssertionError, TableNotFoun

tableName = AbstractTableConfigHelperTest.BAD_TABLE_NAME;

this.areAggregatorsConfigured(tableName, aggregators, tops);
this.areAggregatorsConfigured(tableName, aggregators, tops, config);

Assert.fail("AreAggregratorsConfigured failed to throw the expected exception.");

Expand All @@ -322,7 +322,7 @@ public void exposeAreAggregatorsConfigured() throws AssertionError, TableNotFoun

tableName = AbstractTableConfigHelperTest.TABLE_NAME;

this.areAggregatorsConfigured(tableName, aggregators, tops);
this.areAggregatorsConfigured(tableName, aggregators, tops, config);

Assert.fail("AreAggregratorsConfigured failed to throw the expected exception.");

Expand Down Expand Up @@ -357,7 +357,7 @@ public void exposeSetCombinerConfigurationIfNecessaryForTest()
resultsForOverridenAreAggregatorsConfigured = true;
areAggregatorsConfiguredCalled = false;

this.setAggregatorConfigurationIfNecessary(tableName, aggregators, tops, log);
this.setAggregatorConfigurationIfNecessary(tableName, aggregators, tops, config, log);

Assert.assertTrue("SetCombinerConfigurationIfNecessary() failed to call areAggregatorsConfigured", areAggregatorsConfiguredCalled);
Assert.assertEquals("SetCombinerConfigurationIfNecessary() failed to generate the expected number of debug messages.", 1, debugMessages.size());
Expand All @@ -368,7 +368,7 @@ public void exposeSetCombinerConfigurationIfNecessaryForTest()
resultsForOverridenAreAggregatorsConfigured = false;
areAggregatorsConfiguredCalled = false;

this.setAggregatorConfigurationIfNecessary(tableName, aggregators, tops, log);
this.setAggregatorConfigurationIfNecessary(tableName, aggregators, tops, config, log);

Assert.assertTrue("SetCombinerConfigurationIfNecessary() failed to call areAggregatorsConfigured", areAggregatorsConfiguredCalled);
Assert.assertEquals("SetCombinerConfigurationIfNecessary() failed to generate the expected number of debug messages.", 0, debugMessages.size());
Expand Down Expand Up @@ -411,7 +411,7 @@ public void exposeSetCombinerConfigurationIfNecessaryForTest()
public boolean resultsForSetLocalityGroupsConfigured = false;

@Override
protected boolean areAggregatorsConfigured(String tableName, List<CombinerConfiguration> aggregators, TableOperations tops)
protected boolean areAggregatorsConfigured(String tableName, List<CombinerConfiguration> aggregators, TableOperations tops, Configuration config)
throws TableNotFoundException {

boolean results = false;
Expand All @@ -423,7 +423,7 @@ protected boolean areAggregatorsConfigured(String tableName, List<CombinerConfig
results = resultsForOverridenAreAggregatorsConfigured;
} else {

results = super.areAggregatorsConfigured(tableName, aggregators, tops);
results = super.areAggregatorsConfigured(tableName, aggregators, tops, config);
}

return results;
Expand Down

0 comments on commit 7b80494

Please sign in to comment.