Skip to content

Commit

Permalink
Merge pull request #270 from tumblr/bb-improve-service-client-error-r…
Browse files Browse the repository at this point in the history
…eporting

Include exception in error hit
  • Loading branch information
DanSimon committed Oct 19, 2015
2 parents 92ee8a6 + 42baa44 commit 9d8a27c
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions colossus/src/main/scala/colossus/service/ServiceClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,7 @@ extends Controller[O,I](codec, ControllerConfig(config.pendingBufferSize, config
}

private def purgeBuffers(reason : Throwable) {
sentBuffer.foreach{s =>
errors.hit(tags = hpTags)
s.handler(Failure(new ConnectionLostException(s"Error while request was in transit: $reason")))
}
sentBuffer.foreach(failRequest(_, reason))
sentBuffer.clear()
purgeOutgoing(reason)
if (failFast) {
Expand All @@ -237,15 +234,16 @@ extends Controller[O,I](codec, ControllerConfig(config.pendingBufferSize, config
super.connectionLost(cause)
cause match {
case DisconnectCause.ConnectFailed(error) => {
log.warning(s"failed to connect to ${address.toString}: ${error.getMessage}")
log.warning(s"${id.get} failed to connect to ${address.toString}: ${error.getMessage}")
connectionFailures.hit(tags = hpTags)
purgeBuffers(new NotConnectedException(s"${cause.logString}"))
}
case _ => {
log.warning(s"${id.get} connection to ${address.toString} lost: ${cause.logString}")
log.warning(s"${id.get} connection lost to ${address.toString}: ${cause.logString}")
disconnects.hit(tags = hpTags + ("cause" -> cause.tagString))
purgeBuffers(new ConnectionLostException(s"${cause.logString}"))
}
}
purgeBuffers(new NotConnectedException(s"${cause.logString}"))
attemptReconnect()
}

Expand All @@ -270,8 +268,8 @@ extends Controller[O,I](codec, ControllerConfig(config.pendingBufferSize, config

private def attemptWrite(s: SourcedRequest) {
if (disconnecting) {
//don't allow any new requests, appear as if we're dead
s.handler(Failure(new NotConnectedException("Not Connected")))
// don't allow any new requests, appear as if we're dead
failRequest(s, new NotConnectedException("Not Connected"))
} else if (isConnected || !failFast) {
val pushed = push(s.message, s.start){
case OutputResult.Success => {
Expand All @@ -280,15 +278,15 @@ extends Controller[O,I](codec, ControllerConfig(config.pendingBufferSize, config
pauseWrites() //writes resumed in processMessage
}
}
case OutputResult.Failure(err) => s.handler(Failure(err))
case OutputResult.Cancelled(err) => s.handler(Failure(err))
case OutputResult.Failure(err) => failRequest(s, err)
case OutputResult.Cancelled(err) => failRequest(s, err)
}
if (!pushed) {
s.handler(Failure(new ClientOverloadedException(s"Error sending ${s.message}: Client is overloaded")))
failRequest(s, new ClientOverloadedException(s"Error sending ${s.message}: Client is overloaded"))
}
} else {
droppedRequests.hit(tags = hpTags)
s.handler(Failure(new NotConnectedException("Not Connected")))
failRequest(s, new NotConnectedException("Not Connected"))
}
}

Expand All @@ -308,4 +306,10 @@ extends Controller[O,I](codec, ControllerConfig(config.pendingBufferSize, config
worker ! Kill(id.get, DisconnectCause.TimedOut)
}
}

private def failRequest(s: SourcedRequest, exception: Throwable): Unit = {
// TODO clean up duplicate code https://github.com/tumblr/colossus/issues/274
errors.hit(tags = hpTags + ("type" -> exception.getClass.getName.replaceAll("[^\\w]", "")))
s.handler(Failure(exception))
}
}

0 comments on commit 9d8a27c

Please sign in to comment.