Skip to content
This repository has been archived by the owner on Dec 6, 2023. It is now read-only.

Commit

Permalink
Merge pull request #40 from ajantis/optional-http-compression
Browse files Browse the repository at this point in the history
Gzip compression
  • Loading branch information
agemooij authored Feb 21, 2017
2 parents 54f742a + 59931b7 commit a09fe96
Show file tree
Hide file tree
Showing 12 changed files with 336 additions and 49 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Other features include:
- builtin spray-json (de)serializers
- Automatic indexing of Scala (case) classes using type classes
- Auto-retry of fetches and stores (a standard feature of the underlying spray-client library)
- Optional compression of Riak HTTP **responses** (see _enable-http-compression_ setting).

The following Riak (http) API features are not supported at this time:

Expand Down Expand Up @@ -88,6 +89,16 @@ The riak-scala-client has been tested against [Riak] versions 1.2.x, 1.3.x, 1.4.
An earlier version did manual double URL encoding/decoding but that was not a
sustainable solution. Please avoid using characters like ' ', ',', '?', '&', etc.
in index names and index values for now.
- HTTP compression, when enabled (_enable-http-compression = true_), is only used for **Riak responses**.
Request payloads are sent to Riak as they are due to a number of known limitations:
- Riak value that has been stored with a non-empty `Content-Encoding` header is always served by Riak
with that encoding, regardless of the value of the `Accept-Encoding` _fetch object request_ header.
See https://github.com/agemooij/riak-scala-client/issues/42 for details.
- Riak _set bucket properties_ endpoint doesn't handle compressed payloads properly.
See https://github.com/agemooij/riak-scala-client/issues/41 for details.
- Delete requests with `Accept-Encoding: gzip` header may be served with `HTTP 404 Not Found` responses
which have `Content-Encoding: gzip` header, but `text/plain` body.
See https://github.com/agemooij/riak-scala-client/issues/43 for details.


## Why such a boring name?
Expand Down
6 changes: 6 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ riak {
#
# Riak server designates values as tombstones by adding an optional 'X-Riak-Deleted' header.
ignore-tombstones = yes

# Should the client use compression (e.g. Gzip) when talking to Riak via HTTP?
# *Note* that this only enables compressed *responses* from Riak.
# Requests are sent 'as is', without any compression.
# This is done due to a number of known problems on the Riak side with handling compressed requests.
enable-http-compression = no
}

