Skip to content

Commit

Permalink
Allow more prompt cancellations
Browse files Browse the repository at this point in the history
Affects: #18
  • Loading branch information
io7m committed Oct 8, 2024
1 parent bad7bf7 commit 1769d69
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 8 deletions.
5 changes: 3 additions & 2 deletions README-CHANGES.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
<c:change date="2024-09-25T00:00:00+00:00" summary="Add release signing for F-Droid."/>
</c:changes>
</c:release>
<c:release date="2024-10-08T20:34:27+00:00" is-open="true" ticket-system="com.github.io7m.exfilac" version="1.1.0">
<c:release date="2024-10-08T21:04:03+00:00" is-open="true" ticket-system="com.github.io7m.exfilac" version="1.1.0">
<c:changes>
<c:change date="2024-10-08T00:00:00+00:00" summary="Fix status screen text on small screens.">
<c:tickets>
Expand All @@ -57,11 +57,12 @@
<c:ticket id="16"/>
</c:tickets>
</c:change>
<c:change date="2024-10-08T20:34:27+00:00" summary="Add more upload schedules.">
<c:change date="2024-10-08T00:00:00+00:00" summary="Add more upload schedules.">
<c:tickets>
<c:ticket id="19"/>
</c:tickets>
</c:change>
<c:change date="2024-10-08T21:04:03+00:00" summary="Allow for more prompt cancellation behaviour."/>
</c:changes>
</c:release>
</c:releases>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import com.io7m.exfilac.core.internal.database.EFQUploadRecordCreateType
import com.io7m.exfilac.core.internal.database.EFQUploadRecordUpdateType
import com.io7m.exfilac.s3_uploader.api.EFS3TransferStatistics
import com.io7m.exfilac.s3_uploader.api.EFS3UploadRequest
import com.io7m.exfilac.s3_uploader.api.EFS3UploadType
import com.io7m.exfilac.s3_uploader.api.EFS3UploaderType
import com.io7m.jattribute.core.AttributeType
import com.io7m.taskrecorder.core.TRNoResult
Expand Down Expand Up @@ -82,6 +83,7 @@ class EFUploadTask(
val clock: EFClockServiceType,
) {

private var s3Upload: EFS3UploadType? = null
private val uploadRecord: AtomicReference<EFUploadRecord> = AtomicReference()
private var bucketConfiguration: EFBucketConfiguration? = null
private var uploadConfiguration: EFUploadConfiguration? = null
Expand Down Expand Up @@ -111,6 +113,12 @@ class EFUploadTask(
} catch (e: Throwable) {
this.setFailed(e)
throw e
} finally {
try {
this.s3Upload?.close()
} catch (e: Throwable) {
this.logger.debug("Failed to close upload: ", e)
}
}
}

Expand Down Expand Up @@ -233,7 +241,8 @@ class EFUploadTask(
}
)

