Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore (Core): Created upgrade task for Postgres job queue tables #30325

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package com.dotmarketing.startup.runonce;

import com.dotmarketing.common.db.DotConnect;
import com.dotmarketing.common.db.DotDatabaseMetaData;
import com.dotmarketing.db.DbConnectionFactory;
import com.dotmarketing.exception.DotDataException;
import com.dotmarketing.exception.DotRuntimeException;
import com.dotmarketing.startup.StartupTask;
import com.dotmarketing.util.Logger;
import java.sql.Connection;
import java.sql.SQLException;

/**
* Upgrade task to create the necessary tables and indexes for the Postgres Job Queue
* implementation. This task creates the job_queue, job, and job_history tables along with their
* associated indexes.
*/
/**
 * Upgrade task that provisions the database objects required by the Postgres Job Queue
 * implementation. It creates the job_queue, job, and job_history tables, each with the
 * indexes used to query them efficiently, skipping any table that already exists.
 */
public class Task241009CreatePostgresJobQueueTables implements StartupTask {

    @Override
    public boolean forceRun() {
        // Always run; per-table existence checks below make the task idempotent.
        return true;
    }

    /**
     * Executes the upgrade task, creating the necessary tables and indexes for the Job Queue.
     *
     * @throws DotDataException    if a data access error occurs.
     * @throws DotRuntimeException if a runtime error occurs.
     */
    @Override
    public void executeUpgrade() throws DotDataException, DotRuntimeException {

        final DotDatabaseMetaData metaData = new DotDatabaseMetaData();
        final Connection conn = DbConnectionFactory.getConnection();

        try {

            if (!metaData.tableExists(conn, "job_queue")) {
                createJobQueueObjects();
            }

            if (!metaData.tableExists(conn, "job")) {
                createJobObjects();
            }

            if (!metaData.tableExists(conn, "job_history")) {
                createJobHistoryObjects();
            }
        } catch (SQLException e) {
            Logger.error("Error creating job queue tables", e);
            throw new DotRuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Creates the job_queue table and its indexes.
     *
     * @throws DotDataException if an error occurs while creating the objects.
     */
    private void createJobQueueObjects() throws DotDataException {
        execute(
                "CREATE TABLE job_queue (" +
                        "id VARCHAR(255) PRIMARY KEY, " +
                        "queue_name VARCHAR(255) NOT NULL, " +
                        "state VARCHAR(50) NOT NULL, " +
                        "priority INTEGER DEFAULT 0, " +
                        "created_at timestamptz NOT NULL)",
                "CREATE INDEX idx_job_queue_status ON job_queue (state)",
                "CREATE INDEX idx_job_queue_priority_created_at ON job_queue (priority DESC, created_at ASC)");
    }

    /**
     * Creates the job table and its indexes.
     *
     * @throws DotDataException if an error occurs while creating the objects.
     */
    private void createJobObjects() throws DotDataException {
        execute(
                "CREATE TABLE job (" +
                        "id VARCHAR(255) PRIMARY KEY, " +
                        "queue_name VARCHAR(255) NOT NULL, " +
                        "state VARCHAR(50) NOT NULL, " +
                        "parameters JSONB NOT NULL, " +
                        "result JSONB, " +
                        "progress FLOAT DEFAULT 0, " +
                        "created_at timestamptz NOT NULL, " +
                        "updated_at timestamptz NOT NULL, " +
                        "started_at timestamptz, " +
                        "completed_at timestamptz, " +
                        "execution_node VARCHAR(255), " +
                        "retry_count INTEGER DEFAULT 0)",
                // GIN indexes support containment/key queries on the JSONB columns
                "CREATE INDEX idx_job_parameters ON job USING GIN (parameters)",
                "CREATE INDEX idx_job_result ON job USING GIN (result)",
                "CREATE INDEX idx_job_status ON job (state)",
                "CREATE INDEX idx_job_created_at ON job (created_at)");
    }

    /**
     * Creates the job_history table and its indexes. The table holds a foreign key to the job
     * table, so the job table must exist first.
     *
     * @throws DotDataException if an error occurs while creating the objects.
     */
    private void createJobHistoryObjects() throws DotDataException {
        execute(
                "CREATE TABLE job_history (" +
                        "id VARCHAR(255) PRIMARY KEY, " +
                        "job_id VARCHAR(255) NOT NULL, " +
                        "state VARCHAR(50) NOT NULL, " +
                        "execution_node VARCHAR(255), " +
                        "created_at timestamptz NOT NULL, " +
                        "result JSONB, " +
                        "FOREIGN KEY (job_id) REFERENCES job (id))",
                "CREATE INDEX idx_job_history_job_id ON job_history (job_id)",
                "CREATE INDEX idx_job_history_job_id_state ON job_history (job_id, state)");
    }

    /**
     * Runs the given SQL statements in order, wrapping any failure in a DotDataException.
     *
     * @param statements the DDL statements to execute.
     * @throws DotDataException if any statement fails; the original cause is preserved.
     */
    private void execute(final String... statements) throws DotDataException {
        for (final String sql : statements) {
            try {
                new DotConnect().executeStatement(sql);
            } catch (Exception ex) {
                throw new DotDataException(ex.getMessage(), ex);
            }
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ public static List<Class<?>> getStartupRunOnceTaskClasses() {
.add(Task240530AddDotAIPortletToLayout.class)
.add(Task240606AddVariableColumnToWorkflow.class)
.add(Task241013RemoveFullPathLcColumnFromIdentifier.class)
.add(Task241009CreatePostgresJobQueueTables.class)
.build();
return ret.stream().sorted(classNameComparator).collect(Collectors.toList());
}
Expand Down
4 changes: 3 additions & 1 deletion dotcms-integration/src/test/java/com/dotcms/MainSuite2b.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@
import com.dotmarketing.startup.runonce.Task240513UpdateContentTypesSystemFieldTest;
import com.dotmarketing.startup.runonce.Task240530AddDotAIPortletToLayoutTest;
import com.dotmarketing.startup.runonce.Task240606AddVariableColumnToWorkflowTest;
import com.dotmarketing.startup.runonce.Task241009CreatePostgresJobQueueTablesTest;
import com.dotmarketing.startup.runonce.Task241013RemoveFullPathLcColumnFromIdentifierTest;
import com.dotmarketing.util.ConfigUtilsTest;
import com.dotmarketing.util.ITConfigTest;
Expand Down Expand Up @@ -383,7 +384,8 @@
ConfigUtilsTest.class,
SimpleInjectionIT.class,
LegacyJSONObjectRenderTest.class,
Task241013RemoveFullPathLcColumnFromIdentifierTest.class
Task241013RemoveFullPathLcColumnFromIdentifierTest.class,
Task241009CreatePostgresJobQueueTablesTest.class
})

public class MainSuite2b {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package com.dotmarketing.startup.runonce;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.dotcms.IntegrationTestBase;
import com.dotcms.util.IntegrationTestInitService;
import com.dotmarketing.common.db.DotDatabaseMetaData;
import com.dotmarketing.db.DbConnectionFactory;
import com.dotmarketing.db.LocalTransaction;
import com.dotmarketing.exception.DotDataException;
import com.dotmarketing.exception.DotRuntimeException;
import com.dotmarketing.exception.DotSecurityException;
import com.dotmarketing.util.Logger;
import java.sql.Connection;
import java.sql.SQLException;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
* Test class for {@link Task241009CreatePostgresJobQueueTables}.
* <p>
* This test class ensures that the job queue tables are properly created by the upgrade task.
* <p>
* The test first drops the tables if they exist, then executes the upgrade task and validates that
* the tables were successfully created.
*/
public class Task241009CreatePostgresJobQueueTablesTest extends IntegrationTestBase {

    /**
     * Initializes the integration test environment.
     *
     * @throws Exception if an error occurs during initialization.
     */
    @BeforeClass
    public static void setup() throws Exception {

        // Setting web app environment
        IntegrationTestInitService.getInstance().init();
    }

    /**
     * Drops the job queue tables if they exist. The job_history table is removed before the
     * job table because it holds a foreign key to it.
     */
    private void dropTablesIfExist() {

        try {

            final DotDatabaseMetaData metaData = new DotDatabaseMetaData();
            final Connection conn = DbConnectionFactory.getConnection();

            for (final String table : new String[]{"job_queue", "job_history", "job"}) {
                if (metaData.tableExists(conn, table)) {
                    metaData.dropTable(conn, table);
                }
            }

        } catch (Exception e) {
            throw new DotRuntimeException(e);
        }
    }

    /**
     * Runs {@link #dropTablesIfExist()} inside its own local transaction.
     *
     * @throws DotDataException if the transaction fails.
     */
    private void dropTablesInTransaction() throws DotDataException {
        LocalTransaction.wrap(() -> {
            try {
                dropTablesIfExist();
            } catch (Exception e) {
                throw new DotRuntimeException(e);
            }
        });
    }

    /**
     * Method to test {@link Task241009CreatePostgresJobQueueTables#executeUpgrade()} and
     * {@link Task241009CreatePostgresJobQueueTables#forceRun()}.
     * <p>
     * Given Scenario: The job queue tables do not exist.
     * <p>
     * Expected Result: The job queue tables will be created after running the upgrade task.
     *
     * @throws SQLException if a SQL error occurs.
     * @throws DotDataException if a data access error occurs.
     * @throws DotSecurityException if a security error occurs.
     */
    @Test
    public void executeTaskUpgrade() throws SQLException, DotDataException, DotSecurityException {

        try {
            // Start from a clean slate, then run the task and verify the tables appear
            dropTablesInTransaction();
            executeUpgradeAndValidate();
        } finally {
            DbConnectionFactory.closeSilently();
        }
    }

    /**
     * Method to test {@link Task241009CreatePostgresJobQueueTables#executeUpgrade()} and
     * {@link Task241009CreatePostgresJobQueueTables#forceRun()}.
     * <p>
     * Given Scenario: The job queue tables do not exist, and the upgrade task is run twice.
     * <p>
     * Expected Result: The job queue tables will be created after running the upgrade task the
     * first time and should not fail when running the upgrade task again.
     *
     * @throws SQLException if a SQL error occurs.
     * @throws DotDataException if a data access error occurs.
     * @throws DotSecurityException if a security error occurs.
     */
    @Test
    public void executeTaskUpgradeTwice()
            throws SQLException, DotDataException, DotSecurityException {

        try {
            // Start from a clean slate
            dropTablesInTransaction();

            // First run creates the tables
            executeUpgradeAndValidate();

            // Second run must be a no-op, not a failure
            LocalTransaction.wrap(() -> {
                try {
                    new Task241009CreatePostgresJobQueueTables().executeUpgrade();
                } catch (Exception e) {
                    final String message = "The upgrade task should not fail when the tables already exist";
                    Logger.error(message, e);
                    Assert.fail(message);
                }
            });
        } finally {
            DbConnectionFactory.closeSilently();
        }
    }

    /**
     * Executes the upgrade task and validates the job queue tables were created.
     *
     * @throws SQLException if a SQL error occurs.
     * @throws DotDataException if a data access error occurs.
     * @throws DotSecurityException if a security error occurs.
     */
    private static void executeUpgradeAndValidate()
            throws SQLException, DotDataException, DotSecurityException {

        final Task241009CreatePostgresJobQueueTables task =
                new Task241009CreatePostgresJobQueueTables();
        final DotDatabaseMetaData metaData = new DotDatabaseMetaData();
        final Connection conn = DbConnectionFactory.getConnection();

        // The tables must be absent before the task runs
        assertFalse(metaData.tableExists(conn, "job_queue"));
        assertFalse(metaData.tableExists(conn, "job"));
        assertFalse(metaData.tableExists(conn, "job_history"));

        assertTrue(task.forceRun());
        LocalTransaction.wrap(() -> {
            try {
                task.executeUpgrade();
            } catch (Exception e) {
                throw new DotRuntimeException(e);
            }
        });

        // ...and present once it has completed
        assertTrue(metaData.tableExists(conn, "job_queue"));
        assertTrue(metaData.tableExists(conn, "job"));
        assertTrue(metaData.tableExists(conn, "job_history"));
    }

}