blob: 5f04126de9eb66aed6f6f4c538299deeaab9dc03 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
package com.softwaremill.sttp
import java.nio.ByteBuffer
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.softwaremill.sttp.akkahttp.AkkaHttpSttpHandler
import com.softwaremill.sttp.asynchttpclient.monix.MonixAsyncHttpClientHandler
import com.typesafe.scalalogging.StrictLogging
import monix.reactive.Observable
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
/** Streaming round-trip tests for the Akka HTTP and Monix async-http-client
  * handlers.
  *
  * The suite defines a single `POST /echo` route which responds with the
  * request body it received, and checks that request bodies can be sent as
  * streams and response bodies can be received as streams.
  *
  * NOTE(review): `endpoint` and `actorSystem` are not defined in this class;
  * presumably they are provided by `TestHttpServer` -- confirm.
  */
class StreamingTests
    extends FlatSpec
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures
    with StrictLogging
    with IntegrationPatience
    with TestHttpServer {

  /** Echo route: a POST to `/echo` is answered with the received body. */
  override val serverRoutes: Route =
    path("echo") {
      post {
        // The query parameters are extracted but intentionally not used.
        parameterMap { _ =>
          entity(as[String]) { payload: String =>
            complete(payload)
          }
        }
      }
    }

  override def port = 51824

  val akkaHandler = AkkaHttpSttpHandler.usingActorSystem(actorSystem)
  val monixHandler = MonixAsyncHttpClientHandler()

  // Shared payload. Declared before the test-registration calls below so that
  // it is already initialised by the time they run, instead of relying on the
  // test bodies being evaluated lazily.
  val body = "streaming test"

  akkaStreamingTests()
  monixStreamingTests()

  /** Copies the bytes between `position` and `limit` of the buffer.
    *
    * `ByteBuffer.array()` is deliberately avoided: it returns the whole
    * backing array (ignoring position, limit and array offset) and throws for
    * direct or read-only buffers. A duplicate is read so that the caller's
    * buffer position is left untouched.
    */
  private def remainingBytes(buffer: ByteBuffer): Array[Byte] = {
    val view = buffer.duplicate()
    val bytes = new Array[Byte](view.remaining())
    view.get(bytes)
    bytes
  }

  /** Registers the tests which use the Akka HTTP handler. */
  def akkaStreamingTests(): Unit = {
    implicit val handler = akkaHandler

    "Akka HTTP" should "stream request body" in {
      val response = sttp
        .post(uri"$endpoint/echo")
        .streamBody(Source.single(ByteString(body)))
        .send()
        .futureValue

      response.body should be(body)
    }

    it should "receive a stream" in {
      val response = sttp
        .post(uri"$endpoint/echo")
        .body(body)
        .response(asStream[Source[ByteString, Any]])
        .send()
        .futureValue

      // Concatenate all received chunks before decoding.
      val responseBody = response.body.runReduce(_ ++ _).futureValue.utf8String

      responseBody should be(body)
    }
  }

  /** Registers the tests which use the Monix async-http-client handler. */
  def monixStreamingTests(): Unit = {
    implicit val handler = monixHandler
    import monix.execution.Scheduler.Implicits.global

    // Drains the stream of buffers and decodes the collected bytes as UTF-8.
    def readAll(stream: Observable[ByteBuffer]): String = {
      val bytes = stream
        .flatMap(bb => Observable.fromIterable(remainingBytes(bb)))
        .toListL
        .runAsync
        .futureValue
        .toArray
      new String(bytes, "utf-8")
    }

    "Monix Async Http Client" should "stream request body" in {
      // One single-byte buffer per byte of the payload.
      val source = Observable.fromIterable(
        body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b))))

      val response = sttp
        .post(uri"$endpoint/echo")
        .streamBody(source)
        .send()
        .runAsync
        .futureValue

      response.body should be(body)
    }

    it should "receive a stream" in {
      val response = sttp
        .post(uri"$endpoint/echo")
        .body(body)
        .response(asStream[Observable[ByteBuffer]])
        .send()
        .runAsync
        .futureValue

      readAll(response.body) should be(body)
    }

    it should "receive a stream from an https site" in {
      // of course, you should never rely on the internet being available
      // in tests, but that's so much easier than setting up an https
      // testing server
      val response = sttp
        .get(uri"https://softwaremill.com")
        .response(asStream[Observable[ByteBuffer]])
        .send()
        .runAsync
        .futureValue

      readAll(response.body) should include("</div>")
    }
  }

  /** Closes both handlers, then delegates to `super.afterAll()`.
    *
    * The nested `try`/`finally` ensures that a failure while closing one
    * handler does not prevent the other handler from being closed or the
    * parent clean-up from running.
    */
  override protected def afterAll(): Unit = {
    try akkaHandler.close()
    finally {
      try monixHandler.close()
      finally super.afterAll()
    }
  }
}
|