package com.softwaremill.sttp.hystrix
import com.netflix.hystrix.HystrixObservableCommand.Setter
import com.netflix.hystrix.{HystrixCommand, HystrixCommandGroupKey, HystrixCommandProperties, HystrixObservableCommand}
import com.softwaremill.sttp.{MonadAsyncError, MonadError, Request, Response, SttpBackend}
import rx.{Observable, Subscriber}
class HystrixBackend[R[_], S] private (delegate: SttpBackend[R, S])(groupKey: String,
propertySetter: HystrixCommandProperties.Setter)
extends SttpBackend[R, S] {
import HystrixBackend._
class AsyncSttpCMD[T](request: Request[T, S], delegate: SttpBackend[R, S])
extends HystrixObservableCommand[Response[T]](
request.tag(HystrixAsyncConfiguration).map(_.asInstanceOf[HystrixObservableCommand.Setter]) match {
case None =>
Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
.andCommandPropertiesDefaults(propertySetter)
case Some(s) => s
}
) {
override def construct(): Observable[Response[T]] = Observable.create(
(t: Subscriber[_ >: Response[T]]) => {
val x = responseMonad.flatMap(delegate.send(request)) { response =>
t.onNext(response)
t.onCompleted()
responseMonad.unit(response)
}
responseMonad.handleError(x) {
case e: Throwable =>
t.onError(e)
responseMonad.error(e)
}
}
)
}
class SyncSttpCMD[T](request: Request[T, S], delegate: SttpBackend[R, S])
extends HystrixCommand[R[Response[T]]](
request.tag(HystrixSyncConfiguration).map(_.asInstanceOf[HystrixCommand.Setter]) match {
case None =>
HystrixCommand.Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
.andCommandPropertiesDefaults(propertySetter)
case Some(s) => s
}
) {
private var fallback: R[Response[T]] = responseMonad.unit(null)
override def run(): R[Response[T]] = {
val rr: R[Response[T]] = responseMonad.map(delegate.send(request)) {
case errResponse @ Response(Left(error), _, _, _, _) =>
fallback = responseMonad.unit(errResponse)
throw new RuntimeException(error)
case r: Response[T] => r
}
responseMonad.handleError(rr) {
case t: Throwable =>
fallback = responseMonad.error(t)
throw new RuntimeException(t)
}
}
override def getFallback: R[Response[T]] = fallback
}
override def send[T](request: Request[T, S]): R[Response[T]] = {
responseMonad match {
case mae: MonadAsyncError[R] =>
mae.async { cb =>
new AsyncSttpCMD(request, delegate)
.observe()
.subscribe(new Subscriber[Response[T]]() {
def onNext(item: Response[T]): Unit = {
cb(Right(item))
}
def onError(error: Throwable): Unit = cb(Left(error))
def onCompleted(): Unit = {}
})
}
case _ => new SyncSttpCMD(request, delegate).execute()
}
}
override def close(): Unit = delegate.close()
/**
* The monad in which the responses are wrapped. Allows writing wrapper
* backends, which map/flatMap over the return value of [[send]].
*/
override def responseMonad: MonadError[R] = delegate.responseMonad
}
object HystrixBackend {
def apply[R[_], S](delegate: SttpBackend[R, S])(groupKey: String,
propertySetter: HystrixCommandProperties.Setter =
HystrixCommandProperties.Setter()) =
new HystrixBackend(delegate)(groupKey, propertySetter)
private val HystrixAsyncConfiguration = "HystrixAsyncConfiguration"
private val HystrixSyncConfiguration = "HystrixSyncConfiguration"
implicit class HystrixRichRequest[T, S](request: Request[T, S]) {
def configureAsyncHystrix(setter: HystrixObservableCommand.Setter): Request[T, S] =
request.tag(HystrixAsyncConfiguration, setter)
def configureSyncHystrix(setter: HystrixCommand.Setter): Request[T, S] =
request.tag(HystrixSyncConfiguration, setter)
}
}