Skip to content

Commit

Permalink
Adds proper transaction support for the addAll() methods.
Browse files Browse the repository at this point in the history
Signed-off-by: Ralph Gasser <[email protected]>
  • Loading branch information
ppanopticon committed Oct 31, 2024
1 parent e225efb commit f217e4c
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import java.sql.*
* @author Ralph Gasser
* @version 1.0.0
*/
open class PgDescriptorWriter<D : Descriptor<*>>(final override val field: Schema.Field<*, D>, override val connection: PgVectorConnection) : DescriptorWriter<D> {
open class PgDescriptorWriter<D : Descriptor<*>>(final override val field: Schema.Field<*, D>, override val connection: PgVectorConnection, protected val batchSize: Int = 1000) : DescriptorWriter<D> {

/** The name of the table backing this [PgDescriptorInitializer]. */
protected val tableName: String = "${DESCRIPTOR_ENTITY_PREFIX}_${this.field.fieldName}"

Expand Down Expand Up @@ -56,6 +57,9 @@ open class PgDescriptorWriter<D : Descriptor<*>>(final override val field: Schem
*/
override fun addAll(items: Iterable<D>): Boolean {
try {
this.connection.jdbc.autoCommit = false
var success = true
var batched = 0
this.prepareInsertStatement().use { stmt ->
for (item in items) {
stmt.setObject(1, item.id)
Expand All @@ -70,12 +74,37 @@ open class PgDescriptorWriter<D : Descriptor<*>>(final override val field: Schem
}
}
stmt.addBatch()
batched += 1

/* Execute batch if necessary. */
if (batched % this.batchSize == 0) {
val results = stmt.executeBatch()
batched = 0
stmt.clearBatch()
if (results.any { it != 1 }) {
success = false
break
}
}
}

/* Execute remaining batch and commit. */
if (batched > 0) {
success = stmt.executeBatch().all { it == 1 }
}
if (success) {
this.connection.jdbc.commit()
} else {
this.connection.jdbc.rollback()
}
return stmt.executeBatch().all { it == 1 }
return success
}
} catch (e: SQLException) {
LOGGER.error(e) { "Failed to INSERT descriptors into \"${tableName.lowercase()}\" due to SQL error." }
this.connection.jdbc.rollback()
return false
} finally {
this.connection.jdbc.autoCommit = true
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import java.sql.SQLException
* @author Ralph Gasser
* @version 1.0.0
*/
internal class RetrievableWriter(override val connection: PgVectorConnection): RetrievableWriter {
internal class RetrievableWriter(override val connection: PgVectorConnection, private val batchSize: Int = 1000) : RetrievableWriter {
/**
* Adds a new [Retrievable] to the database using this [RetrievableWriter] instance.
*
Expand All @@ -38,17 +38,45 @@ internal class RetrievableWriter(override val connection: PgVectorConnection): R
*/
override fun addAll(items: Iterable<Retrievable>): Boolean {
try {
this.connection.jdbc.autoCommit = false
var success = true
var batched = 0
this.connection.jdbc.prepareStatement("INSERT INTO $RETRIEVABLE_ENTITY_NAME ($RETRIEVABLE_ID_COLUMN_NAME, $RETRIEVABLE_TYPE_COLUMN_NAME) VALUES (?, ?);").use { stmt ->
for (item in items) {
stmt.setObject(1, item.id)
stmt.setString(2, item.type)
stmt.addBatch()
batched += 1

/* Execute batch if necessary. */
if (batched % this.batchSize == 0) {
val results = stmt.executeBatch()
batched = 0
stmt.clearBatch()
if (results.any { it != 1 }) {
success = false
break
}
}
}

/* Execute remaining batch. */
if (batched > 0) {
success = stmt.executeBatch().all { it == 1 }
}
if (success) {
this.connection.jdbc.commit()
} else {
this.connection.jdbc.rollback()
}
return stmt.executeBatch().all { it == 1 }
return success
}
} catch (e: SQLException) {
LOGGER.error(e) { "Failed to persist retrievables due to SQL error." }
this.connection.jdbc.rollback()
return false
} finally {
this.connection.jdbc.autoCommit = true
}
}

Expand Down Expand Up @@ -136,18 +164,46 @@ internal class RetrievableWriter(override val connection: PgVectorConnection): R
*/
override fun connectAll(relationships: Iterable<Relationship>): Boolean {
try {
this.connection.jdbc.autoCommit = false
var success = true
var batched = 0
this.connection.jdbc.prepareStatement("INSERT INTO $RELATIONSHIP_ENTITY_NAME ($OBJECT_ID_COLUMN_NAME,$PREDICATE_COLUMN_NAME,$SUBJECT_ID_COLUMN_NAME) VALUES (?,?,?)").use { stmt ->
for (relationship in relationships) {
stmt.setObject(1, relationship.objectId)
stmt.setString(2, relationship.predicate)
stmt.setObject(3, relationship.subjectId)
stmt.addBatch()
batched += 1

/* Execute batch if necessary. */
if (batched % this.batchSize == 0) {
val results = stmt.executeBatch()
batched = 0
stmt.clearBatch()
if (results.any { it != 1 }) {
success = false
break
}
}
}

/* Execute remaining batch. */
if (batched > 0) {
success = stmt.executeBatch().all { it == 1 }
}
if (success) {
this.connection.jdbc.commit()
} else {
this.connection.jdbc.rollback()
}
return stmt.executeBatch().all { it == 1 }
return success
}
} catch (e: SQLException) {
LOGGER.error(e) { "Failed to insert relationships due to SQL error." }
this.connection.jdbc.rollback()
return false
} finally {
this.connection.jdbc.autoCommit = true
}
}

Expand Down

0 comments on commit f217e4c

Please sign in to comment.