Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for TableConfig changes #2356

Merged
merged 9 commits into from
Jul 25, 2024
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package datawave.ingest.mapreduce.job;

import static datawave.ingest.table.config.AbstractTableConfigHelper.DISABLE_VERSIONING_ITERATOR;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -27,8 +29,10 @@
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NamespaceOperations;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.constraints.DefaultKeySizeConstraint;
import org.apache.accumulo.core.iterators.Combiner;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -259,7 +263,14 @@ protected void createAndConfigureTablesIfNecessary(Set<String> tableNames, Table
// If the tables don't exist, then create them.
try {
if (!tops.exists(table)) {
tops.create(table);
boolean disableVersioning = conf != null && conf.getBoolean(table + DISABLE_VERSIONING_ITERATOR, false);
if (disableVersioning) {
tops.create(table, new NewTableConfiguration().withoutDefaultIterators());
// withoutDefaultIterators will also skip the default table constraint, so set that
tops.setProperty(table, Property.TABLE_CONSTRAINT_PREFIX + "1", DefaultKeySizeConstraint.class.getName());
} else {
tops.create(table);
}
}
} catch (TableExistsException te) {
// in this case, somebody else must have created the table after our existence check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

public abstract class AbstractTableConfigHelper implements TableConfigHelper {

private static final String DISABLE_VERSIONING_ITERATOR = ".disable.versioning.iterator";
public static final String DISABLE_VERSIONING_ITERATOR = ".disable.versioning.iterator";
protected Configuration config;

protected AbstractTableConfigHelper() {}
Expand Down Expand Up @@ -92,9 +92,9 @@ public static void setPropertyIfNecessary(String tableName, String propertyName,
* @throws TableNotFoundException
* if the table is not found
*/
protected void setAggregatorConfigurationIfNecessary(String tableName, List<CombinerConfiguration> aggregators, TableOperations tops, Logger log)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
if (areAggregatorsConfigured(tableName, aggregators, tops)) {
protected void setAggregatorConfigurationIfNecessary(String tableName, List<CombinerConfiguration> aggregators, TableOperations tops, Configuration config,
Logger log) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
if (areAggregatorsConfigured(tableName, aggregators, tops, config)) {
log.debug(tableName + " appears to have its aggregators configured already.");
return;
}
Expand Down Expand Up @@ -148,7 +148,8 @@ public static Map<String,String> generateInitialTableProperties(Configuration co
* @throws TableNotFoundException
* if the table is not found
*/
protected boolean areAggregatorsConfigured(String tableName, List<CombinerConfiguration> aggregators, TableOperations tops) throws TableNotFoundException {
protected boolean areAggregatorsConfigured(String tableName, List<CombinerConfiguration> aggregators, TableOperations tops, Configuration config)
throws TableNotFoundException {
boolean aggregatorsConfigured = false;
Map<String,String> props = generateInitialTableProperties(config, tableName);
props.putAll(generateAggTableProperties(aggregators));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ protected void configureShardTable(TableOperations tops) throws AccumuloExceptio
// Set a text index aggregator on the "tf" (Term Frequency) column family
CombinerConfiguration tfConf = new CombinerConfiguration(new Column("tf"),
new IteratorSetting(10, "TF", datawave.ingest.table.aggregator.TextIndexAggregator.class.getName()));
setAggregatorConfigurationIfNecessary(tableName, Collections.singletonList(tfConf), tops, log);
setAggregatorConfigurationIfNecessary(tableName, Collections.singletonList(tfConf), tops, conf, log);

if (markingsSetupIteratorEnabled) {
for (IteratorScope scope : IteratorScope.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void exposeAreAggregatorsConfigured() throws AssertionError, TableNotFoun
areAggregatorsConfiguredCalled = false;
AbstractTableConfigHelperTest.GET_PROPERTIES_THROWS_ACCUMULO_EXCEPTION = false;

boolean results = this.areAggregatorsConfigured(tableName, aggregators, tops);
boolean results = this.areAggregatorsConfigured(tableName, aggregators, tops, config);

Assert.assertTrue("AreAggregatorsConfigured called", areAggregatorsConfiguredCalled);
Assert.assertFalse("AreAggregatorsConfigured returned and unexpected results", results);
Expand All @@ -277,7 +277,7 @@ public void exposeAreAggregatorsConfigured() throws AssertionError, TableNotFoun
parent.tableProperties.put(key, value);
}

results = this.areAggregatorsConfigured(tableName, aggregators, tops);
results = this.areAggregatorsConfigured(tableName, aggregators, tops, config);

Assert.assertTrue("AreAggregatorsConfigured called", areAggregatorsConfiguredCalled);
Assert.assertFalse("AreAggregatorsConfigured returned and unexpected results", results);
Expand All @@ -288,7 +288,7 @@ public void exposeAreAggregatorsConfigured() throws AssertionError, TableNotFoun

parent.tableProperties.putAll(props);

results = this.areAggregatorsConfigured(tableName, aggregators, tops);
results = this.areAggregatorsConfigured(tableName, aggregators, tops, config);

Assert.assertTrue("AreAggregatorsConfigured called", areAggregatorsConfiguredCalled);
Assert.assertTrue("AreAggregatorsConfigured returned and unexpected results", results);
Expand All @@ -301,7 +301,7 @@ public void exposeAreAggregatorsConfigured() throws AssertionError, TableNotFoun

tableName = AbstractTableConfigHelperTest.BAD_TABLE_NAME;

this.areAggregatorsConfigured(tableName, aggregators, tops);
this.areAggregatorsConfigured(tableName, aggregators, tops, config);

Assert.fail("AreAggregratorsConfigured failed to throw the expected exception.");

Expand All @@ -322,7 +322,7 @@ public void exposeAreAggregatorsConfigured() throws AssertionError, TableNotFoun

tableName = AbstractTableConfigHelperTest.TABLE_NAME;

this.areAggregatorsConfigured(tableName, aggregators, tops);
this.areAggregatorsConfigured(tableName, aggregators, tops, config);

Assert.fail("AreAggregratorsConfigured failed to throw the expected exception.");

Expand Down Expand Up @@ -357,7 +357,7 @@ public void exposeSetCombinerConfigurationIfNecessaryForTest()
resultsForOverridenAreAggregatorsConfigured = true;
areAggregatorsConfiguredCalled = false;

this.setAggregatorConfigurationIfNecessary(tableName, aggregators, tops, log);
this.setAggregatorConfigurationIfNecessary(tableName, aggregators, tops, config, log);

Assert.assertTrue("SetCombinerConfigurationIfNecessary() failed to call areAggregatorsConfigured", areAggregatorsConfiguredCalled);
Assert.assertEquals("SetCombinerConfigurationIfNecessary() failed to generate the expected number of debug messages.", 1, debugMessages.size());
Expand All @@ -368,7 +368,7 @@ public void exposeSetCombinerConfigurationIfNecessaryForTest()
resultsForOverridenAreAggregatorsConfigured = false;
areAggregatorsConfiguredCalled = false;

this.setAggregatorConfigurationIfNecessary(tableName, aggregators, tops, log);
this.setAggregatorConfigurationIfNecessary(tableName, aggregators, tops, config, log);

Assert.assertTrue("SetCombinerConfigurationIfNecessary() failed to call areAggregatorsConfigured", areAggregatorsConfiguredCalled);
Assert.assertEquals("SetCombinerConfigurationIfNecessary() failed to generate the expected number of debug messages.", 0, debugMessages.size());
Expand Down Expand Up @@ -411,7 +411,7 @@ public void exposeSetCombinerConfigurationIfNecessaryForTest()
public boolean resultsForSetLocalityGroupsConfigured = false;

@Override
protected boolean areAggregatorsConfigured(String tableName, List<CombinerConfiguration> aggregators, TableOperations tops)
protected boolean areAggregatorsConfigured(String tableName, List<CombinerConfiguration> aggregators, TableOperations tops, Configuration config)
throws TableNotFoundException {

boolean results = false;
Expand All @@ -423,7 +423,7 @@ protected boolean areAggregatorsConfigured(String tableName, List<CombinerConfig
results = resultsForOverridenAreAggregatorsConfigured;
} else {

results = super.areAggregatorsConfigured(tableName, aggregators, tops);
results = super.areAggregatorsConfigured(tableName, aggregators, tops, config);
}

return results;
Expand Down
Loading