spray.can.client {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ private[riak] class RiakClientSettings(config: Config) {
*/
final val IgnoreTombstones: Boolean = config.getBoolean("riak.ignore-tombstones")

/**
* Setting for controlling whether the Riak client should use compression (e.g. Gzip)
* when *receiving* data over an HTTP connection from Riak.
*
* *Note* that this setting does not enable request payload compression.
* This is due to a number of known problems on the Riak side with handling compressed requests.
*
* This value defaults to *false* (see "riak.enable-http-compression" in reference.conf).
*/
final val EnableHttpCompression: Boolean = config.getBoolean("riak.enable-http-compression")

// TODO: add setting for silently ignoring indexes on backends that don't allow them. The alternative is failing/throwing exceptions

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
package com.scalapenos.riak
package internal

import java.util.zip.ZipException

import akka.actor._

import scala.util.Try

private[riak] object RiakHttpClientHelper {
import spray.http.HttpEntity
import spray.httpx.marshalling._
Expand All @@ -42,10 +46,9 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup
import spray.http.StatusCodes._
import spray.http.HttpHeaders._
import spray.httpx.SprayJsonSupport._
import spray.httpx.encoding.Gzip
import spray.json.DefaultJsonProtocol._

import org.slf4j.LoggerFactory

import SprayClientExtras._
import RiakHttpHeaders._
import RiakHttpClientHelper._
Expand Down Expand Up @@ -165,6 +168,8 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup

val entity = JsObject("props" -> JsObject(newProperties.map(property (property.name -> property.json)).toMap))

// *Warning*: for some reason, Riak set bucket props HTTP endpoint doesn't handle compressed request properly.
// Do not try to enable it here. Issue for tracking: https://github.com/agemooij/riak-scala-client/issues/41
httpRequest(Put(PropertiesUri(server, bucket), entity)).map { response
response.status match {
case NoContent ()
Expand Down Expand Up @@ -200,10 +205,36 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup
private lazy val clientId = java.util.UUID.randomUUID().toString
private val clientIdHeader = if (settings.AddClientIdHeader) Some(RawHeader(`X-Riak-ClientId`, clientId)) else None

/**
 * Tries to decode a gzipped response payload if the response has an appropriate
 * `Content-Encoding` header.
 * Returns the payload 'as is' if the Gzip decoder throws a [[ZipException]].
 */
private def safeDecodeGzip: ResponseTransformer = { response ⇒
  Try(decode(Gzip).apply(response)).recover {
    // Recover from a ZipException: although the response has a "Content-Encoding: gzip"
    // header, its payload is not actually gzipped, so we pass it through unchanged.
    case e: ZipException ⇒ response
  }.get
}

/**
 * Builds the send/receive part of the request pipeline, optionally asking Riak
 * for gzip-compressed responses.
 *
 * Request payloads are never compressed here (i.e. no `encode(Gzip)` transformer):
 * Riak has a number of known shortcomings when handling gzipped requests.
 */
private def basePipeline(enableCompression: Boolean) = {
  val receive =
    if (enableCompression) addHeader(`Accept-Encoding`(Gzip.encoding)) ~> sendReceive
    else sendReceive
  // The gzip decoding step is applied even when we did NOT ask for gzip via an
  // `Accept-Encoding` header (the enableCompression=false case). Surprise from Riak:
  // it will respond with gzip anyway if a previous `store value` request was performed
  // with a `Content-Encoding: gzip` header! o_O
  // And adding an `addHeader(`Accept-Encoding`(NoEncoding.encoding))` directive to the
  // request would break things: Riak might respond with '406 Not Acceptable'.
  // Issue for tracking: https://github.com/agemooij/riak-scala-client/issues/42
  receive ~> safeDecodeGzip
}

/**
 * The standard request pipeline: adds the optional client id header, the Accept
 * header required for sibling (multipart/mixed) responses, and the send/receive
 * step with optional gzip response decoding (per the enable-http-compression setting).
 */
private def httpRequest = {
  addOptionalHeader(clientIdHeader) ~>
    addHeader("Accept", "*/*, multipart/mixed") ~>
    basePipeline(settings.EnableHttpCompression)
}

private def createStoreHttpRequest(value: RiakValue) = {
Expand Down Expand Up @@ -233,7 +264,6 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup
}

private def dateTimeFromLastModified(lm: `Last-Modified`): DateTime = fromSprayDateTime(lm.date)
private def lastModifiedFromDateTime(dateTime: DateTime): `Last-Modified` = `Last-Modified`(toSprayDateTime(dateTime))

// ==========================================================================
// Index result fetching
Expand All @@ -253,12 +283,16 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup
// Conflict Resolution
// ==========================================================================

import spray.http._

private def resolveConflict(server: RiakServerInfo, bucket: String, key: String, response: HttpResponse, resolver: RiakConflictsResolver): Future[RiakValue] = {
import spray.http._
import spray.httpx.unmarshalling._
import FixedMultipartContentUnmarshalling._

implicit val FixedMultipartContentUnmarshaller = multipartContentUnmarshaller(HttpCharsets.`UTF-8`)
implicit val FixedMultipartContentUnmarshaller =
// we always pass a Gzip decoder. Just in case if Riak decides to respond with gzip suddenly. o_O
// Issue for tracking: https://github.com/agemooij/riak-scala-client/issues/42
multipartContentUnmarshaller(HttpCharsets.`UTF-8`, decoder = Gzip)

val vclockHeader = response.headers.find(_.is(`X-Riak-Vclock`.toLowerCase)).toList

Expand Down Expand Up @@ -288,7 +322,10 @@ private[riak] class RiakHttpClientHelper(system: ActorSystem) extends RiakUriSup
import spray.httpx.unmarshalling._
import FixedMultipartContentUnmarshalling._

implicit val FixedMultipartContentUnmarshaller = multipartContentUnmarshaller(HttpCharsets.`UTF-8`)
implicit val FixedMultipartContentUnmarshaller =
// we always pass a Gzip decoder. Just in case if Riak decides to respond with gzip suddenly. o_O
// Issue for tracking: https://github.com/agemooij/riak-scala-client/issues/42
multipartContentUnmarshaller(HttpCharsets.`UTF-8`, decoder = Gzip)

val vclockHeader = response.headers.find(_.is(`X-Riak-Vclock`.toLowerCase)).toList

Expand Down Expand Up @@ -320,17 +357,19 @@ private[internal] object FixedMultipartContentUnmarshalling {
import org.parboiled.common.FileUtils
import scala.collection.JavaConverters._
import spray.http.parser.HttpParser
import spray.httpx.encoding.Decoder
import spray.httpx.unmarshalling._
import spray.util._
import spray.http._
import MediaTypes._
import HttpHeaders._

/**
 * Builds an `Unmarshaller` for `multipart/mixed` content in which each body part
 * is decompressed with the given decoder when that part carries a matching
 * `Content-Encoding` header (see `decompressData`).
 */
def multipartContentUnmarshaller(defaultCharset: HttpCharset, decoder: Decoder): Unmarshaller[MultipartContent] =
  multipartPartsUnmarshaller[MultipartContent](`multipart/mixed`, defaultCharset, decoder, MultipartContent(_))

private def multipartPartsUnmarshaller[T <: MultipartParts](mediaRange: MediaRange,
defaultCharset: HttpCharset,
decoder: Decoder,
create: Seq[BodyPart] T): Unmarshaller[T] =
Unmarshaller[T](mediaRange) {
case HttpEntity.NonEmpty(contentType, data)
Expand All @@ -339,12 +378,25 @@ private[internal] object FixedMultipartContentUnmarshalling {
sys.error("Content-Type with a multipart media type must have a non-empty 'boundary' parameter")
case Some(boundary)
val mimeMsg = new MIMEMessage(new ByteArrayInputStream(data.toByteArray), boundary, mimeParsingConfig)
create(convertMimeMessage(mimeMsg, defaultCharset))
create(convertMimeMessage(mimeMsg, defaultCharset, decoder))
}
case HttpEntity.Empty create(Nil)
}

private def convertMimeMessage(mimeMsg: MIMEMessage, defaultCharset: HttpCharset): Seq[BodyPart] = {
/**
 * Decompresses the given body-part data with the given decoder, but only when the
 * part's headers contain a `Content-Encoding` header matching that decoder's encoding;
 * otherwise the data is returned unchanged.
 *
 * Per RFC 2616 section 14 (https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html):
 * "If multiple encodings have been applied to an entity, the content codings MUST be
 * listed in the order in which they were applied. Additional information about the
 * encoding parameters MAY be provided by other entity-header fields not defined by
 * this specification."
 * This means that, if multiple encodings were applied, this will NOT work.
 */
private def decompressData(headers: List[HttpHeader], decoder: Decoder, data: Array[Byte]): Array[Byte] = {
  val matchesEncoding = headers.findByType[`Content-Encoding`].exists(_.encoding == decoder.encoding)
  if (matchesEncoding) decoder.newDecompressor.decompress(data)
  else data // pass-through
}

private def convertMimeMessage(mimeMsg: MIMEMessage, defaultCharset: HttpCharset, decoder: Decoder): Seq[BodyPart] = {
mimeMsg.getAttachments.asScala.map { part
val rawHeaders: List[HttpHeader] = part.getAllHeaders.asScala.map(h RawHeader(h.getName, h.getValue))(collection.breakOut)

Expand All @@ -360,7 +412,10 @@ private[internal] object FixedMultipartContentUnmarshalling {
.getOrElse(ContentType(`text/plain`, defaultCharset)) // RFC 2046 section 5.1
val outputStream = new ByteArrayOutputStream
FileUtils.copyAll(part.readOnce(), outputStream)
BodyPart(HttpEntity(contentType, outputStream.toByteArray), headers)

val data = decompressData(headers, decoder, outputStream.toByteArray)
BodyPart(HttpEntity(contentType, data), headers)

case (errors, _) sys.error("Multipart part contains %s illegal header(s):\n%s".format(errors.size, errors.mkString("\n")))
}
}(collection.breakOut)
Expand Down
11 changes: 6 additions & 5 deletions src/test/scala/com/scalapenos/riak/BasicInteractionsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

package com.scalapenos.riak

class BasicInteractionsSpec extends AkkaActorSystemSpecification {
"The riak client" should {
import java.util.UUID.randomUUID

class BasicInteractionsSpec extends RiakClientSpecification {

"The Riak client" should {
"be able to perform a simple get-put-get-delete-get CRUD flow" in {
val connection = RiakClient(system)
val bucket = connection.bucket("test-basic-interaction")
val bucket = client.bucket(s"test-basic-interaction-$randomUUID")

val fetchBeforeStore = bucket.fetch("foo")

Expand All @@ -42,5 +44,4 @@ class BasicInteractionsSpec extends AkkaActorSystemSpecification {
fetchAfterDelete must beNone
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -152,5 +152,4 @@ class ConflictResolutionSpec extends RiakClientSpecification with RandomKeySuppo
bucket.fetch(key).await must throwA[ConflicResolutionNotImplemented]
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

package com.scalapenos.riak

class RiakBucketPropertiesSpec extends RiakClientSpecification with RandomKeySupport {
private def randomBucket = client.bucket("riak-bucket-tests-" + randomKey)
class RiakBucketPropertiesSpec extends RiakClientSpecification with RandomBucketSupport with RandomKeySupport {

"A RiakBucket" should {
"support setting and getting the bucket properties" in {
val bucket = randomBucket
val bucket = randomBucket(client)
val oldProperties = bucket.properties.await

val newNumberOfReplicas = oldProperties.numberOfReplicas + 1
Expand Down Expand Up @@ -49,7 +48,7 @@ class RiakBucketPropertiesSpec extends RiakClientSpecification with RandomKeySup
}

"support setting the bucket properties with an empty set (nothing happens)" in {
val bucket = randomBucket
val bucket = randomBucket(client)
val oldProperties = bucket.properties.await

bucket.setProperties(Set()).await
Expand All @@ -60,7 +59,7 @@ class RiakBucketPropertiesSpec extends RiakClientSpecification with RandomKeySup
}

"support directly setting the 'n_val' bucket property" in {
val bucket = randomBucket
val bucket = randomBucket(client)

(bucket.numberOfReplicas = 5).await
bucket.numberOfReplicas.await must beEqualTo(5)
Expand All @@ -70,7 +69,7 @@ class RiakBucketPropertiesSpec extends RiakClientSpecification with RandomKeySup
}

"support directly setting the 'allow_mult' bucket property" in {
val bucket = randomBucket
val bucket = randomBucket(client)

(bucket.allowSiblings = true).await
bucket.allowSiblings.await must beTrue
Expand All @@ -80,7 +79,7 @@ class RiakBucketPropertiesSpec extends RiakClientSpecification with RandomKeySup
}

"support directly setting the 'last_write_wins' bucket property" in {
val bucket = randomBucket
val bucket = randomBucket(client)

(bucket.lastWriteWins = true).await
bucket.lastWriteWins.await must beTrue
Expand All @@ -90,7 +89,7 @@ class RiakBucketPropertiesSpec extends RiakClientSpecification with RandomKeySup
}

"fail when directly setting the 'n_val' bucket property to any integer smaller than 1" in {
val bucket = randomBucket
val bucket = randomBucket(client)

(bucket.numberOfReplicas = 0).await must throwA[IllegalArgumentException]
(bucket.numberOfReplicas = -1).await must throwA[IllegalArgumentException]
Expand All @@ -103,5 +102,4 @@ class RiakBucketPropertiesSpec extends RiakClientSpecification with RandomKeySup
NumberOfReplicas(-42) must throwA[IllegalArgumentException]
}
}

}
12 changes: 6 additions & 6 deletions src/test/scala/com/scalapenos/riak/RiakBucketSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class RiakBucketSpec extends RiakClientSpecification with RandomKeySupport with

"A RiakBucket" should {
"not be able to store an empty String value" in {
val bucket = randomBucket
val bucket = randomBucket(client)
val key = randomKey

// Riak will reject the request with a 400 because the request will
Expand All @@ -29,7 +29,7 @@ class RiakBucketSpec extends RiakClientSpecification with RandomKeySupport with
}

"treat tombstone values as if they don't exist when allow_mult = false" in {
val bucket = randomBucket
val bucket = randomBucket(client)
val key = randomKey

bucket.store(key, "value").await
Expand All @@ -41,7 +41,7 @@ class RiakBucketSpec extends RiakClientSpecification with RandomKeySupport with
}

"treat tombstone values as if they don't exist when allow_mult = true" in {
val bucket = randomBucket
val bucket = randomBucket(client)
val key = randomKey

(bucket.allowSiblings = true).await
Expand All @@ -55,7 +55,7 @@ class RiakBucketSpec extends RiakClientSpecification with RandomKeySupport with
}

"fetch all sibling values and return them to the client if they exist for a given Riak entry" in {
val bucket = randomBucket
val bucket = randomBucket(client)
val key = randomKey

(bucket.allowSiblings = true).await
Expand All @@ -75,7 +75,7 @@ class RiakBucketSpec extends RiakClientSpecification with RandomKeySupport with
}

"return a set containing a single value for given Riak entry if there are no siblings when fetching with siblings mode" in {
val bucket = randomBucket
val bucket = randomBucket(client)
val key = randomKey

(bucket.allowSiblings = true).await
Expand All @@ -91,7 +91,7 @@ class RiakBucketSpec extends RiakClientSpecification with RandomKeySupport with
}

"return None if entry hasn't been found when fetching with siblings mode" in {
val bucket = randomBucket
val bucket = randomBucket(client)
val key = randomKey

(bucket.allowSiblings = true).await
Expand Down
1 change: 0 additions & 1 deletion src/test/scala/com/scalapenos/riak/RiakClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,4 @@ class RiakClientSpec extends RiakClientSpecification {
client.ping.await should beTrue
}
}

}
Loading

0 comments on commit a09fe96

Please sign in to comment.