aboutsummaryrefslogtreecommitdiff
path: root/flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala
blob: 1a1ebdc0ff09bdb0805c6949b65454489eac1b29 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package ch.jodersky.flow
package stream

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString
import org.scalatest._

class SerialSpec extends WordSpec with BeforeAndAfterAll with PseudoTerminal {

  implicit val system = ActorSystem("flow-test")
  implicit val materializer = ActorMaterializer()

  override def afterAll {
    system.terminate()
  }

  "Serial stream" should {
    val data = ByteString(("hello world").getBytes("utf-8"))

    "receive the same data it sends in an echo test" in {
      withEcho { case (port, settings) =>
        val graph = Source.single(data)
          .via(Serial().open(port, settings)) // send to echo pty
          .scan(ByteString.empty)(_ ++ _) // received elements could potentially be split by OS
          .dropWhile(_ != data)
          .toMat(Sink.head)(Keep.right)

        Await.result(graph.run(), 2.seconds)
      }
    }

    "fail if the underlying pty fails" in {
      val result = withEcho { case (port, settings) =>
        Source.single(data)
          .via(Serial().open(port, settings))
          .toMat(Sink.last)(Keep.right)
          .run()}

      intercept[StreamSerialException] {
        Await.result(result, 10.seconds)
      }
    }

  }

}