Skip to content

Commit

Permalink
Merge pull request #2288 from eclipse/fix_dbstore-table-name
Browse files Browse the repository at this point in the history
Changes to prevent usage of the same db table between different cloud…
  • Loading branch information
MMaiero authored Oct 19, 2018
2 parents 13b3696 + cacde3d commit 0a63cd7
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,7 @@ protected void activate(ComponentContext componentContext, Map<String, Object> p
createThrottle();
submitPublishingWork();

String[] parts = pid.split("-");
String table = "ds_messages";
if (parts.length > 1) {
table += "_" + parts[1];
}
this.store = new DbDataStore(table);
this.store = new DbDataStore(pid);

restartDbServiceTracker(this.dataServiceOptions.getDbServiceInstancePid());

Expand Down Expand Up @@ -887,13 +882,13 @@ public String getCriticalComponentName() {
public int getCriticalComponentTimeout() {
return this.dataServiceOptions.getCriticalComponentTimeout();
}
public Map<String,String> getConnectionInfo() {
Map<String,String> result = new HashMap<>();
result.put("Broker URL", dataTransportService.getBrokerUrl());
result.put("Account", dataTransportService.getAccountName());
result.put("Username", dataTransportService.getUsername());
result.put("Client ID", dataTransportService.getClientId());

public Map<String, String> getConnectionInfo() {
Map<String, String> result = new HashMap<>();
result.put("Broker URL", this.dataTransportService.getBrokerUrl());
result.put("Account", this.dataTransportService.getAccountName());
result.put("Username", this.dataTransportService.getUsername());
result.put("Client ID", this.dataTransportService.getClientId());
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public class DbDataStore implements DataStore {
private ScheduledFuture<?> houseKeeperTask;
private int capacity;

private final String table;
private final String tableName;
private final String sanitizedTableName;

private final String sqlCreateTable;
private final String sqlCreateIndex;
Expand Down Expand Up @@ -83,48 +84,54 @@ public DbDataStore(String table) {
// do not make this static as it may not be thread safe
this.utcCalendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));

this.table = table;
this.tableName = table;
this.sanitizedTableName = sanitizeSql(table);

this.sqlCreateTable = "CREATE TABLE IF NOT EXISTS " + this.table
this.sqlCreateTable = "CREATE TABLE IF NOT EXISTS " + this.sanitizedTableName
+ " (id INTEGER IDENTITY PRIMARY KEY, topic VARCHAR(32767 CHAR), qos INTEGER, retain BOOLEAN, createdOn TIMESTAMP, publishedOn TIMESTAMP, publishedMessageId INTEGER, confirmedOn TIMESTAMP, payload VARBINARY(16777216), priority INTEGER, sessionId VARCHAR(32767 CHAR), droppedOn TIMESTAMP);";
this.sqlCreateIndex = "CREATE INDEX IF NOT EXISTS " + this.table + "_nextMsg ON " + this.table
+ " (publishedOn ASC NULLS FIRST, priority ASC, createdOn ASC, qos);";
this.sqlMessageCount = "SELECT COUNT(*) FROM " + this.table + ";";
this.sqlResetId = "ALTER TABLE " + this.table + " ALTER COLUMN id RESTART WITH 1;";
this.sqlStore = "INSERT INTO " + this.table
this.sqlCreateIndex = "CREATE INDEX IF NOT EXISTS " + sanitizeSql(this.tableName + "_nextMsg") + " ON "
+ this.sanitizedTableName + " (publishedOn ASC NULLS FIRST, priority ASC, createdOn ASC, qos);";
this.sqlMessageCount = "SELECT COUNT(*) FROM " + this.sanitizedTableName + ";";
this.sqlResetId = "ALTER TABLE " + this.sanitizedTableName + " ALTER COLUMN id RESTART WITH 1;";
this.sqlStore = "INSERT INTO " + this.sanitizedTableName
+ " (topic, qos, retain, createdOn, publishedOn, publishedMessageId, confirmedOn, payload, priority, sessionId, droppedOn) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);";
this.sqlGetMessage = "SELECT id, topic, qos, retain, createdOn, publishedOn, publishedMessageId, confirmedOn, payload, priority, sessionId, droppedOn FROM "
+ this.table + " WHERE id = ?";
+ this.sanitizedTableName + " WHERE id = ?";
this.sqlGetNextMessage = "SELECT a.id, a.topic, a.qos, a.retain, a.createdOn, a.publishedOn, a.publishedMessageId, a.confirmedOn, a.payload, a.priority, a.sessionId, a.droppedOn FROM "
+ this.table + " AS a JOIN (SELECT id, publishedOn FROM " + this.table
+ this.sanitizedTableName + " AS a JOIN (SELECT id, publishedOn FROM " + this.sanitizedTableName
+ " ORDER BY publishedOn ASC NULLS FIRST, priority ASC, createdOn ASC LIMIT 1) AS b WHERE a.id = b.id AND b.publishedOn IS NULL;";
this.sqlSetPublished = "UPDATE " + this.table
this.sqlSetPublished = "UPDATE " + this.sanitizedTableName
+ " SET publishedOn = ?, publishedMessageId = ?, sessionId = ? WHERE id = ?;";
this.sqlSetPublished2 = "UPDATE " + this.table + " SET publishedOn = ? WHERE id = ?;";
this.sqlSetConfirmed = "UPDATE " + this.table + " SET confirmedOn = ? WHERE id = ?;";
this.sqlSetPublished2 = "UPDATE " + this.sanitizedTableName + " SET publishedOn = ? WHERE id = ?;";
this.sqlSetConfirmed = "UPDATE " + this.sanitizedTableName + " SET confirmedOn = ? WHERE id = ?;";
this.sqlAllUnpublishedMessages = "SELECT id, topic, qos, retain, createdOn, publishedOn, publishedMessageId, confirmedOn, priority, sessionId, droppedOn FROM "
+ this.table + " WHERE publishedOn IS NULL ORDER BY priority ASC, createdOn ASC;";
+ this.sanitizedTableName + " WHERE publishedOn IS NULL ORDER BY priority ASC, createdOn ASC;";
this.sqlAllInFlightMessages = "SELECT id, topic, qos, retain, createdOn, publishedOn, publishedMessageId, confirmedOn, priority, sessionId, droppedOn FROM "
+ this.table
+ this.sanitizedTableName
+ " WHERE publishedOn IS NOT NULL AND qos > 0 AND confirmedOn IS NULL AND droppedOn IS NULL ORDER BY priority ASC, createdOn ASC";
this.sqlAllDroppedInFlightMessages = "SELECT id, topic, qos, retain, createdOn, publishedOn, publishedMessageId, confirmedOn, priority, sessionId, droppedOn FROM "
+ this.table + " WHERE droppedOn IS NOT NULL ORDER BY priority ASC, createdOn ASC;";
this.sqlUnpublishAllInFlightMessages = "UPDATE " + this.table
+ this.sanitizedTableName + " WHERE droppedOn IS NOT NULL ORDER BY priority ASC, createdOn ASC;";
this.sqlUnpublishAllInFlightMessages = "UPDATE " + this.sanitizedTableName
+ " SET publishedOn = NULL WHERE publishedOn IS NOT NULL AND qos > 0 AND confirmedOn IS NULL;";
this.sqlDropAllInFlightMessages = "UPDATE " + this.table
this.sqlDropAllInFlightMessages = "UPDATE " + this.sanitizedTableName
+ " SET droppedOn = ? WHERE publishedOn IS NOT NULL AND qos > 0 AND confirmedOn IS NULL;";
this.sqlDeleteDroppedMessages = "DELETE FROM " + this.table
this.sqlDeleteDroppedMessages = "DELETE FROM " + this.sanitizedTableName
+ " WHERE droppedOn <= DATEADD('ss', -?, ?) AND droppedOn IS NOT NULL;";
this.sqlDeleteConfirmedMessages = "DELETE FROM " + this.table
this.sqlDeleteConfirmedMessages = "DELETE FROM " + this.sanitizedTableName
+ " WHERE confirmedOn <= DATEADD('ss', -?, ?) AND confirmedOn IS NOT NULL;";
this.sqlDeletePublishedMessages = "DELETE FROM " + this.table
this.sqlDeletePublishedMessages = "DELETE FROM " + this.sanitizedTableName
+ " WHERE qos = 0 AND publishedOn <= DATEADD('ss', -?, ?) AND publishedOn IS NOT NULL;";
this.sqlDuplicateCount = "SELECT count(*) FROM (SELECT id, COUNT(id) FROM " + this.table
this.sqlDuplicateCount = "SELECT count(*) FROM (SELECT id, COUNT(id) FROM " + this.sanitizedTableName
+ " GROUP BY id HAVING (COUNT(id) > 1)) dups;";
this.sqlDropPrimaryKey = "ALTER TABLE " + this.table + " DROP PRIMARY KEY;";
this.sqlDeleteDuplicates = "DELETE FROM " + this.table + " WHERE id IN (SELECT id FROM " + this.table
+ " GROUP BY id HAVING COUNT(*) > 1);";
this.sqlCreatePrimaryKey = "ALTER TABLE " + this.table + " ADD PRIMARY KEY (id);";
this.sqlDropPrimaryKey = "ALTER TABLE " + this.sanitizedTableName + " DROP PRIMARY KEY;";
this.sqlDeleteDuplicates = "DELETE FROM " + this.sanitizedTableName + " WHERE id IN (SELECT id FROM "
+ this.sanitizedTableName + " GROUP BY id HAVING COUNT(*) > 1);";
this.sqlCreatePrimaryKey = "ALTER TABLE " + this.sanitizedTableName + " ADD PRIMARY KEY (id);";
}

private String sanitizeSql(final String string) {
final String sanitizedName = string.replaceAll("\"", "\"\"");
return "\"" + sanitizedName + "\"";
}

// ----------------------------------------------------------
Expand Down Expand Up @@ -153,7 +160,7 @@ public synchronized void stop() {
this.houseKeeperExecutor.shutdownNow();
this.houseKeeperTask = null;
}
dbService = null;
this.dbService = null;
}

private boolean isRepairEnabled() {
Expand Down Expand Up @@ -185,9 +192,9 @@ public synchronized void update(int houseKeeperInterval, int purgeAge, int capac

execute(this.sqlCreateIndex);

createIndex(this.table + "_PUBLISHEDON", this.table, "(PUBLISHEDON DESC)");
createIndex(this.table + "_CONFIRMEDON", this.table, "(CONFIRMEDON DESC)");
createIndex(this.table + "_DROPPEDON", this.table, "(DROPPEDON DESC)");
createIndex(sanitizeSql(this.tableName + "_PUBLISHEDON"), this.sanitizedTableName, "(PUBLISHEDON DESC)");
createIndex(sanitizeSql(this.tableName + "_CONFIRMEDON"), this.sanitizedTableName, "(CONFIRMEDON DESC)");
createIndex(sanitizeSql(this.tableName + "_DROPPEDON"), this.sanitizedTableName, "(DROPPEDON DESC)");

// Start the Housekeeper task
this.houseKeeperTask = this.houseKeeperExecutor.scheduleWithFixedDelay(
Expand Down Expand Up @@ -227,7 +234,7 @@ private synchronized void resetIdentityGenerator() throws KuraStoreException {
@Override
public synchronized DataMessage store(String topic, byte[] payload, int qos, boolean retain, int priority)
throws KuraStoreException {
if (dbService == null) {
if (this.dbService == null) {
throw new KuraStoreException("DbService instance not attached");
}
if (topic == null || topic.trim().length() == 0) {
Expand Down Expand Up @@ -575,12 +582,12 @@ private DataMessage.Builder buildDataMessageBuilder(ResultSet rs) throws SQLExce

private <T> T withConnection(final H2DbService.ConnectionCallable<T> callable, final String exceptionMessage)
throws KuraStoreException {
if (dbService == null) {
if (this.dbService == null) {
throw new KuraStoreException("DbService instance not attached");
}

try {
return dbService.withConnection(callable);
return this.dbService.withConnection(callable);
} catch (final Exception e) {
throw new KuraStoreException(e, exceptionMessage);
}
Expand Down

0 comments on commit 0a63cd7

Please sign in to comment.