this.s3Uploader.create(uploadInfo, this.clock).use { upload -> upload.execute() }
this.s3Upload = this.s3Uploader.create(uploadInfo, this.clock)
this.s3Upload?.execute()
step.setStepSucceeded("OK")
} catch (e: Throwable) {
step.setStepFailed(this.exceptionMessage(e), e)
Expand Down Expand Up @@ -579,6 +588,13 @@ class EFUploadTask(

fun cancel() {
this.cancelled.set(true)

try {
this.s3Upload?.close()
} catch (e: Throwable) {
this.logger.debug("Failed to close upload: ", e)
}

this.onStatusChanged(EFUploadStatusCancelling(this.name, id = this.uploadRecord.get()?.id))
this.statusChangedSource.set(EFUploadStatusChanged())
}
Expand Down
1 change: 1 addition & 0 deletions com.io7m.exfilac.s3_uploader.amazon/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dependencies {

implementation(libs.apache.commons.io)
implementation(libs.apache.commons.math3)
implementation(libs.io7m.jmulticlose.core)
implementation(libs.io7m.peixoto.sdk)
implementation(libs.kotlin.stdlib)
implementation(libs.slf4j)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import com.io7m.exfilac.clock.api.EFClockServiceType
import com.io7m.exfilac.s3_uploader.api.EFS3TransferStatistics
import com.io7m.exfilac.s3_uploader.api.EFS3UploadRequest
import com.io7m.exfilac.s3_uploader.api.EFS3UploadType
import com.io7m.jmulticlose.core.CloseableCollection
import com.io7m.peixoto.sdk.org.apache.commons.codec.binary.Base64
import com.io7m.peixoto.sdk.software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import com.io7m.peixoto.sdk.software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import com.io7m.peixoto.sdk.software.amazon.awssdk.awscore.retry.AwsRetryStrategy
import com.io7m.peixoto.sdk.software.amazon.awssdk.core.sync.RequestBody
import com.io7m.peixoto.sdk.software.amazon.awssdk.http.SdkHttpClient
import com.io7m.peixoto.sdk.software.amazon.awssdk.http.apache.ApacheHttpClient
import com.io7m.peixoto.sdk.software.amazon.awssdk.regions.Region
import com.io7m.peixoto.sdk.software.amazon.awssdk.services.s3.S3Client
Expand All @@ -40,6 +42,7 @@ import com.io7m.peixoto.sdk.software.amazon.awssdk.services.s3.model.UploadPartR
import org.apache.commons.io.input.BoundedInputStream
import org.slf4j.LoggerFactory
import java.security.MessageDigest
import java.time.Duration
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
Expand All @@ -50,6 +53,7 @@ class EFS3AMZUpload(
private val upload: EFS3UploadRequest,
private val clock: EFClockServiceType,
) : EFS3UploadType {

private val logger =
LoggerFactory.getLogger(EFS3AMZUpload::class.java)

Expand All @@ -60,7 +64,12 @@ class EFS3AMZUpload(
private val minimumChunkSize =
8_388_608L

private val resources =
CloseableCollection.create()

private lateinit var executor: ExecutorService
private lateinit var httpClient: SdkHttpClient
private lateinit var s3client: S3Client

private val done =
AtomicBoolean()
Expand All @@ -79,6 +88,8 @@ class EFS3AMZUpload(
thread
}

this.resources.add(AutoCloseable { this.executor.shutdown() })

try {
this.executor.execute(this::executeStreamSupervisor)

Expand All @@ -87,13 +98,17 @@ class EFS3AMZUpload(
AwsBasicCredentials.create(this.upload.accessKey, this.upload.secretKey)
)

val httpClient =
ApacheHttpClient.builder()
.build()
this.httpClient =
this.resources.add(
ApacheHttpClient.builder()
.connectionTimeout(Duration.ofSeconds(10L))
.socketTimeout(Duration.ofSeconds(10L))
.build()
)

val clientBuilder = S3Client.builder()
clientBuilder.credentialsProvider(credentials)
clientBuilder.httpClient(httpClient)
clientBuilder.httpClient(this.httpClient)
clientBuilder.region(Region.of(this.upload.region))
clientBuilder.region(Region.of(this.upload.region))
clientBuilder.forcePathStyle(this.upload.pathStyle)
Expand All @@ -102,12 +117,13 @@ class EFS3AMZUpload(
val strategy =
AwsRetryStrategy.standardRetryStrategy()
.toBuilder()
.maxAttempts(10)
.maxAttempts(5)
.build()

clientBuilder.overrideConfiguration { o -> o.retryStrategy(strategy) }

return clientBuilder.build().use { c ->
this.s3client = this.resources.add(c)
if (this.upload.size >= this.multipartThreshold) {
this.executeUploadMultiPart(c)
} else {
Expand Down Expand Up @@ -382,5 +398,11 @@ class EFS3AMZUpload(

override fun close() {
this.done.set(true)

try {
this.resources.close()
} catch (e: Throwable) {
this.logger.debug("Failed to close S3 client: ", e)
}
}
}

0 comments on commit 1769d69

Please sign in to comment.