diff --git a/colossus/src/main/scala/colossus/service/ServiceClient.scala b/colossus/src/main/scala/colossus/service/ServiceClient.scala index 642c9dc47..cc45351a6 100644 --- a/colossus/src/main/scala/colossus/service/ServiceClient.scala +++ b/colossus/src/main/scala/colossus/service/ServiceClient.scala @@ -215,10 +215,7 @@ extends Controller[O,I](codec, ControllerConfig(config.pendingBufferSize, config } private def purgeBuffers(reason : Throwable) { - sentBuffer.foreach{s => - errors.hit(tags = hpTags) - s.handler(Failure(new ConnectionLostException(s"Error while request was in transit: $reason"))) - } + sentBuffer.foreach(failRequest(_, reason)) sentBuffer.clear() purgeOutgoing(reason) if (failFast) { @@ -237,15 +234,16 @@ extends Controller[O,I](codec, ControllerConfig(config.pendingBufferSize, config super.connectionLost(cause) cause match { case DisconnectCause.ConnectFailed(error) => { - log.warning(s"failed to connect to ${address.toString}: ${error.getMessage}") + log.warning(s"${id.get} failed to connect to ${address.toString}: ${error.getMessage}") connectionFailures.hit(tags = hpTags) + purgeBuffers(new NotConnectedException(s"${cause.logString}")) } case _ => { - log.warning(s"${id.get} connection to ${address.toString} lost: ${cause.logString}") + log.warning(s"${id.get} connection lost to ${address.toString}: ${cause.logString}") disconnects.hit(tags = hpTags + ("cause" -> cause.tagString)) + purgeBuffers(new ConnectionLostException(s"${cause.logString}")) } } - purgeBuffers(new NotConnectedException(s"${cause.logString}")) attemptReconnect() } @@ -270,8 +268,8 @@ extends Controller[O,I](codec, ControllerConfig(config.pendingBufferSize, config private def attemptWrite(s: SourcedRequest) { if (disconnecting) { - //don't allow any new requests, appear as if we're dead - s.handler(Failure(new NotConnectedException("Not Connected"))) + // don't allow any new requests, appear as if we're dead + failRequest(s, new NotConnectedException("Not Connected")) } else if (isConnected || !failFast) { val pushed = push(s.message, s.start){ case OutputResult.Success => { @@ -280,15 +278,15 @@ extends Controller[O,I](codec, ControllerConfig(config.pendingBufferSize, config pauseWrites() //writes resumed in processMessage } } - case OutputResult.Failure(err) => s.handler(Failure(err)) - case OutputResult.Cancelled(err) => s.handler(Failure(err)) + case OutputResult.Failure(err) => failRequest(s, err) + case OutputResult.Cancelled(err) => failRequest(s, err) } if (!pushed) { - s.handler(Failure(new ClientOverloadedException(s"Error sending ${s.message}: Client is overloaded"))) + failRequest(s, new ClientOverloadedException(s"Error sending ${s.message}: Client is overloaded")) } } else { droppedRequests.hit(tags = hpTags) - s.handler(Failure(new NotConnectedException("Not Connected"))) + failRequest(s, new NotConnectedException("Not Connected")) } } @@ -308,4 +306,10 @@ extends Controller[O,I](codec, ControllerConfig(config.pendingBufferSize, config worker ! Kill(id.get, DisconnectCause.TimedOut) } } + + private def failRequest(s: SourcedRequest, exception: Throwable): Unit = { + // TODO clean up duplicate code https://github.com/tumblr/colossus/issues/274 + errors.hit(tags = hpTags + ("type" -> exception.getClass.getName.replaceAll("[^\\w]", ""))) + s.handler(Failure(exception)) + } }