From d68b872a08457335a5edc5771e21b794122697da Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Mon, 9 Jan 2017 00:55:53 +0100 Subject: [PATCH 01/28] Introduce a new client setting for enabling http compression --- src/main/resources/reference.conf | 3 +++ .../com/scalapenos/riak/internal/RiakClientSettings.scala | 8 ++++++++ 2 files changed, 11 insertions(+) diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index fb5ae99..101ffdb 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -22,6 +22,9 @@ riak { # # Riak server designates values as tombstones by adding an optional 'X-Riak-Deleted' header. ignore-tombstones = yes + + # Should the client use an compression (e.g. Gzip) when talking to Riak via HTTP. + enable-http-compression = yes } spray.can.client { diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakClientSettings.scala b/src/main/scala/com/scalapenos/riak/internal/RiakClientSettings.scala index 2998716..89c002e 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakClientSettings.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakClientSettings.scala @@ -43,6 +43,14 @@ private[riak] class RiakClientSettings(config: Config) { */ final val IgnoreTombstones: Boolean = config.getBoolean("riak.ignore-tombstones") + /** + * Setting for controlling whether the Riak client should use a compression (e.g. Gzip) + * when sending and receiving data via HTTP connection to Riak. + * + * This value defaults to true. + */ + final val EnableHttpCompression: Boolean = config.getBoolean("riak.enable-http-compression") + // TODO: add setting for silently ignoring indexes on backends that don't allow them. 
The alternative is failing/throwing exceptions } From fa22adfc6f86ec2e240e312c1ae04217cab716f5 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Mon, 9 Jan 2017 00:56:43 +0100 Subject: [PATCH 02/28] Apply Gzip compression and decompression when the enable-http-compression setting is enabled --- .../riak/internal/RiakHttpClientHelper.scala | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index 7c74c16..41f828d 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -18,6 +18,7 @@ package com.scalapenos.riak package internal import akka.actor._ +import spray.http.HttpEncodingRange private[riak] object RiakHttpClientHelper { import spray.http.HttpEntity @@ -42,6 +43,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup import spray.http.StatusCodes._ import spray.http.HttpHeaders._ import spray.httpx.SprayJsonSupport._ + import spray.httpx.encoding.{ Gzip, Deflate } import spray.json.DefaultJsonProtocol._ import org.slf4j.LoggerFactory @@ -200,10 +202,20 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup private lazy val clientId = java.util.UUID.randomUUID().toString private val clientIdHeader = if (settings.AddClientIdHeader) Some(RawHeader(`X-Riak-ClientId`, clientId)) else None - private def httpRequest = { - addOptionalHeader(clientIdHeader) ~> - addHeader("Accept", "*/*, multipart/mixed") ~> + private val basePipeline = { + if (settings.EnableHttpCompression) { + addHeader(`Accept-Encoding`(Gzip.encoding)) ~> + addHeader(`Content-Encoding`.apply(Gzip.encoding)) ~> + encode(Gzip) ~> + sendReceive ~> + decode(Gzip) + } else { sendReceive + } + } + + private def httpRequest = { + addOptionalHeader(clientIdHeader) ~> 
addHeader("Accept", "*/*, multipart/mixed") ~> basePipeline } private def createStoreHttpRequest(value: RiakValue) = { From be0f01f886c1859b0ac51336f32c1f9a85cea62b Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Wed, 18 Jan 2017 11:43:51 +0100 Subject: [PATCH 03/28] Remove a redundant Content-Encoding header directive. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It is added as part of the standard ‘compress’ directive. --- .../com/scalapenos/riak/internal/RiakHttpClientHelper.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index 41f828d..49a647d 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -205,7 +205,6 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup private val basePipeline = { if (settings.EnableHttpCompression) { addHeader(`Accept-Encoding`(Gzip.encoding)) ~> - addHeader(`Content-Encoding`.apply(Gzip.encoding)) ~> encode(Gzip) ~> sendReceive ~> decode(Gzip) From 493fec6084b999b9714ab7fe7ad85350f4d11e98 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Wed, 18 Jan 2017 12:20:27 +0100 Subject: [PATCH 04/28] Replace `val` with `def` --- .../com/scalapenos/riak/internal/RiakHttpClientHelper.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index 49a647d..c247518 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -202,7 +202,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup private lazy val clientId = 
java.util.UUID.randomUUID().toString private val clientIdHeader = if (settings.AddClientIdHeader) Some(RawHeader(`X-Riak-ClientId`, clientId)) else None - private val basePipeline = { + private def basePipeline = { if (settings.EnableHttpCompression) { addHeader(`Accept-Encoding`(Gzip.encoding)) ~> encode(Gzip) ~> From a628f90f61c73848e2bf42a5e8493edef91fdf9f Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Wed, 18 Jan 2017 12:26:52 +0100 Subject: [PATCH 05/28] Introduce gzip decompression for multipart chunks --- .../riak/internal/RiakHttpClientHelper.scala | 51 +++++++++++++++---- 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index c247518..9118760 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -19,6 +19,7 @@ package internal import akka.actor._ import spray.http.HttpEncodingRange +import spray.httpx.encoding.{ Decompressor, Encoder, GzipCompressor, GzipDecompressor } private[riak] object RiakHttpClientHelper { import spray.http.HttpEntity @@ -264,8 +265,20 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup // Conflict Resolution // ========================================================================== + import spray.http._ + + private def decompressWith(decompressorFactory: ⇒ Decompressor)(bodyPart: BodyPart): BodyPart = { + bodyPart.entity match { + case entity @ HttpEntity.NonEmpty(contentType, data) if bodyPart.headers.find(_.isInstanceOf[`Content-Encoding`]).exists(_.value.contains("gzip")) ⇒ + val data = entity.data + + bodyPart.copy(entity = HttpEntity(contentType, decompressorFactory.decompress(data.toByteArray)), headers = bodyPart.headers.filterNot(_.isInstanceOf[`Content-Encoding`])) + + case _ ⇒ bodyPart + } + } + private def 
resolveConflict(server: RiakServerInfo, bucket: String, key: String, response: HttpResponse, resolver: RiakConflictsResolver): Future[RiakValue] = { - import spray.http._ import spray.httpx.unmarshalling._ import FixedMultipartContentUnmarshalling._ @@ -276,11 +289,19 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup response.entity.as[MultipartContent] match { case Left(error) ⇒ throw new ConflictResolutionFailed(error.toString) case Right(multipartContent) ⇒ - val bodyParts = - if (settings.IgnoreTombstones) - multipartContent.parts.filterNot(part ⇒ part.headers.exists(_.lowercaseName == `X-Riak-Deleted`.toLowerCase)) - else - multipartContent.parts + val bodyParts = { + val values = + if (settings.IgnoreTombstones) + multipartContent.parts.filterNot(part ⇒ part.headers.exists(_.lowercaseName == `X-Riak-Deleted`.toLowerCase)) + else + multipartContent.parts + + if (settings.EnableHttpCompression) { + values.map(decompressWith(new GzipDecompressor)) + } else { + values + } + } val values = bodyParts.flatMap(part ⇒ toRiakValue(part.entity, vclockHeader ++ part.headers)).toSet @@ -306,11 +327,19 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup response.entity.as[MultipartContent] match { case Left(error) ⇒ throw new BucketOperationFailed(s"Failed to parse the server response as multipart content due to: '$error'") case Right(multipartContent) ⇒ - val bodyParts = - if (settings.IgnoreTombstones) - multipartContent.parts.filterNot(part ⇒ part.headers.exists(_.lowercaseName == `X-Riak-Deleted`.toLowerCase)) - else - multipartContent.parts + val bodyParts = { + val values = + if (settings.IgnoreTombstones) + multipartContent.parts.filterNot(part ⇒ part.headers.exists(_.lowercaseName == `X-Riak-Deleted`.toLowerCase)) + else + multipartContent.parts + + if (settings.EnableHttpCompression) { + values.map(decompressWith(new GzipDecompressor)) + } else { + values + } + } val values = bodyParts.flatMap(part 
⇒ toRiakValue(part.entity, vclockHeader ++ part.headers)).toSet From 3e5e0167ccdfae3c89e79a13f6793ff6832bf07c Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Wed, 18 Jan 2017 12:30:18 +0100 Subject: [PATCH 06/28] Remove unused imports --- .../scalapenos/riak/internal/RiakHttpClientHelper.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index 9118760..a864016 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -18,8 +18,7 @@ package com.scalapenos.riak package internal import akka.actor._ -import spray.http.HttpEncodingRange -import spray.httpx.encoding.{ Decompressor, Encoder, GzipCompressor, GzipDecompressor } +import spray.httpx.encoding.{ Decompressor, GzipDecompressor } private[riak] object RiakHttpClientHelper { import spray.http.HttpEntity @@ -44,11 +43,9 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup import spray.http.StatusCodes._ import spray.http.HttpHeaders._ import spray.httpx.SprayJsonSupport._ - import spray.httpx.encoding.{ Gzip, Deflate } + import spray.httpx.encoding.Gzip import spray.json.DefaultJsonProtocol._ - import org.slf4j.LoggerFactory - import SprayClientExtras._ import RiakHttpHeaders._ import RiakHttpClientHelper._ From 72d90f8a0b6b7683852fde90bf8ddaa427c8104d Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Wed, 18 Jan 2017 13:31:27 +0100 Subject: [PATCH 07/28] =?UTF-8?q?Disable=20request=20payload=20compression?= =?UTF-8?q?=20for=20=E2=80=9Cset=20bucket=20properties=E2=80=9D=20requests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For some reason, Riak PUT bucket properties endpoint doesn’t handle gzipped requests properly. 
Given that set bucket properties requests are usually rare (compared to get/store requests) and their payload is reasonably small, it should be good enough to disable compression for them. --- .../riak/internal/RiakHttpClientHelper.scala | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index a864016..2be7848 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -60,7 +60,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup // ========================================================================== def ping(server: RiakServerInfo): Future[Boolean] = { - httpRequest(Get(PingUri(server))).map { response ⇒ + httpRequest()(Get(PingUri(server))).map { response ⇒ response.status match { case OK ⇒ true case other ⇒ throw new OperationFailed(s"Ping on server '$server' produced an unexpected response code '$other'.") @@ -69,7 +69,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup } def fetch(server: RiakServerInfo, bucket: String, key: String, resolver: RiakConflictsResolver): Future[Option[RiakValue]] = { - httpRequest(Get(KeyUri(server, bucket, key))).flatMap { response ⇒ + httpRequest()(Get(KeyUri(server, bucket, key))).flatMap { response ⇒ response.status match { case OK ⇒ successful(toRiakValue(response)) case NotFound ⇒ successful(None) @@ -81,7 +81,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup } def fetchWithSiblings(server: RiakServerInfo, bucket: String, key: String, resolver: RiakConflictsResolver): Future[Option[Set[RiakValue]]] = { - httpRequest(Get(KeyUri(server, bucket, key))).flatMap { response ⇒ + httpRequest()(Get(KeyUri(server, bucket, key))).flatMap { response 
⇒ response.status match { case OK ⇒ successful(toRiakValue(response).map(Set(_))) case NotFound ⇒ successful(None) @@ -92,7 +92,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup } def fetch(server: RiakServerInfo, bucket: String, index: RiakIndex, resolver: RiakConflictsResolver): Future[List[RiakValue]] = { - httpRequest(Get(IndexUri(server, bucket, index))).flatMap { response ⇒ + httpRequest()(Get(IndexUri(server, bucket, index))).flatMap { response ⇒ response.status match { case OK ⇒ fetchWithKeysReturnedByIndexLookup(server, bucket, response, resolver) case BadRequest ⇒ throw new ParametersInvalid(s"""Invalid index name ("${index.fullName}") or value ("${index.value}").""") @@ -102,7 +102,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup } def fetch(server: RiakServerInfo, bucket: String, indexRange: RiakIndexRange, resolver: RiakConflictsResolver): Future[List[RiakValue]] = { - httpRequest(Get(IndexRangeUri(server, bucket, indexRange))).flatMap { response ⇒ + httpRequest()(Get(IndexRangeUri(server, bucket, indexRange))).flatMap { response ⇒ response.status match { case OK ⇒ fetchWithKeysReturnedByIndexLookup(server, bucket, response, resolver) case BadRequest ⇒ throw new ParametersInvalid(s"""Invalid index name ("${indexRange.fullName}") or range ("${indexRange.start}" to "${indexRange.start}").""") @@ -137,7 +137,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup } def delete(server: RiakServerInfo, bucket: String, key: String): Future[Unit] = { - httpRequest(Delete(KeyUri(server, bucket, key))).map { response ⇒ + httpRequest()(Delete(KeyUri(server, bucket, key))).map { response ⇒ response.status match { case NoContent ⇒ () case NotFound ⇒ () @@ -149,7 +149,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup def getBucketProperties(server: RiakServerInfo, bucket: String): Future[RiakBucketProperties] = { import 
spray.httpx.unmarshalling._ - httpRequest(Get(PropertiesUri(server, bucket))).map { response ⇒ + httpRequest()(Get(PropertiesUri(server, bucket))).map { response ⇒ response.status match { case OK ⇒ response.entity.as[RiakBucketProperties] match { case Right(properties) ⇒ properties @@ -165,7 +165,9 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup val entity = JsObject("props" -> JsObject(newProperties.map(property ⇒ (property.name -> property.json)).toMap)) - httpRequest(Put(PropertiesUri(server, bucket), entity)).map { response ⇒ + // For some reason, Riak set bucket props HTTP endpoint doesn't handle compressed request properly. + // So we disable compression for this request unconditionally. + httpRequest(enableCompression = false)(Put(PropertiesUri(server, bucket), entity)).map { response ⇒ response.status match { case NoContent ⇒ () case BadRequest ⇒ throw new ParametersInvalid(s"Setting properties of bucket '$bucket' failed because the http request contained invalid data.") @@ -182,7 +184,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup def allKeys(server: RiakServerInfo, bucket: String): Future[RiakKeys] = { import spray.httpx.unmarshalling._ - httpRequest(Get(AllKeysUri(server, bucket))).map { response ⇒ + httpRequest()(Get(AllKeysUri(server, bucket))).map { response ⇒ response.status match { case OK ⇒ response.entity.as[RiakKeys] match { case Right(riakKeys) ⇒ riakKeys @@ -200,8 +202,8 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup private lazy val clientId = java.util.UUID.randomUUID().toString private val clientIdHeader = if (settings.AddClientIdHeader) Some(RawHeader(`X-Riak-ClientId`, clientId)) else None - private def basePipeline = { - if (settings.EnableHttpCompression) { + private def basePipeline(enableCompression: Boolean) = { + if (enableCompression) { addHeader(`Accept-Encoding`(Gzip.encoding)) ~> encode(Gzip) ~> sendReceive ~> @@ -211,8 
+213,10 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup } } - private def httpRequest = { - addOptionalHeader(clientIdHeader) ~> addHeader("Accept", "*/*, multipart/mixed") ~> basePipeline + private def httpRequest(enableCompression: Boolean = settings.EnableHttpCompression) = { + addOptionalHeader(clientIdHeader) ~> + addHeader("Accept", "*/*, multipart/mixed") ~> + basePipeline(enableCompression) } private def createStoreHttpRequest(value: RiakValue) = { @@ -221,7 +225,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup addOptionalHeader(vclockHeader) ~> addHeaders(indexHeaders) ~> - httpRequest + httpRequest() } // ========================================================================== From 8af47d95f5df8fcfb0a70cb606f7eb9b5fd1bc35 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Wed, 18 Jan 2017 14:15:41 +0100 Subject: [PATCH 08/28] Lift the multipart messages decompression onto the unmarshaller layer --- .../riak/internal/RiakHttpClientHelper.scala | 78 ++++++++----------- 1 file changed, 33 insertions(+), 45 deletions(-) diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index 2be7848..cb14918 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -18,7 +18,6 @@ package com.scalapenos.riak package internal import akka.actor._ -import spray.httpx.encoding.{ Decompressor, GzipDecompressor } private[riak] object RiakHttpClientHelper { import spray.http.HttpEntity @@ -43,6 +42,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup import spray.http.StatusCodes._ import spray.http.HttpHeaders._ import spray.httpx.SprayJsonSupport._ + import spray.httpx.encoding.Decoder import spray.httpx.encoding.Gzip import spray.json.DefaultJsonProtocol._ @@ 
-268,41 +268,23 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup import spray.http._ - private def decompressWith(decompressorFactory: ⇒ Decompressor)(bodyPart: BodyPart): BodyPart = { - bodyPart.entity match { - case entity @ HttpEntity.NonEmpty(contentType, data) if bodyPart.headers.find(_.isInstanceOf[`Content-Encoding`]).exists(_.value.contains("gzip")) ⇒ - val data = entity.data - - bodyPart.copy(entity = HttpEntity(contentType, decompressorFactory.decompress(data.toByteArray)), headers = bodyPart.headers.filterNot(_.isInstanceOf[`Content-Encoding`])) - - case _ ⇒ bodyPart - } - } - private def resolveConflict(server: RiakServerInfo, bucket: String, key: String, response: HttpResponse, resolver: RiakConflictsResolver): Future[RiakValue] = { import spray.httpx.unmarshalling._ import FixedMultipartContentUnmarshalling._ - implicit val FixedMultipartContentUnmarshaller = multipartContentUnmarshaller(HttpCharsets.`UTF-8`) + implicit val FixedMultipartContentUnmarshaller = + multipartContentUnmarshaller(HttpCharsets.`UTF-8`, if (settings.EnableHttpCompression) Some(Gzip) else None) val vclockHeader = response.headers.find(_.is(`X-Riak-Vclock`.toLowerCase)).toList response.entity.as[MultipartContent] match { case Left(error) ⇒ throw new ConflictResolutionFailed(error.toString) case Right(multipartContent) ⇒ - val bodyParts = { - val values = - if (settings.IgnoreTombstones) - multipartContent.parts.filterNot(part ⇒ part.headers.exists(_.lowercaseName == `X-Riak-Deleted`.toLowerCase)) - else - multipartContent.parts - - if (settings.EnableHttpCompression) { - values.map(decompressWith(new GzipDecompressor)) - } else { - values - } - } + val bodyParts = + if (settings.IgnoreTombstones) + multipartContent.parts.filterNot(part ⇒ part.headers.exists(_.lowercaseName == `X-Riak-Deleted`.toLowerCase)) + else + multipartContent.parts val values = bodyParts.flatMap(part ⇒ toRiakValue(part.entity, vclockHeader ++ part.headers)).toSet @@ 
-321,26 +303,19 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup import spray.httpx.unmarshalling._ import FixedMultipartContentUnmarshalling._ - implicit val FixedMultipartContentUnmarshaller = multipartContentUnmarshaller(HttpCharsets.`UTF-8`) + implicit val FixedMultipartContentUnmarshaller = + multipartContentUnmarshaller(HttpCharsets.`UTF-8`, if (settings.EnableHttpCompression) Some(Gzip) else None) val vclockHeader = response.headers.find(_.is(`X-Riak-Vclock`.toLowerCase)).toList response.entity.as[MultipartContent] match { case Left(error) ⇒ throw new BucketOperationFailed(s"Failed to parse the server response as multipart content due to: '$error'") case Right(multipartContent) ⇒ - val bodyParts = { - val values = - if (settings.IgnoreTombstones) - multipartContent.parts.filterNot(part ⇒ part.headers.exists(_.lowercaseName == `X-Riak-Deleted`.toLowerCase)) - else - multipartContent.parts - - if (settings.EnableHttpCompression) { - values.map(decompressWith(new GzipDecompressor)) - } else { - values - } - } + val bodyParts = + if (settings.IgnoreTombstones) + multipartContent.parts.filterNot(part ⇒ part.headers.exists(_.lowercaseName == `X-Riak-Deleted`.toLowerCase)) + else + multipartContent.parts val values = bodyParts.flatMap(part ⇒ toRiakValue(part.entity, vclockHeader ++ part.headers)).toSet @@ -361,17 +336,19 @@ private[internal] object FixedMultipartContentUnmarshalling { import org.parboiled.common.FileUtils import scala.collection.JavaConverters._ import spray.http.parser.HttpParser + import spray.httpx.encoding.Decoder import spray.httpx.unmarshalling._ import spray.util._ import spray.http._ import MediaTypes._ import HttpHeaders._ - def multipartContentUnmarshaller(defaultCharset: HttpCharset): Unmarshaller[MultipartContent] = - multipartPartsUnmarshaller[MultipartContent](`multipart/mixed`, defaultCharset, MultipartContent(_)) + def multipartContentUnmarshaller(defaultCharset: HttpCharset, decoder: 
Option[Decoder]): Unmarshaller[MultipartContent] = + multipartPartsUnmarshaller[MultipartContent](`multipart/mixed`, defaultCharset, decoder, MultipartContent(_)) private def multipartPartsUnmarshaller[T <: MultipartParts](mediaRange: MediaRange, defaultCharset: HttpCharset, + decoder: Option[Decoder], create: Seq[BodyPart] ⇒ T): Unmarshaller[T] = Unmarshaller[T](mediaRange) { case HttpEntity.NonEmpty(contentType, data) ⇒ @@ -380,12 +357,20 @@ private[internal] object FixedMultipartContentUnmarshalling { sys.error("Content-Type with a multipart media type must have a non-empty 'boundary' parameter") case Some(boundary) ⇒ val mimeMsg = new MIMEMessage(new ByteArrayInputStream(data.toByteArray), boundary, mimeParsingConfig) - create(convertMimeMessage(mimeMsg, defaultCharset)) + create(convertMimeMessage(mimeMsg, defaultCharset, decoder)) } case HttpEntity.Empty ⇒ create(Nil) } - private def convertMimeMessage(mimeMsg: MIMEMessage, defaultCharset: HttpCharset): Seq[BodyPart] = { + private def decompressData(headers: List[HttpHeader], decoder: Decoder)(data: Array[Byte]): Array[Byte] = { + if (headers.findByType[`Content-Encoding`].exists(_.encoding == decoder.encoding)) { + decoder.newDecompressor.decompress(data) + } else { + data // pass-through + } + } + + private def convertMimeMessage(mimeMsg: MIMEMessage, defaultCharset: HttpCharset, decoder: Option[Decoder]): Seq[BodyPart] = { mimeMsg.getAttachments.asScala.map { part ⇒ val rawHeaders: List[HttpHeader] = part.getAllHeaders.asScala.map(h ⇒ RawHeader(h.getName, h.getValue))(collection.breakOut) @@ -401,7 +386,10 @@ private[internal] object FixedMultipartContentUnmarshalling { .getOrElse(ContentType(`text/plain`, defaultCharset)) // RFC 2046 section 5.1 val outputStream = new ByteArrayOutputStream FileUtils.copyAll(part.readOnce(), outputStream) - BodyPart(HttpEntity(contentType, outputStream.toByteArray), headers) + + val data = decoder.foldRight(outputStream.toByteArray)(decompressData(headers, _)(_)) + 
BodyPart(HttpEntity(contentType, data), headers) + case (errors, _) ⇒ sys.error("Multipart part contains %s illegal header(s):\n%s".format(errors.size, errors.mkString("\n"))) } }(collection.breakOut) From 59a8157e9d5e90d95a478c85c2034eae966a8bb0 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Thu, 19 Jan 2017 01:28:01 +0100 Subject: [PATCH 09/28] Remove unused import and helper method --- .../com/scalapenos/riak/internal/RiakHttpClientHelper.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index cb14918..f8fc292 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -42,7 +42,6 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup import spray.http.StatusCodes._ import spray.http.HttpHeaders._ import spray.httpx.SprayJsonSupport._ - import spray.httpx.encoding.Decoder import spray.httpx.encoding.Gzip import spray.json.DefaultJsonProtocol._ @@ -246,7 +245,6 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup } private def dateTimeFromLastModified(lm: `Last-Modified`): DateTime = fromSprayDateTime(lm.date) - private def lastModifiedFromDateTime(dateTime: DateTime): `Last-Modified` = `Last-Modified`(toSprayDateTime(dateTime)) // ========================================================================== // Index result fetching From a7a8600cd1b6d4a7af61542f5bb70661bd4a83cc Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Thu, 19 Jan 2017 11:41:33 +0100 Subject: [PATCH 10/28] Disable gzip compression by default to be fully backwards compatible --- src/main/resources/reference.conf | 2 +- .../scala/com/scalapenos/riak/internal/RiakClientSettings.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git 
a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 101ffdb..a6dd43f 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -24,7 +24,7 @@ riak { ignore-tombstones = yes # Should the client use an compression (e.g. Gzip) when talking to Riak via HTTP. - enable-http-compression = yes + enable-http-compression = no } spray.can.client { diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakClientSettings.scala b/src/main/scala/com/scalapenos/riak/internal/RiakClientSettings.scala index 89c002e..e15f657 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakClientSettings.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakClientSettings.scala @@ -47,7 +47,7 @@ private[riak] class RiakClientSettings(config: Config) { * Setting for controlling whether the Riak client should use a compression (e.g. Gzip) * when sending and receiving data via HTTP connection to Riak. * - * This value defaults to true. + * This value defaults to *false*. 
*/ final val EnableHttpCompression: Boolean = config.getBoolean("riak.enable-http-compression") From f3b72217a994c2a1f23ca8782386e8b392302cae Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Sat, 21 Jan 2017 15:30:49 +0100 Subject: [PATCH 11/28] Remove an unused dispatcher --- src/test/scala/com/scalapenos/riak/TestUtils.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/scala/com/scalapenos/riak/TestUtils.scala b/src/test/scala/com/scalapenos/riak/TestUtils.scala index 51f4b78..a2c9f84 100644 --- a/src/test/scala/com/scalapenos/riak/TestUtils.scala +++ b/src/test/scala/com/scalapenos/riak/TestUtils.scala @@ -12,7 +12,6 @@ import org.specs2.time.NoTimeConversions trait AkkaActorSystemSpecification extends Specification with NoTimeConversions { implicit val system = ActorSystem("tests") - implicit val executor = system.dispatcher // manual pimped future stolen^H^Hborrowed from spray.util because a // spray.util._ import causes implicit resolution conflicts with the above implicit actor system From 0c80e6943b6cd4d62f557bec79bfaf33bfb04882 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Sat, 21 Jan 2017 15:34:10 +0100 Subject: [PATCH 12/28] Make it possible to create an actor system with a custom configuration in Spec tests --- .../riak/BasicInteractionsSpec.scala | 2 +- .../scala/com/scalapenos/riak/TestUtils.scala | 28 +++++++++++++++---- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/src/test/scala/com/scalapenos/riak/BasicInteractionsSpec.scala b/src/test/scala/com/scalapenos/riak/BasicInteractionsSpec.scala index 35747de..e2ec709 100644 --- a/src/test/scala/com/scalapenos/riak/BasicInteractionsSpec.scala +++ b/src/test/scala/com/scalapenos/riak/BasicInteractionsSpec.scala @@ -19,7 +19,7 @@ package com.scalapenos.riak class BasicInteractionsSpec extends AkkaActorSystemSpecification { "The riak client" should { "be able to perform a simple get-put-get-delete-get CRUD flow" in { - val connection = RiakClient(system) + val
connection = RiakClient(defaultSystem) val bucket = connection.bucket("test-basic-interaction") val fetchBeforeStore = bucket.fetch("foo") diff --git a/src/test/scala/com/scalapenos/riak/TestUtils.scala b/src/test/scala/com/scalapenos/riak/TestUtils.scala index a2c9f84..a843622 100644 --- a/src/test/scala/com/scalapenos/riak/TestUtils.scala +++ b/src/test/scala/com/scalapenos/riak/TestUtils.scala @@ -1,17 +1,22 @@ package com.scalapenos.riak -import org.specs2.mutable._ +import java.util.UUID.randomUUID +import java.util.concurrent.ConcurrentHashMap import scala.concurrent.Future +import scala.collection.JavaConversions._ + import akka.actor._ import akka.testkit._ +import com.typesafe.config.{ Config, ConfigFactory } +import org.specs2.mutable._ import org.specs2.execute.{ Failure, FailureException } import org.specs2.specification.{ Fragments, Step } import org.specs2.time.NoTimeConversions trait AkkaActorSystemSpecification extends Specification with NoTimeConversions { - implicit val system = ActorSystem("tests") + implicit val defaultSystem = createActorSystem() // manual pimped future stolen^H^Hborrowed from spray.util because a // spray.util._ import causes implicit resolution conflicts with the above implicit actor system @@ -19,18 +24,31 @@ trait AkkaActorSystemSpecification extends Specification with NoTimeConversions def failTest(msg: String) = throw new FailureException(Failure(msg)) + lazy val actorSystems: ConcurrentHashMap[String, ActorSystem] = new ConcurrentHashMap[String, ActorSystem]() + /* Add a final step to the list of test fragments that shuts down the actor system. 
*/ - override def map(fs: ⇒ Fragments) = super.map(fs).add(Step(system.shutdown)) + override def map(fs: ⇒ Fragments) = + super.map(fs).add(Step(actorSystems.values().foreach(TestKit.shutdownActorSystem(_)))) + + protected def createActorSystem(customConfig: Option[Config] = None): ActorSystem = { + val systemName = s"tests-${randomUUID()}" + val system = customConfig match { + case Some(config) ⇒ ActorSystem(systemName, config) + case None ⇒ ActorSystem(systemName) + } + actorSystems.put(systemName, system) + system + } } trait RiakClientSpecification extends AkkaActorSystemSpecification with Before { var client: RiakClient = _ def before { - client = RiakClient(system) + client = RiakClient(defaultSystem) } - skipAllUnless(RiakClient(system).ping.await) + skipAllUnless(RiakClient(defaultSystem).ping.await) } trait RandomKeySupport { From f647a945938c86da3a8fbc46f3db253268fa201f Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Sat, 21 Jan 2017 15:36:03 +0100 Subject: [PATCH 13/28] Parametrize Bucket tests to verify Riak Client behaviour with and without compression --- .../com/scalapenos/riak/RiakBucketSpec.scala | 121 +++++++++--------- .../scala/com/scalapenos/riak/TestUtils.scala | 15 ++- .../riak/UnsafeBucketOperationsSpec.scala | 2 + 3 files changed, 80 insertions(+), 58 deletions(-) diff --git a/src/test/scala/com/scalapenos/riak/RiakBucketSpec.scala b/src/test/scala/com/scalapenos/riak/RiakBucketSpec.scala index 2ad3e9c..df55cb2 100644 --- a/src/test/scala/com/scalapenos/riak/RiakBucketSpec.scala +++ b/src/test/scala/com/scalapenos/riak/RiakBucketSpec.scala @@ -18,87 +18,94 @@ package com.scalapenos.riak class RiakBucketSpec extends RiakClientSpecification with RandomKeySupport with RandomBucketSupport { - "A RiakBucket" should { - "not be able to store an empty String value" in { - val bucket = randomBucket - val key = randomKey - - // Riak will reject the request with a 400 because the request will - // not have a body (because Spray doesn't allow empty 
bodies). - bucket.store(key, "").await must throwA[BucketOperationFailed] - } + Seq(true, false) foreach { enableCompression ⇒ + s"When compression is ${if (enableCompression) "enabled" else "disabled"}" in { - "treat tombstone values as if they don't exist when allow_mult = false" in { - val bucket = randomBucket - val key = randomKey + implicit val customClient = createRiakClient(enableCompression) - bucket.store(key, "value").await - bucket.delete(key).await + "A RiakBucket" should { + "not be able to store an empty String value" in { + val bucket = randomBucket + val key = randomKey - val fetched = bucket.fetch(key).await + // Riak will reject the request with a 400 because the request will + // not have a body (because Spray doesn't allow empty bodies). + bucket.store(key, "").await must throwA[BucketOperationFailed] + } - fetched should beNone - } + "treat tombstone values as if they don't exist when allow_mult = false" in { + val bucket = randomBucket + val key = randomKey - "treat tombstone values as if they don't exist when allow_mult = true" in { - val bucket = randomBucket - val key = randomKey + bucket.store(key, "value").await + bucket.delete(key).await - (bucket.allowSiblings = true).await + val fetched = bucket.fetch(key).await - bucket.store(key, "value").await - bucket.delete(key).await + fetched should beNone + } - val fetched = bucket.fetch(key).await + "treat tombstone values as if they don't exist when allow_mult = true" in { + val bucket = randomBucket + val key = randomKey - fetched should beNone - } + (bucket.allowSiblings = true).await - "fetch all sibling values and return them to the client if they exist for a given Riak entry" in { - val bucket = randomBucket - val key = randomKey + bucket.store(key, "value").await + bucket.delete(key).await - (bucket.allowSiblings = true).await + val fetched = bucket.fetch(key).await - val siblingValues = Set("value1", "value2", "value3") + fetched should beNone + } - for (value ← siblingValues) { - // 
we store values without VectorClock which causes siblings creation - bucket.store(key, value).await - } + "fetch all sibling values and return them to the client if they exist for a given Riak entry" in { + val bucket = randomBucket + val key = randomKey - val fetched = bucket.fetchWithSiblings(key).await + (bucket.allowSiblings = true).await - fetched should beSome - fetched.get.size should beEqualTo(3) - fetched.get.map(_.data) should beEqualTo(siblingValues) - } + val siblingValues = Set("value1", "value2", "value3") - "return a set containing a single value for given Riak entry if there are no siblings when fetching with siblings mode" in { - val bucket = randomBucket - val key = randomKey + for (value ← siblingValues) { + // we store values without VectorClock which causes siblings creation + bucket.store(key, value).await + } - (bucket.allowSiblings = true).await + val fetched = bucket.fetchWithSiblings(key).await - val expectedValue = "value" - bucket.store(key, expectedValue).await + fetched should beSome + fetched.get.size should beEqualTo(3) + fetched.get.map(_.data) should beEqualTo(siblingValues) + } - val fetched = bucket.fetchWithSiblings(key).await + "return a set containing a single value for given Riak entry if there are no siblings when fetching with siblings mode" in { + val bucket = randomBucket + val key = randomKey - fetched should beSome - fetched.get.size should beEqualTo(1) - fetched.get.map(_.data) should beEqualTo(Set(expectedValue)) - } + (bucket.allowSiblings = true).await + + val expectedValue = "value" + bucket.store(key, expectedValue).await + + val fetched = bucket.fetchWithSiblings(key).await - "return None if entry hasn't been found when fetching with siblings mode" in { - val bucket = randomBucket - val key = randomKey + fetched should beSome + fetched.get.size should beEqualTo(1) + fetched.get.map(_.data) should beEqualTo(Set(expectedValue)) + } - (bucket.allowSiblings = true).await + "return None if entry hasn't been found when 
fetching with siblings mode" in { + val bucket = randomBucket + val key = randomKey - val fetched = bucket.fetchWithSiblings(key).await + (bucket.allowSiblings = true).await - fetched should beNone + val fetched = bucket.fetchWithSiblings(key).await + + fetched should beNone + } + } } } } diff --git a/src/test/scala/com/scalapenos/riak/TestUtils.scala b/src/test/scala/com/scalapenos/riak/TestUtils.scala index a843622..d9a7512 100644 --- a/src/test/scala/com/scalapenos/riak/TestUtils.scala +++ b/src/test/scala/com/scalapenos/riak/TestUtils.scala @@ -49,6 +49,19 @@ trait RiakClientSpecification extends AkkaActorSystemSpecification with Before { } skipAllUnless(RiakClient(defaultSystem).ping.await) + + protected def createRiakClient(enableCompression: Boolean) = { + val config = ConfigFactory.parseString( + s""" + |{ + | riak { + | enable-http-compression = $enableCompression + | } + |} + """.stripMargin) + + RiakClient(createActorSystem(Some(config))) + } } trait RandomKeySupport { @@ -60,5 +73,5 @@ trait RandomKeySupport { trait RandomBucketSupport { self: RiakClientSpecification with RandomKeySupport ⇒ - def randomBucket = client.bucket("riak-bucket-tests-" + randomKey) + def randomBucket(implicit client: RiakClient) = client.bucket("riak-bucket-tests-" + randomKey) } diff --git a/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala b/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala index 248c2ea..c900130 100644 --- a/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala +++ b/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala @@ -17,6 +17,8 @@ package com.scalapenos.riak class UnsafeBucketOperationsSpec extends RiakClientSpecification with RandomKeySupport with RandomBucketSupport { + implicit val defaultClient = client + def randomUnsafeBucketOperations = randomBucket.unsafe "UnsafeBucketOperations" should { From cee5aff3e687677a382cb74493589073e5857941 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov 
Date: Wed, 25 Jan 2017 15:22:58 +0100 Subject: [PATCH 14/28] tests: extract RiakClient-related tests parametrisation to a base trait without requiring concrete tests to do anything explicitly --- .../riak/BasicInteractionsSpec.scala | 10 +- .../riak/RiakBucketPropertiesSpec.scala | 16 +-- .../com/scalapenos/riak/RiakBucketSpec.scala | 121 +++++++++--------- .../scala/com/scalapenos/riak/TestUtils.scala | 28 +++- .../riak/UnsafeBucketOperationsSpec.scala | 3 +- 5 files changed, 93 insertions(+), 85 deletions(-) diff --git a/src/test/scala/com/scalapenos/riak/BasicInteractionsSpec.scala b/src/test/scala/com/scalapenos/riak/BasicInteractionsSpec.scala index e2ec709..20d54d9 100644 --- a/src/test/scala/com/scalapenos/riak/BasicInteractionsSpec.scala +++ b/src/test/scala/com/scalapenos/riak/BasicInteractionsSpec.scala @@ -16,11 +16,13 @@ package com.scalapenos.riak -class BasicInteractionsSpec extends AkkaActorSystemSpecification { - "The riak client" should { +import java.util.UUID.randomUUID + +class BasicInteractionsSpec extends RiakClientSpecification { + + "The Riak client" should { "be able to perform a simple get-put-get-delete-get CRUD flow" in { - val connection = RiakClient(defaultSystem) - val bucket = connection.bucket("test-basic-interaction") + val bucket = client.bucket(s"test-basic-interaction-$randomUUID") val fetchBeforeStore = bucket.fetch("foo") diff --git a/src/test/scala/com/scalapenos/riak/RiakBucketPropertiesSpec.scala b/src/test/scala/com/scalapenos/riak/RiakBucketPropertiesSpec.scala index c81dd6c..e0e197e 100644 --- a/src/test/scala/com/scalapenos/riak/RiakBucketPropertiesSpec.scala +++ b/src/test/scala/com/scalapenos/riak/RiakBucketPropertiesSpec.scala @@ -16,12 +16,11 @@ package com.scalapenos.riak -class RiakBucketPropertiesSpec extends RiakClientSpecification with RandomKeySupport { - private def randomBucket = client.bucket("riak-bucket-tests-" + randomKey) +class RiakBucketPropertiesSpec extends RiakClientSpecification with 
RandomBucketSupport with RandomKeySupport { "A RiakBucket" should { "support setting and getting the bucket properties" in { - val bucket = randomBucket + val bucket = randomBucket(client) val oldProperties = bucket.properties.await val newNumberOfReplicas = oldProperties.numberOfReplicas + 1 @@ -49,7 +48,7 @@ class RiakBucketPropertiesSpec extends RiakClientSpecification with RandomKeySup } "support setting the bucket properties with an empty set (nothing happens)" in { - val bucket = randomBucket + val bucket = randomBucket(client) val oldProperties = bucket.properties.await bucket.setProperties(Set()).await @@ -60,7 +59,7 @@ class RiakBucketPropertiesSpec extends RiakClientSpecification with RandomKeySup } "support directly setting the 'n_val' bucket property" in { - val bucket = randomBucket + val bucket = randomBucket(client) (bucket.numberOfReplicas = 5).await bucket.numberOfReplicas.await must beEqualTo(5) @@ -70,7 +69,7 @@ class RiakBucketPropertiesSpec extends RiakClientSpecification with RandomKeySup } "support directly setting the 'allow_mult' bucket property" in { - val bucket = randomBucket + val bucket = randomBucket(client) (bucket.allowSiblings = true).await bucket.allowSiblings.await must beTrue @@ -80,7 +79,7 @@ class RiakBucketPropertiesSpec extends RiakClientSpecification with RandomKeySup } "support directly setting the 'last_write_wins' bucket property" in { - val bucket = randomBucket + val bucket = randomBucket(client) (bucket.lastWriteWins = true).await bucket.lastWriteWins.await must beTrue @@ -90,7 +89,7 @@ class RiakBucketPropertiesSpec extends RiakClientSpecification with RandomKeySup } "fail when directly setting the 'n_val' bucket property to any integer smaller than 1" in { - val bucket = randomBucket + val bucket = randomBucket(client) (bucket.numberOfReplicas = 0).await must throwA[IllegalArgumentException] (bucket.numberOfReplicas = -1).await must throwA[IllegalArgumentException] @@ -103,5 +102,4 @@ class RiakBucketPropertiesSpec 
extends RiakClientSpecification with RandomKeySup NumberOfReplicas(-42) must throwA[IllegalArgumentException] } } - } diff --git a/src/test/scala/com/scalapenos/riak/RiakBucketSpec.scala b/src/test/scala/com/scalapenos/riak/RiakBucketSpec.scala index df55cb2..fe7b8ac 100644 --- a/src/test/scala/com/scalapenos/riak/RiakBucketSpec.scala +++ b/src/test/scala/com/scalapenos/riak/RiakBucketSpec.scala @@ -18,94 +18,87 @@ package com.scalapenos.riak class RiakBucketSpec extends RiakClientSpecification with RandomKeySupport with RandomBucketSupport { - Seq(true, false) foreach { enableCompression ⇒ - s"When compression is ${if (enableCompression) "enabled" else "disabled"}" in { - - implicit val customClient = createRiakClient(enableCompression) - - "A RiakBucket" should { - "not be able to store an empty String value" in { - val bucket = randomBucket - val key = randomKey - - // Riak will reject the request with a 400 because the request will - // not have a body (because Spray doesn't allow empty bodies). - bucket.store(key, "").await must throwA[BucketOperationFailed] - } + "A RiakBucket" should { + "not be able to store an empty String value" in { + val bucket = randomBucket(client) + val key = randomKey + + // Riak will reject the request with a 400 because the request will + // not have a body (because Spray doesn't allow empty bodies). 
+ bucket.store(key, "").await must throwA[BucketOperationFailed] + } - "treat tombstone values as if they don't exist when allow_mult = false" in { - val bucket = randomBucket - val key = randomKey + "treat tombstone values as if they don't exist when allow_mult = false" in { + val bucket = randomBucket(client) + val key = randomKey - bucket.store(key, "value").await - bucket.delete(key).await + bucket.store(key, "value").await + bucket.delete(key).await - val fetched = bucket.fetch(key).await + val fetched = bucket.fetch(key).await - fetched should beNone - } + fetched should beNone + } - "treat tombstone values as if they don't exist when allow_mult = true" in { - val bucket = randomBucket - val key = randomKey + "treat tombstone values as if they don't exist when allow_mult = true" in { + val bucket = randomBucket(client) + val key = randomKey - (bucket.allowSiblings = true).await + (bucket.allowSiblings = true).await - bucket.store(key, "value").await - bucket.delete(key).await + bucket.store(key, "value").await + bucket.delete(key).await - val fetched = bucket.fetch(key).await + val fetched = bucket.fetch(key).await - fetched should beNone - } + fetched should beNone + } - "fetch all sibling values and return them to the client if they exist for a given Riak entry" in { - val bucket = randomBucket - val key = randomKey + "fetch all sibling values and return them to the client if they exist for a given Riak entry" in { + val bucket = randomBucket(client) + val key = randomKey - (bucket.allowSiblings = true).await + (bucket.allowSiblings = true).await - val siblingValues = Set("value1", "value2", "value3") + val siblingValues = Set("value1", "value2", "value3") - for (value ← siblingValues) { - // we store values without VectorClock which causes siblings creation - bucket.store(key, value).await - } + for (value ← siblingValues) { + // we store values without VectorClock which causes siblings creation + bucket.store(key, value).await + } - val fetched = 
bucket.fetchWithSiblings(key).await + val fetched = bucket.fetchWithSiblings(key).await - fetched should beSome - fetched.get.size should beEqualTo(3) - fetched.get.map(_.data) should beEqualTo(siblingValues) - } + fetched should beSome + fetched.get.size should beEqualTo(3) + fetched.get.map(_.data) should beEqualTo(siblingValues) + } - "return a set containing a single value for given Riak entry if there are no siblings when fetching with siblings mode" in { - val bucket = randomBucket - val key = randomKey + "return a set containing a single value for given Riak entry if there are no siblings when fetching with siblings mode" in { + val bucket = randomBucket(client) + val key = randomKey - (bucket.allowSiblings = true).await + (bucket.allowSiblings = true).await - val expectedValue = "value" - bucket.store(key, expectedValue).await + val expectedValue = "value" + bucket.store(key, expectedValue).await - val fetched = bucket.fetchWithSiblings(key).await + val fetched = bucket.fetchWithSiblings(key).await - fetched should beSome - fetched.get.size should beEqualTo(1) - fetched.get.map(_.data) should beEqualTo(Set(expectedValue)) - } + fetched should beSome + fetched.get.size should beEqualTo(1) + fetched.get.map(_.data) should beEqualTo(Set(expectedValue)) + } - "return None if entry hasn't been found when fetching with siblings mode" in { - val bucket = randomBucket - val key = randomKey + "return None if entry hasn't been found when fetching with siblings mode" in { + val bucket = randomBucket(client) + val key = randomKey - (bucket.allowSiblings = true).await + (bucket.allowSiblings = true).await - val fetched = bucket.fetchWithSiblings(key).await + val fetched = bucket.fetchWithSiblings(key).await - fetched should beNone - } - } + fetched should beNone } } } diff --git a/src/test/scala/com/scalapenos/riak/TestUtils.scala b/src/test/scala/com/scalapenos/riak/TestUtils.scala index d9a7512..5055f6a 100644 --- a/src/test/scala/com/scalapenos/riak/TestUtils.scala 
+++ b/src/test/scala/com/scalapenos/riak/TestUtils.scala @@ -12,7 +12,8 @@ import com.typesafe.config.{ Config, ConfigFactory } import org.specs2.mutable._ import org.specs2.execute.{ Failure, FailureException } -import org.specs2.specification.{ Fragments, Step } +import org.specs2.specification.StandardFragments.{ Backtab, Br } +import org.specs2.specification.{ Fragments, Step, Text } import org.specs2.time.NoTimeConversions trait AkkaActorSystemSpecification extends Specification with NoTimeConversions { @@ -39,9 +40,13 @@ trait AkkaActorSystemSpecification extends Specification with NoTimeConversions actorSystems.put(systemName, system) system } + + protected def decorateWith(fs: ⇒ Fragments)(text: String, block: ⇒ Unit) = { + Seq(Br(), Br(), Text(text), Step(block)) ++: fs.middle :+ Backtab(1) + } } -trait RiakClientSpecification extends AkkaActorSystemSpecification with Before { +abstract class RiakClientSpecification extends AkkaActorSystemSpecification with Before { var client: RiakClient = _ def before { @@ -50,7 +55,19 @@ trait RiakClientSpecification extends AkkaActorSystemSpecification with Before { skipAllUnless(RiakClient(defaultSystem).ping.await) - protected def createRiakClient(enableCompression: Boolean) = { + private def specsWithParametrizedCompression(fs: ⇒ Fragments): Fragments = { + Seq(true, false).map { enableCompression ⇒ + val compressionCaseText = s"When compression is ${if (enableCompression) "enabled" else "disabled"} in" + + fs.copy(middle = decorateWith(fs)(text = compressionCaseText, block = { + client = createRiakClient(enableCompression) + })) + }.reduce(_ ^ _) + } + + override def map(fs: ⇒ Fragments) = super.map(specsWithParametrizedCompression(fs)) + + private def createRiakClient(enableCompression: Boolean) = { val config = ConfigFactory.parseString( s""" |{ @@ -70,8 +87,7 @@ trait RandomKeySupport { def randomKey = randomUUID().toString } -trait RandomBucketSupport { - self: RiakClientSpecification with RandomKeySupport ⇒ 
+trait RandomBucketSupport { self: RandomKeySupport ⇒ - def randomBucket(implicit client: RiakClient) = client.bucket("riak-bucket-tests-" + randomKey) + def randomBucket(client: RiakClient) = client.bucket("riak-bucket-tests-" + randomKey) } diff --git a/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala b/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala index c900130..0278194 100644 --- a/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala +++ b/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala @@ -17,9 +17,8 @@ package com.scalapenos.riak class UnsafeBucketOperationsSpec extends RiakClientSpecification with RandomKeySupport with RandomBucketSupport { - implicit val defaultClient = client - def randomUnsafeBucketOperations = randomBucket.unsafe + def randomUnsafeBucketOperations = randomBucket(client).unsafe "UnsafeBucketOperations" should { "list all keys" in { From 7877c713491d8b26fb323b7379f562802c4a73e5 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Wed, 25 Jan 2017 15:23:10 +0100 Subject: [PATCH 15/28] tests: remove redundant whitespaces --- src/test/scala/com/scalapenos/riak/BasicInteractionsSpec.scala | 1 - src/test/scala/com/scalapenos/riak/ConflictResolutionSpec.scala | 1 - src/test/scala/com/scalapenos/riak/RiakClientSpec.scala | 1 - .../scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala | 1 - 4 files changed, 4 deletions(-) diff --git a/src/test/scala/com/scalapenos/riak/BasicInteractionsSpec.scala b/src/test/scala/com/scalapenos/riak/BasicInteractionsSpec.scala index 20d54d9..46ab104 100644 --- a/src/test/scala/com/scalapenos/riak/BasicInteractionsSpec.scala +++ b/src/test/scala/com/scalapenos/riak/BasicInteractionsSpec.scala @@ -44,5 +44,4 @@ class BasicInteractionsSpec extends RiakClientSpecification { fetchAfterDelete must beNone } } - } diff --git a/src/test/scala/com/scalapenos/riak/ConflictResolutionSpec.scala 
b/src/test/scala/com/scalapenos/riak/ConflictResolutionSpec.scala index 13d6d97..629bf21 100644 --- a/src/test/scala/com/scalapenos/riak/ConflictResolutionSpec.scala +++ b/src/test/scala/com/scalapenos/riak/ConflictResolutionSpec.scala @@ -152,5 +152,4 @@ class ConflictResolutionSpec extends RiakClientSpecification with RandomKeySuppo bucket.fetch(key).await must throwA[ConflicResolutionNotImplemented] } } - } diff --git a/src/test/scala/com/scalapenos/riak/RiakClientSpec.scala b/src/test/scala/com/scalapenos/riak/RiakClientSpec.scala index ab86627..c46c69b 100644 --- a/src/test/scala/com/scalapenos/riak/RiakClientSpec.scala +++ b/src/test/scala/com/scalapenos/riak/RiakClientSpec.scala @@ -23,5 +23,4 @@ class RiakClientSpec extends RiakClientSpecification { client.ping.await should beTrue } } - } diff --git a/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala b/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala index 0278194..0c173b4 100644 --- a/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala +++ b/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala @@ -47,5 +47,4 @@ class UnsafeBucketOperationsSpec extends RiakClientSpecification with RandomKeyS allKeys.keys should be(Nil) } } - } From 6cf73cfadbeef3c3953851b10435cb4c51970288 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Wed, 25 Jan 2017 15:23:27 +0100 Subject: [PATCH 16/28] tests: fix IDE inspection(s) --- .../com/scalapenos/riak/UnsafeBucketOperationsSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala b/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala index 0c173b4..163f016 100644 --- a/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala +++ b/src/test/scala/com/scalapenos/riak/UnsafeBucketOperationsSpec.scala @@ -26,13 +26,13 @@ class UnsafeBucketOperationsSpec extends RiakClientSpecification with 
RandomKeyS val numberOfKeys = 5 val keys = (1 to numberOfKeys).map(_ ⇒ randomKey) - keys.map { key ⇒ + keys.foreach { key ⇒ unsafeBucketOperations.store(key, "value").await } val allKeys = unsafeBucketOperations.allKeys().await - keys.map { key ⇒ + keys.foreach { key ⇒ unsafeBucketOperations.delete(key).await } From cc0e2d14c5fa4f9a1f38aae0a2658640da061cac Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Thu, 26 Jan 2017 12:08:37 +0100 Subject: [PATCH 17/28] tests: add another set of tests for verifying compression behaviour when using a mixture of compression-enabled and compression-disabled clients Yes, surprisingly these tests may fail with Riak o_O --- .../com/scalapenos/riak/RiakGzipSpec.scala | 113 ++++++++++++++++++ .../scala/com/scalapenos/riak/TestUtils.scala | 26 ++-- 2 files changed, 126 insertions(+), 13 deletions(-) create mode 100644 src/test/scala/com/scalapenos/riak/RiakGzipSpec.scala diff --git a/src/test/scala/com/scalapenos/riak/RiakGzipSpec.scala b/src/test/scala/com/scalapenos/riak/RiakGzipSpec.scala new file mode 100644 index 0000000..9ddfd4a --- /dev/null +++ b/src/test/scala/com/scalapenos/riak/RiakGzipSpec.scala @@ -0,0 +1,113 @@ +/* + * Copyright (C) 2012-2013 Age Mooij (http://scalapenos.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.scalapenos.riak + +import java.util.UUID.randomUUID + +class RiakGzipSpec extends AkkaActorSystemSpecification { + + // ============================================================================ + // Test data + // ============================================================================ + + val key = "foo" + val expectedValue = "bar" + + // ============================================================================ + // Specifications + // ============================================================================ + + "Riak client with an optional compression support" should { + "be able to have 2 clients (with & without compression) working with the same bucket where allowed_siblings=true" in { + + val compressionDisabledClient = createRiakClient(false) + val compressionEnabledClient = createRiakClient(true) + + val bucketName = s"$baseBucketName-$randomUUID" + + val bucket = compressionEnabledClient.bucket(bucketName, fixedConflictResolver) + + // Enable siblings for this bucket + bucket.setAllowSiblings(true).await + + // Check there is no initial data + val fetchBeforeStore = bucket.fetch(key) + fetchBeforeStore.await must beNone + + // This creates 2 siblings in the Riak bucket + bucket.storeAndFetch(key, "initial_bar").await + // but the expected value should win as we use a "fixed" conflict resolver + bucket.storeAndFetch(key, expectedValue).await + + // Try to fetch it with both clients (with and without compression) + checkFetch(compressionEnabledClient, bucketName, key, expectedValue, Some(fixedConflictResolver)) + checkFetch(compressionDisabledClient, bucketName, key, expectedValue, Some(fixedConflictResolver)) + } + + "be able to have 2 clients (with & without compression) working with the same bucket where allowed_siblings=false" in { + + val gzipDisabledClient = createRiakClient(false) + val gzipEnabledClient = createRiakClient(true) + + val bucketName = s"$baseBucketName-$randomUUID" + + val bucket = 
gzipEnabledClient.bucket(bucketName) + + // Disable siblings for this bucket + bucket.setAllowSiblings(false).await + + // Check there is no initial data + val fetchBeforeStore = bucket.fetch(key) + fetchBeforeStore.await must beNone + + // Put the expected value in Riak + bucket.storeAndFetch(key, expectedValue).await + + // Try to fetch it with both clients (with and without compression) + checkFetch(gzipEnabledClient, bucketName, key, expectedValue) + checkFetch(gzipDisabledClient, bucketName, key, expectedValue) + } + } + + // ============================================================================ + // Helpers + // ============================================================================ + + private def baseBucketName = "test-client-compression-support" + + private val fixedConflictResolver = new RiakConflictsResolver { + // Resolves conflicts by always preferring the expected value + override def resolve(values: Set[RiakValue]): ConflictResolution = + ConflictResolution( + values.find(_.data == expectedValue).getOrElse(throw new Exception("No expected value in siblings")), + writeBack = false) + } + + private def checkFetch(client: RiakClient, + bucketName: String, + key: String, + expectedValue: String, + conflictResolver: Option[RiakConflictsResolver] = None) = { + val bucket = + conflictResolver.map(resolver ⇒ client.bucket(bucketName, resolver)).getOrElse(client.bucket(bucketName)) + + val fetchAfterStore = bucket.fetch(key).await + + fetchAfterStore must beSome[RiakValue] + fetchAfterStore.get.data must beEqualTo(expectedValue) + } +} diff --git a/src/test/scala/com/scalapenos/riak/TestUtils.scala b/src/test/scala/com/scalapenos/riak/TestUtils.scala index 5055f6a..1e40531 100644 --- a/src/test/scala/com/scalapenos/riak/TestUtils.scala +++ b/src/test/scala/com/scalapenos/riak/TestUtils.scala @@ -44,6 +44,19 @@ trait AkkaActorSystemSpecification extends Specification with NoTimeConversions protected def decorateWith(fs: ⇒ Fragments)(text: String, 
block: ⇒ Unit) = { Seq(Br(), Br(), Text(text), Step(block)) ++: fs.middle :+ Backtab(1) } + + protected def createRiakClient(enableCompression: Boolean) = { + val config = ConfigFactory.parseString( + s""" + |{ + | riak { + | enable-http-compression = $enableCompression + | } + |} + """.stripMargin) + + RiakClient(createActorSystem(Some(config))) + } } abstract class RiakClientSpecification extends AkkaActorSystemSpecification with Before { @@ -66,19 +79,6 @@ abstract class RiakClientSpecification extends AkkaActorSystemSpecification with } override def map(fs: ⇒ Fragments) = super.map(specsWithParametrizedCompression(fs)) - - private def createRiakClient(enableCompression: Boolean) = { - val config = ConfigFactory.parseString( - s""" - |{ - | riak { - | enable-http-compression = $enableCompression - | } - |} - """.stripMargin) - - RiakClient(createActorSystem(Some(config))) - } } trait RandomKeySupport { From 72cc8884cf945c2222e2410e3462338f6c2e8feb Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Thu, 26 Jan 2017 12:31:20 +0100 Subject: [PATCH 18/28] compression: add a workaround for cases when Riak responds with unexpected gzip responses This happens when a value was stored with `Content-Encoding: gzip` and then it is fetched without requiring gzip (no `Accept-Encoding` header or with a `Accept-Encoding: identity` header). This is a reaaaally strange behaviour. 
--- .../riak/internal/RiakHttpClientHelper.scala | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index f8fc292..6e88d03 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -203,12 +203,13 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup private def basePipeline(enableCompression: Boolean) = { if (enableCompression) { - addHeader(`Accept-Encoding`(Gzip.encoding)) ~> - encode(Gzip) ~> - sendReceive ~> - decode(Gzip) + addHeader(`Accept-Encoding`(Gzip.encoding)) ~> encode(Gzip) ~> sendReceive ~> decode(Gzip) } else { - sendReceive + // So one might argue why would you need even to decode if you haven't asked for a gzip response via `Accept-Encoding` header? (the enableCompression=false case). + // Well, there is a surprise from Riak: it will respond with gzip anyway if previous `store value` request was performed with `Content-Encoding: gzip` header! o_O + // Yes, it's that weird... + // And adding `addHeader(`Accept-Encoding`(NoEncoding.encoding))` directive for request will break it: Riak might respond with '406 Not Acceptable' + sendReceive ~> decode(Gzip) } } @@ -271,7 +272,8 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup import FixedMultipartContentUnmarshalling._ implicit val FixedMultipartContentUnmarshaller = - multipartContentUnmarshaller(HttpCharsets.`UTF-8`, if (settings.EnableHttpCompression) Some(Gzip) else None) + // we always pass a Gzip decoder. Just in case if Riak decides to respond with gzip suddenly. 
o_O + multipartContentUnmarshaller(HttpCharsets.`UTF-8`, decoder = Gzip) val vclockHeader = response.headers.find(_.is(`X-Riak-Vclock`.toLowerCase)).toList @@ -302,7 +304,8 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup import FixedMultipartContentUnmarshalling._ implicit val FixedMultipartContentUnmarshaller = - multipartContentUnmarshaller(HttpCharsets.`UTF-8`, if (settings.EnableHttpCompression) Some(Gzip) else None) + // we always pass a Gzip decoder. Just in case if Riak decides to respond with gzip suddenly. o_O + multipartContentUnmarshaller(HttpCharsets.`UTF-8`, decoder = Gzip) val vclockHeader = response.headers.find(_.is(`X-Riak-Vclock`.toLowerCase)).toList @@ -341,12 +344,12 @@ private[internal] object FixedMultipartContentUnmarshalling { import MediaTypes._ import HttpHeaders._ - def multipartContentUnmarshaller(defaultCharset: HttpCharset, decoder: Option[Decoder]): Unmarshaller[MultipartContent] = + def multipartContentUnmarshaller(defaultCharset: HttpCharset, decoder: Decoder): Unmarshaller[MultipartContent] = multipartPartsUnmarshaller[MultipartContent](`multipart/mixed`, defaultCharset, decoder, MultipartContent(_)) private def multipartPartsUnmarshaller[T <: MultipartParts](mediaRange: MediaRange, defaultCharset: HttpCharset, - decoder: Option[Decoder], + decoder: Decoder, create: Seq[BodyPart] ⇒ T): Unmarshaller[T] = Unmarshaller[T](mediaRange) { case HttpEntity.NonEmpty(contentType, data) ⇒ @@ -360,7 +363,7 @@ private[internal] object FixedMultipartContentUnmarshalling { case HttpEntity.Empty ⇒ create(Nil) } - private def decompressData(headers: List[HttpHeader], decoder: Decoder)(data: Array[Byte]): Array[Byte] = { + private def decompressData(headers: List[HttpHeader], decoder: Decoder, data: Array[Byte]): Array[Byte] = { if (headers.findByType[`Content-Encoding`].exists(_.encoding == decoder.encoding)) { decoder.newDecompressor.decompress(data) } else { @@ -368,7 +371,7 @@ private[internal] object 
FixedMultipartContentUnmarshalling { } } - private def convertMimeMessage(mimeMsg: MIMEMessage, defaultCharset: HttpCharset, decoder: Option[Decoder]): Seq[BodyPart] = { + private def convertMimeMessage(mimeMsg: MIMEMessage, defaultCharset: HttpCharset, decoder: Decoder): Seq[BodyPart] = { mimeMsg.getAttachments.asScala.map { part ⇒ val rawHeaders: List[HttpHeader] = part.getAllHeaders.asScala.map(h ⇒ RawHeader(h.getName, h.getValue))(collection.breakOut) @@ -385,7 +388,7 @@ private[internal] object FixedMultipartContentUnmarshalling { val outputStream = new ByteArrayOutputStream FileUtils.copyAll(part.readOnce(), outputStream) - val data = decoder.foldRight(outputStream.toByteArray)(decompressData(headers, _)(_)) + val data = decompressData(headers, decoder, outputStream.toByteArray) BodyPart(HttpEntity(contentType, data), headers) case (errors, _) ⇒ sys.error("Multipart part contains %s illegal header(s):\n%s".format(errors.size, errors.mkString("\n"))) From e2ca24c2230b11aa61d0df9970f8f0a68043c77f Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Fri, 27 Jan 2017 01:36:04 +0100 Subject: [PATCH 19/28] add a link to GitHub issue in comment for a reference --- .../com/scalapenos/riak/internal/RiakHttpClientHelper.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index 6e88d03..c5ca92e 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -166,6 +166,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup // For some reason, Riak set bucket props HTTP endpoint doesn't handle compressed request properly. // So we disable compression for this request unconditionally. 
+ // Issue for tracking: https://github.com/agemooij/riak-scala-client/issues/41 httpRequest(enableCompression = false)(Put(PropertiesUri(server, bucket), entity)).map { response ⇒ response.status match { case NoContent ⇒ () From dc4053f333e1d66edfdb08690c70af5ae77f18e9 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Fri, 27 Jan 2017 02:02:42 +0100 Subject: [PATCH 20/28] add a link to GitHub issue in comment for a reference --- .../com/scalapenos/riak/internal/RiakHttpClientHelper.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index c5ca92e..68497ff 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -210,6 +210,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup // Well, there is a surprise from Riak: it will respond with gzip anyway if previous `store value` request was performed with `Content-Encoding: gzip` header! o_O // Yes, it's that weird... // And adding `addHeader(`Accept-Encoding`(NoEncoding.encoding))` directive for request will break it: Riak might respond with '406 Not Acceptable' + // Issue for tracking: https://github.com/agemooij/riak-scala-client/issues/42 sendReceive ~> decode(Gzip) } } @@ -274,6 +275,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup implicit val FixedMultipartContentUnmarshaller = // we always pass a Gzip decoder. Just in case if Riak decides to respond with gzip suddenly. 
o_O + // Issue for tracking: https://github.com/agemooij/riak-scala-client/issues/42 multipartContentUnmarshaller(HttpCharsets.`UTF-8`, decoder = Gzip) val vclockHeader = response.headers.find(_.is(`X-Riak-Vclock`.toLowerCase)).toList @@ -306,6 +308,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup implicit val FixedMultipartContentUnmarshaller = // we always pass a Gzip decoder. Just in case if Riak decides to respond with gzip suddenly. o_O + // Issue for tracking: https://github.com/agemooij/riak-scala-client/issues/42 multipartContentUnmarshaller(HttpCharsets.`UTF-8`, decoder = Gzip) val vclockHeader = response.headers.find(_.is(`X-Riak-Vclock`.toLowerCase)).toList From 5a534964f51949e52bd8ff89fb57fe451d67b869 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Mon, 30 Jan 2017 15:32:43 +0100 Subject: [PATCH 21/28] tests: add a test case for compression-enabled clients behaviour when handling concurrent delete operations on single values --- .../com/scalapenos/riak/RiakGzipSpec.scala | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/src/test/scala/com/scalapenos/riak/RiakGzipSpec.scala b/src/test/scala/com/scalapenos/riak/RiakGzipSpec.scala index 9ddfd4a..0a88d42 100644 --- a/src/test/scala/com/scalapenos/riak/RiakGzipSpec.scala +++ b/src/test/scala/com/scalapenos/riak/RiakGzipSpec.scala @@ -17,6 +17,9 @@ package com.scalapenos.riak import java.util.UUID.randomUUID +import java.util.zip.ZipException + +import scala.concurrent.Future class RiakGzipSpec extends AkkaActorSystemSpecification { @@ -81,6 +84,51 @@ class RiakGzipSpec extends AkkaActorSystemSpecification { checkFetch(gzipEnabledClient, bucketName, key, expectedValue) checkFetch(gzipDisabledClient, bucketName, key, expectedValue) } + + "be able to have 2 clients (with compression) trying to delete same values from the same bucket" in { + + /* + * This test scenario covers an interesting Riak behaviour when returning 'Not Found 404' responses to 
'delete key' requests. + * + * Sometimes, Riak can return a 404 response with a text/plain body - 'not found'. However, this response MIGHT have a "Content-Encoding: gzip" header. + * Trying to decompress the payload of such response leads to an internal "java.util.zip.ZipException: Not in GZIP format" exception in the riak client's http pipeline. + */ + + val BatchSize = 1000 + val BucketName = s"$baseBucketName-$randomUUID" + + val client1 = createRiakClient(true) + val client2 = createRiakClient(true) + + val bucket1 = client1.bucket(BucketName) + val bucket2 = client2.bucket(BucketName) + + def createKey(i: Int) = s"foo-$i" + + // First store initial values + val storeFutures = + for { + i ← 1 to BatchSize + } yield bucket1.storeAndFetch(createKey(i), "bar") + + // Wait till all store operations succeed. + Future.sequence(storeFutures).await + + // Now try to delete those keys twice in parallel. + val deleteFutures1 = + for { + i ← 1 to BatchSize + } yield bucket1.delete(createKey(i)) + + val deleteFutures2 = + for { + i ← 1 to BatchSize + } yield bucket2.delete(createKey(i)) + + // Wait for both deletion batches to succeed. 
+ Future.sequence(deleteFutures1).await must not(throwAn[ZipException]) + Future.sequence(deleteFutures2).await must not(throwAn[ZipException]) + } } // ============================================================================ From 7c678424ec9a8c4811d494ddc0c929a080d21770 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Mon, 30 Jan 2017 15:51:25 +0100 Subject: [PATCH 22/28] Fix for the wrong 404 not found Riak responses behaviour ( fixes #43 ) --- .../riak/internal/RiakHttpClientHelper.scala | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index 68497ff..55a8c11 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -17,8 +17,12 @@ package com.scalapenos.riak package internal +import java.util.zip.ZipException + import akka.actor._ +import scala.util.Try + private[riak] object RiakHttpClientHelper { import spray.http.HttpEntity import spray.httpx.marshalling._ @@ -202,16 +206,23 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup private lazy val clientId = java.util.UUID.randomUUID().toString private val clientIdHeader = if (settings.AddClientIdHeader) Some(RawHeader(`X-Riak-ClientId`, clientId)) else None + private def safeDecode: ResponseTransformer = { response ⇒ + Try(decode(Gzip).apply(response)).recover { + // recover from a ZipException: this means that, although the response has a "Content-Encoding: gzip" header, but it's payload is not gzipped. 
+ case e: ZipException ⇒ response + }.get + } + private def basePipeline(enableCompression: Boolean) = { if (enableCompression) { - addHeader(`Accept-Encoding`(Gzip.encoding)) ~> encode(Gzip) ~> sendReceive ~> decode(Gzip) + addHeader(`Accept-Encoding`(Gzip.encoding)) ~> encode(Gzip) ~> sendReceive ~> safeDecode } else { // So one might argue why would you need even to decode if you haven't asked for a gzip response via `Accept-Encoding` header? (the enableCompression=false case). // Well, there is a surprise from Riak: it will respond with gzip anyway if previous `store value` request was performed with `Content-Encoding: gzip` header! o_O // Yes, it's that weird... // And adding `addHeader(`Accept-Encoding`(NoEncoding.encoding))` directive for request will break it: Riak might respond with '406 Not Acceptable' // Issue for tracking: https://github.com/agemooij/riak-scala-client/issues/42 - sendReceive ~> decode(Gzip) + sendReceive ~> safeDecode } } From 686aa4fecec85785de4e81f3a44ffab626e98bfb Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Tue, 31 Jan 2017 00:27:20 +0100 Subject: [PATCH 23/28] Remove encoding requests payload from an http request pipeline Due to a number of Riak shortcomings in regards to handling gzipped requests, it seems safer to simply disable requests compression altogether for now. --- src/main/resources/reference.conf | 3 ++ .../riak/internal/RiakHttpClientHelper.scala | 42 ++++++++----------- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index a6dd43f..b83fe48 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -24,6 +24,9 @@ riak { ignore-tombstones = yes # Should the client use an compression (e.g. Gzip) when talking to Riak via HTTP. + # *Note* that only enables compressed *responses* from Riak. + # Requests are sent 'as is', without any compression. 
+ # This is done due to a number of known problems on Riak side in regards to handling those. enable-http-compression = no } diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index 55a8c11..bf1642f 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -63,7 +63,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup // ========================================================================== def ping(server: RiakServerInfo): Future[Boolean] = { - httpRequest()(Get(PingUri(server))).map { response ⇒ + httpRequest(Get(PingUri(server))).map { response ⇒ response.status match { case OK ⇒ true case other ⇒ throw new OperationFailed(s"Ping on server '$server' produced an unexpected response code '$other'.") @@ -72,7 +72,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup } def fetch(server: RiakServerInfo, bucket: String, key: String, resolver: RiakConflictsResolver): Future[Option[RiakValue]] = { - httpRequest()(Get(KeyUri(server, bucket, key))).flatMap { response ⇒ + httpRequest(Get(KeyUri(server, bucket, key))).flatMap { response ⇒ response.status match { case OK ⇒ successful(toRiakValue(response)) case NotFound ⇒ successful(None) @@ -84,7 +84,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup } def fetchWithSiblings(server: RiakServerInfo, bucket: String, key: String, resolver: RiakConflictsResolver): Future[Option[Set[RiakValue]]] = { - httpRequest()(Get(KeyUri(server, bucket, key))).flatMap { response ⇒ + httpRequest(Get(KeyUri(server, bucket, key))).flatMap { response ⇒ response.status match { case OK ⇒ successful(toRiakValue(response).map(Set(_))) case NotFound ⇒ successful(None) @@ -95,7 +95,7 @@ private[riak] class RiakHttpClientHelper(system: 
ActorSystem) extends RiakUriSup } def fetch(server: RiakServerInfo, bucket: String, index: RiakIndex, resolver: RiakConflictsResolver): Future[List[RiakValue]] = { - httpRequest()(Get(IndexUri(server, bucket, index))).flatMap { response ⇒ + httpRequest(Get(IndexUri(server, bucket, index))).flatMap { response ⇒ response.status match { case OK ⇒ fetchWithKeysReturnedByIndexLookup(server, bucket, response, resolver) case BadRequest ⇒ throw new ParametersInvalid(s"""Invalid index name ("${index.fullName}") or value ("${index.value}").""") @@ -105,7 +105,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup } def fetch(server: RiakServerInfo, bucket: String, indexRange: RiakIndexRange, resolver: RiakConflictsResolver): Future[List[RiakValue]] = { - httpRequest()(Get(IndexRangeUri(server, bucket, indexRange))).flatMap { response ⇒ + httpRequest(Get(IndexRangeUri(server, bucket, indexRange))).flatMap { response ⇒ response.status match { case OK ⇒ fetchWithKeysReturnedByIndexLookup(server, bucket, response, resolver) case BadRequest ⇒ throw new ParametersInvalid(s"""Invalid index name ("${indexRange.fullName}") or range ("${indexRange.start}" to "${indexRange.start}").""") @@ -140,7 +140,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup } def delete(server: RiakServerInfo, bucket: String, key: String): Future[Unit] = { - httpRequest()(Delete(KeyUri(server, bucket, key))).map { response ⇒ + httpRequest(Delete(KeyUri(server, bucket, key))).map { response ⇒ response.status match { case NoContent ⇒ () case NotFound ⇒ () @@ -152,7 +152,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup def getBucketProperties(server: RiakServerInfo, bucket: String): Future[RiakBucketProperties] = { import spray.httpx.unmarshalling._ - httpRequest()(Get(PropertiesUri(server, bucket))).map { response ⇒ + httpRequest(Get(PropertiesUri(server, bucket))).map { response ⇒ response.status match { 
case OK ⇒ response.entity.as[RiakBucketProperties] match { case Right(properties) ⇒ properties @@ -168,10 +168,9 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup val entity = JsObject("props" -> JsObject(newProperties.map(property ⇒ (property.name -> property.json)).toMap)) - // For some reason, Riak set bucket props HTTP endpoint doesn't handle compressed request properly. - // So we disable compression for this request unconditionally. - // Issue for tracking: https://github.com/agemooij/riak-scala-client/issues/41 - httpRequest(enableCompression = false)(Put(PropertiesUri(server, bucket), entity)).map { response ⇒ + // *Warning*: for some reason, Riak set bucket props HTTP endpoint doesn't handle compressed request properly. + // Do not try to enable it here. Issue for tracking: https://github.com/agemooij/riak-scala-client/issues/41 + httpRequest(Put(PropertiesUri(server, bucket), entity)).map { response ⇒ response.status match { case NoContent ⇒ () case BadRequest ⇒ throw new ParametersInvalid(s"Setting properties of bucket '$bucket' failed because the http request contained invalid data.") @@ -188,7 +187,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup def allKeys(server: RiakServerInfo, bucket: String): Future[RiakKeys] = { import spray.httpx.unmarshalling._ - httpRequest()(Get(AllKeysUri(server, bucket))).map { response ⇒ + httpRequest(Get(AllKeysUri(server, bucket))).map { response ⇒ response.status match { case OK ⇒ response.entity.as[RiakKeys] match { case Right(riakKeys) ⇒ riakKeys @@ -206,30 +205,25 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup private lazy val clientId = java.util.UUID.randomUUID().toString private val clientIdHeader = if (settings.AddClientIdHeader) Some(RawHeader(`X-Riak-ClientId`, clientId)) else None - private def safeDecode: ResponseTransformer = { response ⇒ - Try(decode(Gzip).apply(response)).recover { - // recover from 
a ZipException: this means that, although the response has a "Content-Encoding: gzip" header, but it's payload is not gzipped. - case e: ZipException ⇒ response - }.get - } - private def basePipeline(enableCompression: Boolean) = { if (enableCompression) { - addHeader(`Accept-Encoding`(Gzip.encoding)) ~> encode(Gzip) ~> sendReceive ~> safeDecode + // Note that we don't compress request payload in here (e.g. using `encode(Gzip)` transformer). + // This is due to a number of known shortcomings of Riak in regards to handling gzipped requests. + addHeader(`Accept-Encoding`(Gzip.encoding)) ~> sendReceive ~> decode(Gzip) } else { // So one might argue why would you need even to decode if you haven't asked for a gzip response via `Accept-Encoding` header? (the enableCompression=false case). // Well, there is a surprise from Riak: it will respond with gzip anyway if previous `store value` request was performed with `Content-Encoding: gzip` header! o_O // Yes, it's that weird... // And adding `addHeader(`Accept-Encoding`(NoEncoding.encoding))` directive for request will break it: Riak might respond with '406 Not Acceptable' // Issue for tracking: https://github.com/agemooij/riak-scala-client/issues/42 - sendReceive ~> safeDecode + sendReceive ~> decode(Gzip) } } - private def httpRequest(enableCompression: Boolean = settings.EnableHttpCompression) = { + private def httpRequest = { addOptionalHeader(clientIdHeader) ~> addHeader("Accept", "*/*, multipart/mixed") ~> - basePipeline(enableCompression) + basePipeline(settings.EnableHttpCompression) } private def createStoreHttpRequest(value: RiakValue) = { @@ -238,7 +232,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup addOptionalHeader(vclockHeader) ~> addHeaders(indexHeaders) ~> - httpRequest() + httpRequest } // ========================================================================== From f7db713f725633dcd31a5c0232347e2c9d4c3f22 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Tue, 31 
Jan 2017 00:29:17 +0100
Subject: [PATCH 24/28] Adjust a scaladoc

---
 .../com/scalapenos/riak/internal/RiakClientSettings.scala     | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakClientSettings.scala b/src/main/scala/com/scalapenos/riak/internal/RiakClientSettings.scala
index e15f657..67b0d4b 100644
--- a/src/main/scala/com/scalapenos/riak/internal/RiakClientSettings.scala
+++ b/src/main/scala/com/scalapenos/riak/internal/RiakClientSettings.scala
@@ -45,7 +45,10 @@ private[riak] class RiakClientSettings(config: Config) {
 
   /**
    * Setting for controlling whether the Riak client should use a compression (e.g. Gzip)
-   * when sending and receiving data via HTTP connection to Riak.
+   * when *receiving* data via HTTP connection from Riak.
+   *
+   * *Note* that this setting does not enable request payload compression.
+   * This is done due to a number of known problems on Riak side in regards to handling compressed requests.
    *
    * This value defaults to *false*.
*/ From 8d5d9fe923adcedd3df897019b50f6e997c49349 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Tue, 31 Jan 2017 01:08:14 +0100 Subject: [PATCH 25/28] =?UTF-8?q?Revert=20back=20the=20change=20with=20?= =?UTF-8?q?=E2=80=9Csafe=20gzip=20decoding=E2=80=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Apparently, that rare issue is still reproducible (see builds failures) even with clients that don’t compress requests… :-\ --- .../riak/internal/RiakHttpClientHelper.scala | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index bf1642f..a64e482 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -205,18 +205,29 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup private lazy val clientId = java.util.UUID.randomUUID().toString private val clientIdHeader = if (settings.AddClientIdHeader) Some(RawHeader(`X-Riak-ClientId`, clientId)) else None + /** + * Tries to decode gzipped response payload if response has an appropriate `Content-Encoding` header. + * Returns the payload 'as is' if Gzip decoder throws a [[ZipException]]. + */ + private def safeDecodeGzip: ResponseTransformer = { response ⇒ + Try(decode(Gzip).apply(response)).recover { + // recover from a ZipException: this means that, although the response has a "Content-Encoding: gzip" header, but it's payload is not gzipped. + case e: ZipException ⇒ response + }.get + } + private def basePipeline(enableCompression: Boolean) = { if (enableCompression) { // Note that we don't compress request payload in here (e.g. using `encode(Gzip)` transformer). // This is due to a number of known shortcomings of Riak in regards to handling gzipped requests. 
- addHeader(`Accept-Encoding`(Gzip.encoding)) ~> sendReceive ~> decode(Gzip) + addHeader(`Accept-Encoding`(Gzip.encoding)) ~> sendReceive ~> safeDecodeGzip } else { // So one might argue why would you need even to decode if you haven't asked for a gzip response via `Accept-Encoding` header? (the enableCompression=false case). // Well, there is a surprise from Riak: it will respond with gzip anyway if previous `store value` request was performed with `Content-Encoding: gzip` header! o_O // Yes, it's that weird... // And adding `addHeader(`Accept-Encoding`(NoEncoding.encoding))` directive for request will break it: Riak might respond with '406 Not Acceptable' // Issue for tracking: https://github.com/agemooij/riak-scala-client/issues/42 - sendReceive ~> decode(Gzip) + sendReceive ~> safeDecodeGzip } } From 1bfb4289e6ef0d5ec502c5e39185ea084d176b0e Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Tue, 31 Jan 2017 16:31:11 +0100 Subject: [PATCH 26/28] Fix grammar in a comment --- .../com/scalapenos/riak/internal/RiakHttpClientHelper.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index a64e482..dbcfbe3 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -211,7 +211,7 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup */ private def safeDecodeGzip: ResponseTransformer = { response ⇒ Try(decode(Gzip).apply(response)).recover { - // recover from a ZipException: this means that, although the response has a "Content-Encoding: gzip" header, but it's payload is not gzipped. + // recover from a ZipException: this means that, although the response has a "Content-Encoding: gzip" header, but its payload is not gzipped. 
case e: ZipException ⇒ response }.get } From 6401372ac389912f59f11d62d104919a80e34bfe Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Tue, 31 Jan 2017 17:04:15 +0100 Subject: [PATCH 27/28] Add a code comment regarding handling responses with multiple content-encoding values --- .../com/scalapenos/riak/internal/RiakHttpClientHelper.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala index dbcfbe3..453ded2 100644 --- a/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala +++ b/src/main/scala/com/scalapenos/riak/internal/RiakHttpClientHelper.scala @@ -384,6 +384,11 @@ private[internal] object FixedMultipartContentUnmarshalling { } private def decompressData(headers: List[HttpHeader], decoder: Decoder, data: Array[Byte]): Array[Byte] = { + // According to RFC (https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html), + // "If multiple encodings have been applied to an entity, the content codings MUST be listed in the order in which + // they were applied. Additional information about the encoding parameters MAY be provided by other entity-header + // fields not defined by this specification." + // This means that, if there were multiple encodings applied, this will NOT work. 
if (headers.findByType[`Content-Encoding`].exists(_.encoding == decoder.encoding)) { decoder.newDecompressor.decompress(data) } else { From 59931b7548b2a4fed58e71433d2a2cb24ab37f86 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Tue, 21 Feb 2017 15:24:53 +0100 Subject: [PATCH 28/28] docs: add an additional information about HTTP compression feature to the Readme file --- README.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/README.md b/README.md index 86f5f9d..8e92587 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,7 @@ Other features include: - builtin spray-json (de)serializers - Automatic indexing of Scala (case) classes using type classes - Auto-retry of fetches and stores (a standard feature of the underlying spray-client library) +- Optional compression of Riak HTTP **responses** (see _enable-http-compression_ setting). The following Riak (http) API features are not supported at this time: @@ -88,6 +89,16 @@ The riak-scala-client has been tested against [Riak] versions 1.2.x, 1.3.x, 1.4. And earlier version did manual double URL encoding/decoding but that was not a sustainable solution. Please avoid using characters like ' ', ',', '?', '&', etc. in index names and index values for now. +- HTTP compression, when enabled (_enable-http-compression = true_), is only used for **Riak responses**. + Request payloads are sent to Riak as they are due to a number of known limitations: + - Riak value that has been stored with a non-empty `Content-Encoding` header is always served by Riak + with that encoding, regardless the value of `Accept-Encoding` _fetch object request_ header. + See https://github.com/agemooij/riak-scala-client/issues/42 for details. + - Riak _set bucket properties_ endpoint doesn't handle compressed payloads properly. + See https://github.com/agemooij/riak-scala-client/issues/41 for details. 
+  - Delete requests with `Accept-Encoding: gzip` header may be served with `HTTP 404 Not Found` responses
+    which have a `Content-Encoding: gzip` header, but a `text/plain` body.
+    See https://github.com/agemooij/riak-scala-client/issues/43 for details.
 
 ## Why such a boring name?