aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorszimano <szimano@szimano.org>2018-03-19 17:16:15 -0400
committerszimano <szimano@szimano.org>2018-03-19 17:16:15 -0400
commit71fbfbff2aec36b6aace3cf210e15d11e1c0b1db (patch)
tree935131b03658fcddaa6d74eccfda9683429b136d
parent8d296ef9637ad7a7ab6369bffcaa32282edb3401 (diff)
downloadsttp-71fbfbff2aec36b6aace3cf210e15d11e1c0b1db.tar.gz
sttp-71fbfbff2aec36b6aace3cf210e15d11e1c0b1db.tar.bz2
sttp-71fbfbff2aec36b6aace3cf210e15d11e1c0b1db.zip
sync and async calls on hystrix
-rw-r--r--circuit-breaker/hystrix-backend/src/main/scala/com/softwaremill/sttp/hystrix/HystrixBackend.scala49
-rw-r--r--circuit-breaker/hystrix-backend/src/test/scala/com/softwaremill/sttp/hystrix/HystrixBackendTest.scala42
2 files changed, 59 insertions, 32 deletions
diff --git a/circuit-breaker/hystrix-backend/src/main/scala/com/softwaremill/sttp/hystrix/HystrixBackend.scala b/circuit-breaker/hystrix-backend/src/main/scala/com/softwaremill/sttp/hystrix/HystrixBackend.scala
index f63c2db..155ef2f 100644
--- a/circuit-breaker/hystrix-backend/src/main/scala/com/softwaremill/sttp/hystrix/HystrixBackend.scala
+++ b/circuit-breaker/hystrix-backend/src/main/scala/com/softwaremill/sttp/hystrix/HystrixBackend.scala
@@ -3,28 +3,25 @@ package com.softwaremill.sttp.hystrix
import com.netflix.hystrix.HystrixObservableCommand.Setter
import com.netflix.hystrix.{HystrixCommand, HystrixCommandGroupKey, HystrixCommandProperties, HystrixObservableCommand}
import com.softwaremill.sttp.{MonadAsyncError, MonadError, Request, Response, SttpBackend}
-import rx.functions.Action
import rx.{Observable, Subscriber}
-import rx.observables.AsyncOnSubscribe
-class HystrixBackend[R[_], S] private(delegate: SttpBackend[R, S]) extends SttpBackend[R, S] {
+class HystrixBackend[R[_], S] private(delegate: SttpBackend[R, S])(groupKey: String, setter: HystrixCommandProperties.Setter)
+ extends SttpBackend[R, S] {
- class SttpCMD[T](request: Request[T, S], delegate: SttpBackend[R, S])
+ class AsyncSttpCMD[T](request: Request[T, S], delegate: SttpBackend[R, S])
extends HystrixObservableCommand[Response[T]](
- Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SttpCommand"))
- .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withMetricsHealthSnapshotIntervalInMilliseconds(10))
+ Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
+ .andCommandPropertiesDefaults(setter)
) {
override def construct(): Observable[Response[T]] = Observable.create(
(t: Subscriber[_ >: Response[T]]) => {
val x = responseMonad.flatMap(delegate.send(request)) { response =>
- println(s"GOT RESPONSEEEE!!!! $response")
t.onNext(response)
t.onCompleted()
responseMonad.unit(response)
}
responseMonad.handleError(x){ case e: Throwable =>
- println(s"GOT ERRRORRRR!!! $e")
t.onError(e)
responseMonad.error(e)
}
@@ -32,22 +29,32 @@ class HystrixBackend[R[_], S] private(delegate: SttpBackend[R, S]) extends SttpB
)
}
+ class SyncSttpCMD[T](request: Request[T, S], delegate: SttpBackend[R, S])
+ extends HystrixCommand[R[Response[T]]](
+ HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
+ .andCommandPropertiesDefaults(setter)
+ ) {
+ override def run(): R[Response[T]] = delegate.send(request)
+ }
+
override def send[T](request: Request[T, S]): R[Response[T]] = {
- responseMonad.asInstanceOf[MonadAsyncError[R]].async { cb =>
+ responseMonad match {
+ case mae: MonadAsyncError[R] => mae.async { cb =>
- new SttpCMD(request, delegate).observe().subscribe(
- new Subscriber[Response[T]]() {
- def onNext(item: Response[T]): Unit = {
- cb(Right(item))
- }
+ new AsyncSttpCMD(request, delegate).observe().subscribe(
+ new Subscriber[Response[T]]() {
+ def onNext(item: Response[T]): Unit = {
+ cb(Right(item))
+ }
- def onError(error: Throwable): Unit = cb(Left(error))
+ def onError(error: Throwable): Unit = cb(Left(error))
+
+ def onCompleted(): Unit = {}
+ })
+ }
- def onCompleted(): Unit = {
- System.out.println("Sequence complete.")
- }
- })
+ case _ => new SyncSttpCMD(request, delegate).execute()
}
}
@@ -61,5 +68,7 @@ class HystrixBackend[R[_], S] private(delegate: SttpBackend[R, S]) extends SttpB
}
object HystrixBackend {
- def apply[R[_], S](delegate: SttpBackend[R, S]) = new HystrixBackend(delegate)
+ def apply[R[_], S](delegate: SttpBackend[R, S])
+ (groupKey: String, setter: HystrixCommandProperties.Setter = HystrixCommandProperties.Setter()) =
+ new HystrixBackend(delegate)(groupKey, setter)
} \ No newline at end of file
diff --git a/circuit-breaker/hystrix-backend/src/test/scala/com/softwaremill/sttp/hystrix/HystrixBackendTest.scala b/circuit-breaker/hystrix-backend/src/test/scala/com/softwaremill/sttp/hystrix/HystrixBackendTest.scala
index 9c9b5fb..483e513 100644
--- a/circuit-breaker/hystrix-backend/src/test/scala/com/softwaremill/sttp/hystrix/HystrixBackendTest.scala
+++ b/circuit-breaker/hystrix-backend/src/test/scala/com/softwaremill/sttp/hystrix/HystrixBackendTest.scala
@@ -1,10 +1,9 @@
package com.softwaremill.sttp.prometheus
-import com.netflix.hystrix.{HystrixCommandKey, HystrixCommandMetrics}
-import com.softwaremill.sttp.akkahttp.AkkaHttpBackend
+import com.netflix.hystrix.{HystrixCommandKey, HystrixCommandMetrics, HystrixCommandProperties}
import com.softwaremill.sttp.hystrix.HystrixBackend
import com.softwaremill.sttp.testing.SttpBackendStub
-import com.softwaremill.sttp.{HttpURLConnectionBackend, Id, sttp, _}
+import com.softwaremill.sttp.{sttp, _}
import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers, OptionValues}
@@ -13,23 +12,42 @@ import scala.concurrent.Future
class HystrixBackendTest extends FlatSpec with Matchers with BeforeAndAfter with Eventually with OptionValues
with ScalaFutures with IntegrationPatience{
- before {
+ it should "use default hystrix commands on async backend" in {
+ // given
+ val backendStub = SttpBackendStub.asynchronousFuture.whenAnyRequest.thenRespondOk()
+
+ val backend = HystrixBackend[Future, Nothing](backendStub)("TestAsyncCMD", HystrixCommandProperties.Setter().withMetricsHealthSnapshotIntervalInMilliseconds(10))
+ val requestsNumber = 10
+
+ // when
+ (0 until requestsNumber).map(_ => backend.send(sttp.get(uri"http://localhost:8080/get")).futureValue)
+
+ // then
+ val metrics = HystrixCommandMetrics.getInstance(HystrixCommandKey.Factory.asKey("AsyncSttpCMD"))
+
+ Thread.sleep(100) // wait for the health metrics
+
+ metrics.getHealthCounts.getErrorPercentage shouldBe 0
+ metrics.getHealthCounts.getTotalRequests shouldBe 10
}
- it should "use default hystrix commands" in {
+ it should "use default hystrix commands on sync backend" in {
// given
- val backend = HystrixBackend(AkkaHttpBackend())
+ val backendStub = SttpBackendStub.synchronous.whenAnyRequest.thenRespondOk()
+
+ val backend = HystrixBackend[Id, Nothing](backendStub)("TestSyncCMD", HystrixCommandProperties.Setter().withMetricsHealthSnapshotIntervalInMilliseconds(10))
val requestsNumber = 10
// when
- (0 until requestsNumber).map(_ => backend.send(sttp.get(uri"http://httpbin.org/get")).futureValue).foreach(println)
+ (0 until requestsNumber).map(_ => backend.send(sttp.get(uri"http://localhost:8080/get")))
// then
- val metrics = HystrixCommandMetrics.getInstance(HystrixCommandKey.Factory.asKey("SttpCMD"))
- println(metrics.getExecutionTimeMean)
- println(metrics.getExecutionTimePercentile(10))
- println(metrics.getHealthCounts)
- println(metrics.getCommandGroup)
+ val metrics = HystrixCommandMetrics.getInstance(HystrixCommandKey.Factory.asKey("SyncSttpCMD"))
+
+ Thread.sleep(100) // wait for the health metrics
+
+ metrics.getHealthCounts.getErrorPercentage shouldBe 0
+ metrics.getHealthCounts.getTotalRequests shouldBe 10
}
}