aboutsummaryrefslogtreecommitdiff
path: root/async-http-client-backend/fs2
diff options
context:
space:
mode:
authoradamw <adam@warski.org>2017-09-14 11:03:21 +0100
committeradamw <adam@warski.org>2017-09-14 11:03:21 +0100
commitfbc71ee712635ed64c50ca694735a84ec794eb11 (patch)
treebf1dd7335306b7f320262d45d0d5b6d02f5a0b27 /async-http-client-backend/fs2
parenta971d409cb1063a2089d936abf3d3ab70bbbabb6 (diff)
downloadsttp-fbc71ee712635ed64c50ca694735a84ec794eb11.tar.gz
sttp-fbc71ee712635ed64c50ca694735a84ec794eb11.tar.bz2
sttp-fbc71ee712635ed64c50ca694735a84ec794eb11.zip
Renaming "handler" to "backend"
Diffstat (limited to 'async-http-client-backend/fs2')
-rw-r--r--async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala112
1 files changed, 112 insertions, 0 deletions
diff --git a/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala
new file mode 100644
index 0000000..90db69c
--- /dev/null
+++ b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala
@@ -0,0 +1,112 @@
+package com.softwaremill.sttp.asynchttpclient.fs2
+
+import java.nio.ByteBuffer
+
+import cats.effect._
+import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend
+import com.softwaremill.sttp.{
+ FollowRedirectsBackend,
+ MonadAsyncError,
+ SttpBackend,
+ Utf8,
+ concatByteBuffers
+}
+import fs2._
+import fs2.interop.reactivestreams._
+import org.asynchttpclient.{
+ AsyncHttpClient,
+ AsyncHttpClientConfig,
+ DefaultAsyncHttpClient
+}
+import org.reactivestreams.Publisher
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+import scala.language.higherKinds
+
+class AsyncHttpClientFs2Backend[F[_]: Effect] private (
+ asyncHttpClient: AsyncHttpClient,
+ closeClient: Boolean)(implicit ec: ExecutionContext)
+ extends AsyncHttpClientBackend[F, Stream[F, ByteBuffer]](
+ asyncHttpClient,
+ new EffectMonad,
+ closeClient
+ ) {
+
+ override protected def streamBodyToPublisher(
+ s: Stream[F, ByteBuffer]): Publisher[ByteBuffer] =
+ s.toUnicastPublisher
+
+ override protected def publisherToStreamBody(
+ p: Publisher[ByteBuffer]): Stream[F, ByteBuffer] =
+ p.toStream[F]
+
+ override protected def publisherToString(
+ p: Publisher[ByteBuffer]): F[String] = {
+ val bytes = p
+ .toStream[F]
+ .runFold(ByteBuffer.allocate(0))(concatByteBuffers)
+
+ implicitly[Effect[F]].map(bytes)(bb => new String(bb.array(), Utf8))
+ }
+}
+
+object AsyncHttpClientFs2Backend {
+
+ private def apply[F[_]: Effect](asyncHttpClient: AsyncHttpClient,
+ closeClient: Boolean)(
+ implicit ec: ExecutionContext): SttpBackend[F, Stream[F, ByteBuffer]] =
+ new FollowRedirectsBackend(
+ new AsyncHttpClientFs2Backend(asyncHttpClient, closeClient))
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
+ def apply[F[_]: Effect](connectionTimeout: FiniteDuration =
+ SttpBackend.DefaultConnectionTimeout)(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
+ : SttpBackend[F, Stream[F, ByteBuffer]] =
+ AsyncHttpClientFs2Backend[F](
+ AsyncHttpClientBackend.defaultClient(connectionTimeout.toMillis.toInt),
+ closeClient = true)
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
+ def usingConfig[F[_]: Effect](cfg: AsyncHttpClientConfig)(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
+ : SttpBackend[F, Stream[F, ByteBuffer]] =
+ AsyncHttpClientFs2Backend[F](new DefaultAsyncHttpClient(cfg),
+ closeClient = true)
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
+ def usingClient[F[_]: Effect](client: AsyncHttpClient)(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
+ : SttpBackend[F, Stream[F, ByteBuffer]] =
+ AsyncHttpClientFs2Backend[F](client, closeClient = false)
+}
+
+private[fs2] class EffectMonad[F[_]](implicit F: Effect[F])
+ extends MonadAsyncError[F] {
+
+ override def async[T](
+ register: ((Either[Throwable, T]) => Unit) => Unit): F[T] =
+ F.async(register)
+
+ override def unit[T](t: T): F[T] = F.pure(t)
+
+ override def map[T, T2](fa: F[T])(f: (T) => T2): F[T2] = F.map(fa)(f)
+
+ override def flatMap[T, T2](fa: F[T])(f: (T) => F[T2]): F[T2] =
+ F.flatMap(fa)(f)
+
+ override def error[T](t: Throwable): F[T] = F.raiseError(t)
+}