Skip to content

Commit

Permalink
#30315 Added Task241009CreatePostgresJobQueueTables upgrade task
Browse files Browse the repository at this point in the history
  • Loading branch information
jgambarios committed Oct 10, 2024
1 parent e5e00cb commit c30d64a
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.dotmarketing.startup.runonce;

import com.dotmarketing.common.db.DotConnect;
import com.dotmarketing.exception.DotDataException;
import com.dotmarketing.exception.DotRuntimeException;
import com.dotmarketing.startup.StartupTask;

/**
* Upgrade task to create the necessary tables and indexes for the Postgres Job Queue
* implementation. This task creates the job_queue, job, and job_history tables along with their
* associated indexes.
*/
public class Task241009CreatePostgresJobQueueTables implements StartupTask {

@Override
public boolean forceRun() {
return true;
}

/**
* Executes the upgrade task, creating the necessary tables and indexes for the Job Queue.
*
* @throws DotDataException if a data access error occurs.
* @throws DotRuntimeException if a runtime error occurs.
*/
@Override
public void executeUpgrade() throws DotDataException, DotRuntimeException {

// Create the job queue tables
createJobQueueTable();
createJobTable();
createJobHistoryTable();

// Create the indexes
createIndexes();
}

/**
* Creates the job_queue table.
*
* @throws DotDataException if an error occurs while creating the table.
*/
private void createJobQueueTable() throws DotDataException {
try {
new DotConnect().executeStatement(
"CREATE TABLE job_queue (" +
"id VARCHAR(255) PRIMARY KEY, " +
"queue_name VARCHAR(255) NOT NULL, " +
"state VARCHAR(50) NOT NULL, " +
"priority INTEGER DEFAULT 0, " +
"created_at timestamptz NOT NULL)"
);
} catch (Exception ex) {
throw new DotDataException(ex.getMessage(), ex);
}
}

/**
* Creates the job table.
*
* @throws DotDataException if an error occurs while creating the table.
*/
private void createJobTable() throws DotDataException {
try {
new DotConnect().executeStatement(
"CREATE TABLE job (" +
"id VARCHAR(255) PRIMARY KEY, " +
"queue_name VARCHAR(255) NOT NULL, " +
"state VARCHAR(50) NOT NULL, " +
"parameters JSONB NOT NULL, " +
"result JSONB, " +
"progress FLOAT DEFAULT 0, " +
"created_at timestamptz NOT NULL, " +
"updated_at timestamptz NOT NULL, " +
"started_at timestamptz, " +
"completed_at timestamptz, " +
"execution_node VARCHAR(255), " +
"retry_count INTEGER DEFAULT 0)"
);
} catch (Exception ex) {
throw new DotDataException(ex.getMessage(), ex);
}
}

/**
* Creates the job_history table.
*
* @throws DotDataException if an error occurs while creating the table.
*/
private void createJobHistoryTable() throws DotDataException {
try {
new DotConnect().executeStatement(
"CREATE TABLE job_history (" +
"id VARCHAR(255) PRIMARY KEY, " +
"job_id VARCHAR(255) NOT NULL, " +
"state VARCHAR(50) NOT NULL, " +
"execution_node VARCHAR(255), " +
"created_at timestamptz NOT NULL, " +
"result JSONB, " +
"FOREIGN KEY (job_id) REFERENCES job (id))"
);
} catch (Exception ex) {
throw new DotDataException(ex.getMessage(), ex);
}
}

/**
* Creates the necessary indexes for the job queue tables.
*
* @throws DotDataException if an error occurs while creating the indexes.
*/
private void createIndexes() throws DotDataException {
try {
new DotConnect().executeStatement(
"CREATE INDEX idx_job_queue_status ON job_queue (state)");
new DotConnect().executeStatement(
"CREATE INDEX idx_job_queue_priority_created_at ON job_queue (priority DESC, created_at ASC)");
new DotConnect().executeStatement(
"CREATE INDEX idx_job_parameters ON job USING GIN (parameters)");
new DotConnect().executeStatement(
"CREATE INDEX idx_job_result ON job USING GIN (result)");
new DotConnect().executeStatement("CREATE INDEX idx_job_status ON job (state)");
new DotConnect().executeStatement(
"CREATE INDEX idx_job_created_at ON job (created_at)");
new DotConnect().executeStatement(
"CREATE INDEX idx_job_history_job_id ON job_history (job_id)");
new DotConnect().executeStatement(
"CREATE INDEX idx_job_history_job_id_state ON job_history (job_id, state)");
} catch (Exception ex) {
throw new DotDataException(ex.getMessage(), ex);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ public static List<Class<?>> getStartupRunOnceTaskClasses() {
.add(Task240530AddDotAIPortletToLayout.class)
.add(Task240606AddVariableColumnToWorkflow.class)
.add(Task241013RemoveFullPathLcColumnFromIdentifier.class)
.add(Task241009CreatePostgresJobQueueTables.class)
.build();
return ret.stream().sorted(classNameComparator).collect(Collectors.toList());
}
Expand Down
3 changes: 2 additions & 1 deletion dotcms-integration/src/test/java/com/dotcms/MainSuite2b.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,8 @@
ConfigUtilsTest.class,
SimpleInjectionIT.class,
LegacyJSONObjectRenderTest.class,
Task241013RemoveFullPathLcColumnFromIdentifierTest.class
Task241013RemoveFullPathLcColumnFromIdentifierTest.class,
Task241009CreatePostgresJobQueueTablesTest.class
})

public class MainSuite2b {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.dotmarketing.startup.runonce;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.dotcms.IntegrationTestBase;
import com.dotcms.util.IntegrationTestInitService;
import com.dotmarketing.common.db.DotDatabaseMetaData;
import com.dotmarketing.db.DbConnectionFactory;
import com.dotmarketing.db.LocalTransaction;
import com.dotmarketing.exception.DotDataException;
import com.dotmarketing.exception.DotRuntimeException;
import com.dotmarketing.exception.DotSecurityException;
import java.sql.Connection;
import java.sql.SQLException;
import org.junit.BeforeClass;
import org.junit.Test;

/**
* Test class for {@link Task241009CreatePostgresJobQueueTables}.
* <p>
* This test class ensures that the job queue tables are properly created by the upgrade task.
* <p>
* The test first drops the tables if they exist, then executes the upgrade task and validates that
* the tables were successfully created.
*/
public class Task241009CreatePostgresJobQueueTablesTest extends IntegrationTestBase {

/**
* Initializes the test environment and ensures the job queue tables do not exist.
*
* @throws Exception if an error occurs during initialization.
*/
@BeforeClass
public static void setup() throws Exception {

// Setting web app environment
IntegrationTestInitService.getInstance().init();
LocalTransaction.wrap(Task241009CreatePostgresJobQueueTablesTest::dropTablesIfExist);
}

/**
* Drops the job queue tables if they exist.
*/
private static void dropTablesIfExist() {

try {

final Connection connection = DbConnectionFactory.getConnection();
final DotDatabaseMetaData databaseMetaData = new DotDatabaseMetaData();

if (databaseMetaData.tableExists(connection, "job_queue")) {
databaseMetaData.dropTable(connection, "job_queue");
}
if (databaseMetaData.tableExists(connection, "job_history")) {
databaseMetaData.dropTable(connection, "job_history");
}
if (databaseMetaData.tableExists(connection, "job")) {
databaseMetaData.dropTable(connection, "job");
}

} catch (Exception e) {
throw new DotRuntimeException(e);
}
}

/**
* Method to test {@link Task241009CreatePostgresJobQueueTables#executeUpgrade()} and
* {@link Task241009CreatePostgresJobQueueTables#forceRun()}.
* <p>
* Given Scenario: The job queue tables do not exist.
* <p>
* Expected Result: The job queue tables will be created after running the upgrade task.
*
* @throws SQLException if a SQL error occurs.
* @throws DotDataException if a data access error occurs.
* @throws DotSecurityException if a security error occurs.
*/
@Test
public void executeTaskUpgrade() throws SQLException, DotDataException, DotSecurityException {

final var task = new Task241009CreatePostgresJobQueueTables();
final Connection connection = DbConnectionFactory.getConnection();
final DotDatabaseMetaData databaseMetaData = new DotDatabaseMetaData();

// Ensure the tables do not exist before the upgrade
assertFalse(databaseMetaData.tableExists(connection, "job_queue"));
assertFalse(databaseMetaData.tableExists(connection, "job"));
assertFalse(databaseMetaData.tableExists(connection, "job_history"));

assertTrue(task.forceRun());
LocalTransaction.wrap(() -> {
try {
task.executeUpgrade();
} catch (Exception e) {
throw new DotRuntimeException(e);
}
});

// Validate the tables were created after the upgrade
assertTrue(databaseMetaData.tableExists(connection, "job_queue"));
assertTrue(databaseMetaData.tableExists(connection, "job"));
assertTrue(databaseMetaData.tableExists(connection, "job_history"));
}
}

0 comments on commit c30d64a

Please sign in to comment.