aboutsummaryrefslogtreecommitdiff
path: root/stream/src/main/scala/akka/serial/stream/Serial.scala
blob: 785001f59a1f78dd03f05694143258a1dd8bfc21 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package akka.serial
package stream

import akka.stream.scaladsl.Source
import scala.concurrent.Future

import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider}
import akka.io.IO
import akka.stream.scaladsl.Flow
import akka.util.ByteString

import akka.serial.{Serial => CoreSerial}
import impl._


/**
  * Extension identifier for the streaming serial API, and home of the
  * values materialized by the stages that [[Serial]] creates.
  */
object Serial extends ExtensionId[Serial] with ExtensionIdProvider {

  /**
    * Describes a serial connection that is about to be established.
    * @param port name of the serial port
    * @param settings settings used with the serial port
    */
  case class Connection(port: String, settings: SerialSettings)

  /**
    * Value carrying a set of port names; used as the materialized value
    * type of the Source returned by `Serial#watch`.
    * @param ports the set of ports associated with the watch
    */
  case class Watch(ports: Set[String])

  /**
    * Obtains the extension instance for the implicitly available actor system.
    */
  def apply()(implicit system: ActorSystem): Serial = super.apply(system)

  /** The identifier under which this extension is looked up: this object itself. */
  override def lookup(): Serial.type = Serial

  /** Instantiates the extension for the given actor system. */
  override def createExtension(system: ExtendedActorSystem): Serial = {
    new Serial(system)
  }

}

/**
  * Akka extension that exposes serial ports as streams.
  * The API is modelled after Akka's Tcp streams.
  */
class Serial(system: ExtendedActorSystem) extends Extension {

  /**
    * Resolves the core serial IO handle for this extension's actor system.
    * Declared as a `def` so that the lookup happens on every call.
    */
  private def manager = IO(CoreSerial)(system)

  /**
    * Builds a Flow that opens a serial port once it is materialized.
    * The resulting Flow stands for the open connection: elements pushed into
    * its inlet are written to the serial port, and data read from the port
    * is emitted on its outlet.
    * @param port name of the serial port to open
    * @param settings settings to apply to the serial port
    * @param failOnOverflow when set, the returned Flow fails if incoming data is dropped
    * @param bufferSize maximum size of the read and write buffers
    * @return a Flow bound to the given serial port
    */
  def open(
      port: String,
      settings: SerialSettings,
      failOnOverflow: Boolean = false,
      bufferSize: Int = 1024
  ): Flow[ByteString, ByteString, Future[Serial.Connection]] = {
    val connectionStage = new SerialConnectionStage(
      manager,
      port,
      settings,
      failOnOverflow,
      bufferSize
    )
    Flow.fromGraph(connectionStage)
  }

  /**
    * Builds a Source backed by a `WatcherStage` created for the given ports.
    * NOTE(review): presumably the emitted strings are names of ports that
    * become available while watching -- confirm against `WatcherStage`.
    * @param ports the set of ports handed to the watcher stage
    * @return a Source of strings that materializes a `Future[Serial.Watch]`
    */
  def watch(ports: Set[String]): Source[String, Future[Serial.Watch]] = {
    val watcherStage = new WatcherStage(manager, ports)
    Source.fromGraph(watcherStage)
  }

}