diff --git a/README-CHANGES.xml b/README-CHANGES.xml index 10e5ee5..c5c4bfa 100644 --- a/README-CHANGES.xml +++ b/README-CHANGES.xml @@ -40,7 +40,7 @@ - + @@ -57,11 +57,12 @@ - + + diff --git a/com.io7m.exfilac.core/src/main/java/com/io7m/exfilac/core/internal/uploads/EFUploadTask.kt b/com.io7m.exfilac.core/src/main/java/com/io7m/exfilac/core/internal/uploads/EFUploadTask.kt index 20f5a45..f9a76ea 100644 --- a/com.io7m.exfilac.core/src/main/java/com/io7m/exfilac/core/internal/uploads/EFUploadTask.kt +++ b/com.io7m.exfilac.core/src/main/java/com/io7m/exfilac/core/internal/uploads/EFUploadTask.kt @@ -53,6 +53,7 @@ import com.io7m.exfilac.core.internal.database.EFQUploadRecordCreateType import com.io7m.exfilac.core.internal.database.EFQUploadRecordUpdateType import com.io7m.exfilac.s3_uploader.api.EFS3TransferStatistics import com.io7m.exfilac.s3_uploader.api.EFS3UploadRequest +import com.io7m.exfilac.s3_uploader.api.EFS3UploadType import com.io7m.exfilac.s3_uploader.api.EFS3UploaderType import com.io7m.jattribute.core.AttributeType import com.io7m.taskrecorder.core.TRNoResult @@ -82,6 +83,7 @@ class EFUploadTask( val clock: EFClockServiceType, ) { + private var s3Upload: EFS3UploadType? = null private val uploadRecord: AtomicReference = AtomicReference() private var bucketConfiguration: EFBucketConfiguration? = null private var uploadConfiguration: EFUploadConfiguration? = null @@ -111,6 +113,12 @@ class EFUploadTask( } catch (e: Throwable) { this.setFailed(e) throw e + } finally { + try { + this.s3Upload?.close() + } catch (e: Throwable) { + this.logger.debug("Failed to close upload: ", e) + } } } @@ -233,7 +241,8 @@ class EFUploadTask( } ) - this.s3Uploader.create(uploadInfo, this.clock).use { upload -> upload.execute() } + this.s3Upload = this.s3Uploader.create(uploadInfo, this.clock) + this.s3Upload?.execute() step.setStepSucceeded("OK") } catch (e: Throwable) { step.setStepFailed(this.exceptionMessage(e), e) @@ -579,6 +588,13 @@ class EFUploadTask( fun cancel() { this.cancelled.set(true) + + try { + this.s3Upload?.close() + } catch (e: Throwable) { + this.logger.debug("Failed to close upload: ", e) + } + this.onStatusChanged(EFUploadStatusCancelling(this.name, id = this.uploadRecord.get()?.id)) this.statusChangedSource.set(EFUploadStatusChanged()) } diff --git a/com.io7m.exfilac.s3_uploader.amazon/build.gradle.kts b/com.io7m.exfilac.s3_uploader.amazon/build.gradle.kts index 62734a1..fedb5e5 100644 --- a/com.io7m.exfilac.s3_uploader.amazon/build.gradle.kts +++ b/com.io7m.exfilac.s3_uploader.amazon/build.gradle.kts @@ -5,6 +5,7 @@ dependencies { implementation(libs.apache.commons.io) implementation(libs.apache.commons.math3) + implementation(libs.io7m.jmulticlose.core) implementation(libs.io7m.peixoto.sdk) implementation(libs.kotlin.stdlib) implementation(libs.slf4j) diff --git a/com.io7m.exfilac.s3_uploader.amazon/src/main/java/com/io7m/exfilac/s3_uploader/amazon/EFS3AMZUpload.kt b/com.io7m.exfilac.s3_uploader.amazon/src/main/java/com/io7m/exfilac/s3_uploader/amazon/EFS3AMZUpload.kt index 5a4ddc3..22b2035 100644 --- a/com.io7m.exfilac.s3_uploader.amazon/src/main/java/com/io7m/exfilac/s3_uploader/amazon/EFS3AMZUpload.kt +++ b/com.io7m.exfilac.s3_uploader.amazon/src/main/java/com/io7m/exfilac/s3_uploader/amazon/EFS3AMZUpload.kt @@ -20,11 +20,13 @@ import com.io7m.exfilac.clock.api.EFClockServiceType import com.io7m.exfilac.s3_uploader.api.EFS3TransferStatistics import com.io7m.exfilac.s3_uploader.api.EFS3UploadRequest import com.io7m.exfilac.s3_uploader.api.EFS3UploadType +import com.io7m.jmulticlose.core.CloseableCollection import com.io7m.peixoto.sdk.org.apache.commons.codec.binary.Base64 import com.io7m.peixoto.sdk.software.amazon.awssdk.auth.credentials.AwsBasicCredentials import com.io7m.peixoto.sdk.software.amazon.awssdk.auth.credentials.StaticCredentialsProvider import com.io7m.peixoto.sdk.software.amazon.awssdk.awscore.retry.AwsRetryStrategy import com.io7m.peixoto.sdk.software.amazon.awssdk.core.sync.RequestBody +import com.io7m.peixoto.sdk.software.amazon.awssdk.http.SdkHttpClient import com.io7m.peixoto.sdk.software.amazon.awssdk.http.apache.ApacheHttpClient import com.io7m.peixoto.sdk.software.amazon.awssdk.regions.Region import com.io7m.peixoto.sdk.software.amazon.awssdk.services.s3.S3Client @@ -40,6 +42,7 @@ import com.io7m.peixoto.sdk.software.amazon.awssdk.services.s3.model.UploadPartR import org.apache.commons.io.input.BoundedInputStream import org.slf4j.LoggerFactory import java.security.MessageDigest +import java.time.Duration import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.TimeUnit @@ -50,6 +53,7 @@ class EFS3AMZUpload( private val upload: EFS3UploadRequest, private val clock: EFClockServiceType, ) : EFS3UploadType { + private val logger = LoggerFactory.getLogger(EFS3AMZUpload::class.java) @@ -60,7 +64,12 @@ class EFS3AMZUpload( private val minimumChunkSize = 8_388_608L + private val resources = + CloseableCollection.create() + private lateinit var executor: ExecutorService + private lateinit var httpClient: SdkHttpClient + private lateinit var s3client: S3Client private val done = AtomicBoolean() @@ -79,6 +88,8 @@ class EFS3AMZUpload( thread } + this.resources.add(AutoCloseable { this.executor.shutdown() }) + try { this.executor.execute(this::executeStreamSupervisor) @@ -87,13 +98,17 @@ class EFS3AMZUpload( AwsBasicCredentials.create(this.upload.accessKey, this.upload.secretKey) ) - val httpClient = - ApacheHttpClient.builder() - .build() + this.httpClient = + this.resources.add( + ApacheHttpClient.builder() + .connectionTimeout(Duration.ofSeconds(10L)) + .socketTimeout(Duration.ofSeconds(10L)) + .build() + ) val clientBuilder = S3Client.builder() clientBuilder.credentialsProvider(credentials) - clientBuilder.httpClient(httpClient) + clientBuilder.httpClient(this.httpClient) clientBuilder.region(Region.of(this.upload.region)) clientBuilder.region(Region.of(this.upload.region)) clientBuilder.forcePathStyle(this.upload.pathStyle) @@ -102,12 +117,13 @@ class EFS3AMZUpload( val strategy = AwsRetryStrategy.standardRetryStrategy() .toBuilder() - .maxAttempts(10) + .maxAttempts(5) .build() clientBuilder.overrideConfiguration { o -> o.retryStrategy(strategy) } return clientBuilder.build().use { c -> + this.s3client = this.resources.add(c) if (this.upload.size >= this.multipartThreshold) { this.executeUploadMultiPart(c) } else { @@ -382,5 +398,11 @@ class EFS3AMZUpload( override fun close() { this.done.set(true) + + try { + this.resources.close() + } catch (e: Throwable) { + this.logger.debug("Failed to close S3 client: ", e) + } } }