aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorszimano <szimano@szimano.org>2018-03-20 23:24:48 -0400
committerszimano <szimano@szimano.org>2018-03-20 23:24:48 -0400
commit27e781e2ee32e1e9d6a321f8406b45beaa25d780 (patch)
tree4a17cdcc0f6c9a47985be43394febcb67329b8c3
parentd2a0ab6be00189f419026ea1d50ebdc375ca39d1 (diff)
downloadsttp-hystrix-another-take.tar.gz
sttp-hystrix-another-take.tar.bz2
sttp-hystrix-another-take.zip
circuit open testhystrix-another-take
-rw-r--r--circuit-breaker/hystrix-backend/src/main/scala/com/softwaremill/sttp/hystrix/HystrixBackend.scala62
-rw-r--r--circuit-breaker/hystrix-backend/src/test/scala/com/softwaremill/sttp/hystrix/HystrixBackendTest.scala40
2 files changed, 91 insertions, 11 deletions
diff --git a/circuit-breaker/hystrix-backend/src/main/scala/com/softwaremill/sttp/hystrix/HystrixBackend.scala b/circuit-breaker/hystrix-backend/src/main/scala/com/softwaremill/sttp/hystrix/HystrixBackend.scala
index dccf4ce..afc20bc 100644
--- a/circuit-breaker/hystrix-backend/src/main/scala/com/softwaremill/sttp/hystrix/HystrixBackend.scala
+++ b/circuit-breaker/hystrix-backend/src/main/scala/com/softwaremill/sttp/hystrix/HystrixBackend.scala
@@ -6,14 +6,20 @@ import com.softwaremill.sttp.{MonadAsyncError, MonadError, Request, Response, St
import rx.{Observable, Subscriber}
class HystrixBackend[R[_], S] private (delegate: SttpBackend[R, S])(groupKey: String,
- setter: HystrixCommandProperties.Setter)
+ propertySetter: HystrixCommandProperties.Setter)
extends SttpBackend[R, S] {
+ import HystrixBackend._
+
class AsyncSttpCMD[T](request: Request[T, S], delegate: SttpBackend[R, S])
extends HystrixObservableCommand[Response[T]](
- Setter
- .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
- .andCommandPropertiesDefaults(setter)
+ request.tag(HystrixAsyncConfiguration).map(_.asInstanceOf[HystrixObservableCommand.Setter]) match {
+ case None =>
+ Setter
+ .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
+ .andCommandPropertiesDefaults(propertySetter)
+ case Some(s) => s
+ }
) {
override def construct(): Observable[Response[T]] = Observable.create(
@@ -34,11 +40,34 @@ class HystrixBackend[R[_], S] private (delegate: SttpBackend[R, S])(groupKey: St
class SyncSttpCMD[T](request: Request[T, S], delegate: SttpBackend[R, S])
extends HystrixCommand[R[Response[T]]](
- HystrixCommand.Setter
- .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
- .andCommandPropertiesDefaults(setter)
+ request.tag(HystrixSyncConfiguration).map(_.asInstanceOf[HystrixCommand.Setter]) match {
+ case None =>
+ HystrixCommand.Setter
+ .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
+ .andCommandPropertiesDefaults(propertySetter)
+ case Some(s) => s
+ }
) {
- override def run(): R[Response[T]] = delegate.send(request)
+
+ private var fallback: R[Response[T]] = responseMonad.unit(null)
+
+ override def run(): R[Response[T]] = {
+ val rr: R[Response[T]] = responseMonad.map(delegate.send(request)) {
+ case errResponse @ Response(Left(error), _, _, _, _) =>
+ fallback = responseMonad.unit(errResponse)
+ throw new RuntimeException(error)
+ case r: Response[T] => r
+ }
+
+ responseMonad.handleError(rr) {
+ case t: Throwable =>
+ fallback = responseMonad.error(t)
+ throw new RuntimeException(t)
+ }
+ }
+
+ override def getFallback: R[Response[T]] = fallback
+
}
override def send[T](request: Request[T, S]): R[Response[T]] = {
@@ -74,7 +103,20 @@ class HystrixBackend[R[_], S] private (delegate: SttpBackend[R, S])(groupKey: St
object HystrixBackend {
def apply[R[_], S](delegate: SttpBackend[R, S])(groupKey: String,
- setter: HystrixCommandProperties.Setter =
+ propertySetter: HystrixCommandProperties.Setter =
HystrixCommandProperties.Setter()) =
- new HystrixBackend(delegate)(groupKey, setter)
+ new HystrixBackend(delegate)(groupKey, propertySetter)
+
+ private val HystrixAsyncConfiguration = "HystrixAsyncConfiguration"
+
+ private val HystrixSyncConfiguration = "HystrixSyncConfiguration"
+
+ implicit class HystrixRichRequest[T, S](request: Request[T, S]) {
+ def configureAsyncHystrix(setter: HystrixObservableCommand.Setter): Request[T, S] =
+ request.tag(HystrixAsyncConfiguration, setter)
+
+ def configureSyncHystrix(setter: HystrixCommand.Setter): Request[T, S] =
+ request.tag(HystrixSyncConfiguration, setter)
+ }
+
}
diff --git a/circuit-breaker/hystrix-backend/src/test/scala/com/softwaremill/sttp/hystrix/HystrixBackendTest.scala b/circuit-breaker/hystrix-backend/src/test/scala/com/softwaremill/sttp/hystrix/HystrixBackendTest.scala
index 8772d45..f97e7da 100644
--- a/circuit-breaker/hystrix-backend/src/test/scala/com/softwaremill/sttp/hystrix/HystrixBackendTest.scala
+++ b/circuit-breaker/hystrix-backend/src/test/scala/com/softwaremill/sttp/hystrix/HystrixBackendTest.scala
@@ -1,6 +1,6 @@
package com.softwaremill.sttp.hystrix
-import com.netflix.hystrix.{HystrixCommandKey, HystrixCommandMetrics, HystrixCommandProperties}
+import com.netflix.hystrix._
import com.softwaremill.sttp.testing.SttpBackendStub
import com.softwaremill.sttp.{sttp, _}
import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
@@ -59,4 +59,42 @@ class HystrixBackendTest
metrics.getHealthCounts.getTotalRequests shouldBe 10
}
+ it should "open circuit when server is throwing errors" in {
+ // given
+ val backendStub = SttpBackendStub.synchronous.whenAnyRequest.thenRespondServerError()
+
+ val backend = HystrixBackend[Id, Nothing](backendStub)("TestSyncCircuitCMD")
+ val requestsNumber = 1000
+
+ val commandName = "SyncCircuitSttpCMD"
+
+ // when
+ import HystrixBackend._
+
+ (0 until requestsNumber).map(
+ _ =>
+ backend.send(
+ sttp
+ .get(uri"http://localhost:8080/get")
+ .configureSyncHystrix(
+ HystrixCommand.Setter
+ .withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestSyncCircuitCMD"))
+ .andCommandKey(HystrixCommandKey.Factory.asKey(commandName))
+ .andCommandPropertiesDefaults(
+ HystrixCommandProperties
+ .Setter()
+ .withMetricsHealthSnapshotIntervalInMilliseconds(10)
+ .withCircuitBreakerEnabled(true)
+ ))))
+
+ // then
+ val metrics = HystrixCommandMetrics.getInstance(HystrixCommandKey.Factory.asKey(commandName))
+
+ Thread.sleep(100) // wait for the health metrics
+ metrics.getHealthCounts.getErrorPercentage shouldBe 100
+ metrics.getProperties.circuitBreakerEnabled().get() shouldBe true
+
+ HystrixCircuitBreaker.Factory.getInstance(HystrixCommandKey.Factory.asKey(commandName)).isOpen shouldBe true
+ }
+
}