aboutsummaryrefslogtreecommitdiff
path: root/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
blob: 5f04126de9eb66aed6f6f4c538299deeaab9dc03 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package com.softwaremill.sttp

import java.nio.ByteBuffer

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.softwaremill.sttp.akkahttp.AkkaHttpSttpHandler
import com.softwaremill.sttp.asynchttpclient.monix.MonixAsyncHttpClientHandler
import com.typesafe.scalalogging.StrictLogging
import monix.reactive.Observable
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}

/** Integration tests for streaming request and response bodies.
  *
  * The same two scenarios - sending a streamed request body, and reading the
  * response body as a stream - are registered once for the Akka HTTP handler
  * and once for the Monix async-http-client handler. Both talk to the `/echo`
  * route defined in `serverRoutes`.
  *
  * NOTE(review): `TestHttpServer` is not visible in this file; it is assumed
  * to serve `serverRoutes` on `port` and to provide `actorSystem` and
  * `endpoint`.
  */
class StreamingTests
    extends FlatSpec
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures
    with StrictLogging
    with IntegrationPatience
    with TestHttpServer {

  /** A single `POST /echo` route that replies with the request body it received. */
  override val serverRoutes: Route =
    path("echo") {
      post {
        entity(as[String]) { payload: String =>
          complete(payload)
        }
      }
    }

  override def port: Int = 51824

  val akkaHandler = AkkaHttpSttpHandler.usingActorSystem(actorSystem)
  val monixHandler = MonixAsyncHttpClientHandler()

  // Declared ahead of the registration calls below, so the tests never depend
  // on the `in` bodies being evaluated only after construction has finished.
  val body = "streaming test"

  akkaStreamingTests()
  monixStreamingTests()

  /** Registers the streaming tests which use the Akka HTTP handler. */
  def akkaStreamingTests(): Unit = {
    implicit val handler = akkaHandler

    "Akka HTTP" should "stream request body" in {
      val response = sttp
        .post(uri"$endpoint/echo")
        .streamBody(Source.single(ByteString(body)))
        .send()
        .futureValue

      response.body should be(body)
    }

    it should "receive a stream" in {
      val response = sttp
        .post(uri"$endpoint/echo")
        .body(body)
        .response(asStream[Source[ByteString, Any]])
        .send()
        .futureValue

      // Concatenate all received chunks before decoding them.
      val responseBody = response.body.runReduce(_ ++ _).futureValue.utf8String

      responseBody should be(body)
    }
  }

  /** Registers the streaming tests which use the Monix async-http-client handler. */
  def monixStreamingTests(): Unit = {
    implicit val handler = monixHandler
    import monix.execution.Scheduler.Implicits.global

    // Drains the whole stream and decodes the collected bytes as UTF-8.
    def readAll(chunks: Observable[ByteBuffer]): String = {
      val bytes = chunks
        .flatMap(chunk => Observable.fromIterable(readableBytes(chunk)))
        .toListL
        .runAsync
        .futureValue
        .toArray

      new String(bytes, "utf-8")
    }

    "Monix Async Http Client" should "stream request body" in {
      // One single-byte buffer per byte, so the body is sent as many chunks.
      val source = Observable.fromIterable(
        body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b))))

      val response = sttp
        .post(uri"$endpoint/echo")
        .streamBody(source)
        .send()
        .runAsync
        .futureValue

      response.body should be(body)
    }

    it should "receive a stream" in {
      val response = sttp
        .post(uri"$endpoint/echo")
        .body(body)
        .response(asStream[Observable[ByteBuffer]])
        .send()
        .runAsync
        .futureValue

      readAll(response.body) should be(body)
    }

    it should "receive a stream from an https site" in {
      // Of course, you should never rely on the internet being available
      // in tests, but that's so much easier than setting up an https
      // testing server.
      val response = sttp
        .get(uri"https://softwaremill.com")
        .response(asStream[Observable[ByteBuffer]])
        .send()
        .runAsync
        .futureValue

      readAll(response.body) should include("</div>")
    }
  }

  /** Copies the readable bytes of `buffer`: those between its position and limit.
    *
    * Unlike `buffer.array()`, this does not expose backing-array bytes outside
    * the readable range and does not require an accessible backing array. The
    * copy is made through a duplicate, so the position of `buffer` itself is
    * left untouched.
    */
  private def readableBytes(buffer: ByteBuffer): Array[Byte] = {
    val view = buffer.duplicate()
    val bytes = new Array[Byte](view.remaining())
    view.get(bytes)
    bytes
  }

  /** Closes both handlers, then delegates to `super.afterAll()`.
    *
    * The calls are nested in `try`/`finally` so that a failure while closing
    * one handler does not skip the remaining cleanup steps.
    */
  override protected def afterAll(): Unit = {
    try akkaHandler.close()
    finally {
      try monixHandler.close()
      finally super.afterAll()
    }
  }
}