diff options
author | szimano <szimano@szimano.org> | 2018-03-19 17:16:15 -0400 |
---|---|---|
committer | szimano <szimano@szimano.org> | 2018-03-19 17:16:15 -0400 |
commit | 71fbfbff2aec36b6aace3cf210e15d11e1c0b1db (patch) | |
tree | 935131b03658fcddaa6d74eccfda9683429b136d | |
parent | 8d296ef9637ad7a7ab6369bffcaa32282edb3401 (diff) | |
download | sttp-71fbfbff2aec36b6aace3cf210e15d11e1c0b1db.tar.gz sttp-71fbfbff2aec36b6aace3cf210e15d11e1c0b1db.tar.bz2 sttp-71fbfbff2aec36b6aace3cf210e15d11e1c0b1db.zip |
sync and async calls on hystrix
2 files changed, 59 insertions, 32 deletions
diff --git a/circuit-breaker/hystrix-backend/src/main/scala/com/softwaremill/sttp/hystrix/HystrixBackend.scala b/circuit-breaker/hystrix-backend/src/main/scala/com/softwaremill/sttp/hystrix/HystrixBackend.scala index f63c2db..155ef2f 100644 --- a/circuit-breaker/hystrix-backend/src/main/scala/com/softwaremill/sttp/hystrix/HystrixBackend.scala +++ b/circuit-breaker/hystrix-backend/src/main/scala/com/softwaremill/sttp/hystrix/HystrixBackend.scala @@ -3,28 +3,25 @@ package com.softwaremill.sttp.hystrix import com.netflix.hystrix.HystrixObservableCommand.Setter import com.netflix.hystrix.{HystrixCommand, HystrixCommandGroupKey, HystrixCommandProperties, HystrixObservableCommand} import com.softwaremill.sttp.{MonadAsyncError, MonadError, Request, Response, SttpBackend} -import rx.functions.Action import rx.{Observable, Subscriber} -import rx.observables.AsyncOnSubscribe -class HystrixBackend[R[_], S] private(delegate: SttpBackend[R, S]) extends SttpBackend[R, S] { +class HystrixBackend[R[_], S] private(delegate: SttpBackend[R, S])(groupKey: String, setter: HystrixCommandProperties.Setter) + extends SttpBackend[R, S] { - class SttpCMD[T](request: Request[T, S], delegate: SttpBackend[R, S]) + class AsyncSttpCMD[T](request: Request[T, S], delegate: SttpBackend[R, S]) extends HystrixObservableCommand[Response[T]]( - Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SttpCommand")) - .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withMetricsHealthSnapshotIntervalInMilliseconds(10)) + Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey)) + .andCommandPropertiesDefaults(setter) ) { override def construct(): Observable[Response[T]] = Observable.create( (t: Subscriber[_ >: Response[T]]) => { val x = responseMonad.flatMap(delegate.send(request)) { response => - println(s"GOT RESPONSEEEE!!!! $response") t.onNext(response) t.onCompleted() responseMonad.unit(response) } responseMonad.handleError(x){ case e: Throwable => - println(s"GOT ERRRORRRR!!! $e") t.onError(e) responseMonad.error(e) } @@ -32,22 +29,32 @@ class HystrixBackend[R[_], S] private(delegate: SttpBackend[R, S]) extends SttpB ) } + class SyncSttpCMD[T](request: Request[T, S], delegate: SttpBackend[R, S]) + extends HystrixCommand[R[Response[T]]]( + HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey)) + .andCommandPropertiesDefaults(setter) + ) { + override def run(): R[Response[T]] = delegate.send(request) + } + override def send[T](request: Request[T, S]): R[Response[T]] = { - responseMonad.asInstanceOf[MonadAsyncError[R]].async { cb => + responseMonad match { + case mae: MonadAsyncError[R] => mae.async { cb => - new SttpCMD(request, delegate).observe().subscribe( - new Subscriber[Response[T]]() { - def onNext(item: Response[T]): Unit = { - cb(Right(item)) - } + new AsyncSttpCMD(request, delegate).observe().subscribe( + new Subscriber[Response[T]]() { + def onNext(item: Response[T]): Unit = { + cb(Right(item)) + } - def onError(error: Throwable): Unit = cb(Left(error)) + def onError(error: Throwable): Unit = cb(Left(error)) + + def onCompleted(): Unit = {} + }) + } - def onCompleted(): Unit = { - System.out.println("Sequence complete.") - } - }) + case _ => new SyncSttpCMD(request, delegate).execute() } } @@ -61,5 +68,7 @@ class HystrixBackend[R[_], S] private(delegate: SttpBackend[R, S]) extends SttpB } object HystrixBackend { - def apply[R[_], S](delegate: SttpBackend[R, S]) = new HystrixBackend(delegate) + def apply[R[_], S](delegate: SttpBackend[R, S]) + (groupKey: String, setter: HystrixCommandProperties.Setter = HystrixCommandProperties.Setter()) = + new HystrixBackend(delegate)(groupKey, setter) }
\ No newline at end of file diff --git a/circuit-breaker/hystrix-backend/src/test/scala/com/softwaremill/sttp/hystrix/HystrixBackendTest.scala b/circuit-breaker/hystrix-backend/src/test/scala/com/softwaremill/sttp/hystrix/HystrixBackendTest.scala index 9c9b5fb..483e513 100644 --- a/circuit-breaker/hystrix-backend/src/test/scala/com/softwaremill/sttp/hystrix/HystrixBackendTest.scala +++ b/circuit-breaker/hystrix-backend/src/test/scala/com/softwaremill/sttp/hystrix/HystrixBackendTest.scala @@ -1,10 +1,9 @@ package com.softwaremill.sttp.prometheus -import com.netflix.hystrix.{HystrixCommandKey, HystrixCommandMetrics} -import com.softwaremill.sttp.akkahttp.AkkaHttpBackend +import com.netflix.hystrix.{HystrixCommandKey, HystrixCommandMetrics, HystrixCommandProperties} import com.softwaremill.sttp.hystrix.HystrixBackend import com.softwaremill.sttp.testing.SttpBackendStub -import com.softwaremill.sttp.{HttpURLConnectionBackend, Id, sttp, _} +import com.softwaremill.sttp.{sttp, _} import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures} import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers, OptionValues} @@ -13,23 +12,42 @@ import scala.concurrent.Future class HystrixBackendTest extends FlatSpec with Matchers with BeforeAndAfter with Eventually with OptionValues with ScalaFutures with IntegrationPatience{ - before { + it should "use default hystrix commands on async backend" in { + // given + val backendStub = SttpBackendStub.asynchronousFuture.whenAnyRequest.thenRespondOk() + + val backend = HystrixBackend[Future, Nothing](backendStub)("TestAsyncCMD", HystrixCommandProperties.Setter().withMetricsHealthSnapshotIntervalInMilliseconds(10)) + val requestsNumber = 10 + + // when + (0 until requestsNumber).map(_ => backend.send(sttp.get(uri"http://localhost:8080/get")).futureValue) + + // then + val metrics = HystrixCommandMetrics.getInstance(HystrixCommandKey.Factory.asKey("AsyncSttpCMD")) + + Thread.sleep(100) // wait for the health metrics + + metrics.getHealthCounts.getErrorPercentage shouldBe 0 + metrics.getHealthCounts.getTotalRequests shouldBe 10 } - it should "use default hystrix commands" in { + it should "use default hystrix commands on sync backend" in { // given - val backend = HystrixBackend(AkkaHttpBackend()) + val backendStub = SttpBackendStub.synchronous.whenAnyRequest.thenRespondOk() + + val backend = HystrixBackend[Id, Nothing](backendStub)("TestSyncCMD", HystrixCommandProperties.Setter().withMetricsHealthSnapshotIntervalInMilliseconds(10)) val requestsNumber = 10 // when - (0 until requestsNumber).map(_ => backend.send(sttp.get(uri"http://httpbin.org/get")).futureValue).foreach(println) + (0 until requestsNumber).map(_ => backend.send(sttp.get(uri"http://localhost:8080/get"))) // then - val metrics = HystrixCommandMetrics.getInstance(HystrixCommandKey.Factory.asKey("SttpCMD")) - println(metrics.getExecutionTimeMean) - println(metrics.getExecutionTimePercentile(10)) - println(metrics.getHealthCounts) - println(metrics.getCommandGroup) + val metrics = HystrixCommandMetrics.getInstance(HystrixCommandKey.Factory.asKey("SyncSttpCMD")) + + Thread.sleep(100) // wait for the health metrics + + metrics.getHealthCounts.getErrorPercentage shouldBe 0 + metrics.getHealthCounts.getTotalRequests shouldBe 10 } } |