aboutsummaryrefslogtreecommitdiff
path: root/flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala
blob: d5c131c157003a02e872a86c00acceabedbd095b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package ch.jodersky.flow

import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import akka.util.ByteString
import java.nio.ByteBuffer

/**
  * Operator associated to an open serial port. All communication with a port is done via an operator.
  * Operators are created through the serial manager.
  *
  * When started, the operator watches `client`, sends it `Serial.Opened` and launches a dedicated
  * reader thread that forwards incoming data to `client` as `Serial.Received` messages. The
  * connection is closed when the operator stops.
  *
  * @param connection serial connection that this operator reads from and writes to
  * @param bufferSize capacity, in bytes, of the direct buffers allocated for reading and for writing
  * @param client actor that is watched and that receives `Serial.Opened`, `Serial.Received` and
  *               `Serial.Closed`
  * @see SerialManager
  */
private[flow] class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor {
  import SerialOperator._
  import context._

  /** Message the reader thread sends to this operator when a read fails with `ex`. */
  case class ReaderDied(ex: Throwable)

  /** Thread that repeatedly reads from the connection and forwards the data to the client. */
  object Reader extends Thread {
    val buffer = ByteBuffer.allocateDirect(bufferSize)

    /** Reads until the connection reports that it is closed or a read fails unexpectedly. */
    def loop(): Unit = {
      var stop = false
      while (!connection.isClosed && !stop) {
        try {
          buffer.clear()
          val length = connection.read(buffer)
          // restrict the buffer to the length reported by the read before copying it out;
          // the buffer itself is reused on the next iteration
          buffer.limit(length)
          val data = ByteString.fromByteBuffer(buffer)
          client.tell(Serial.Received(data), self)
        } catch {
          // don't do anything if port is interrupted; the loop condition is simply re-evaluated
          case ex: PortInterruptedException => {}

          // stop and tell operator on other exception
          case ex: Exception =>
            stop = true
            self.tell(ReaderDied(ex), Actor.noSender)
        }
      }
    }

    /** Names the thread after the connection's port, then runs the read loop. */
    override def run(): Unit = {
      this.setName(s"serial-reader(${connection.port})")
      loop()
    }

  }

  val writeBuffer = ByteBuffer.allocateDirect(bufferSize)

  /** Watches the client, announces the opened port to it and starts the reader thread. */
  override def preStart(): Unit = {
    context watch client
    client ! Serial.Opened(connection.port)
    Reader.start()
  }

  override def receive: Receive = {

    case Serial.Write(data, ack) =>
      writeBuffer.clear()
      // copyToBuffer copies at most as many bytes as the buffer can hold
      data.copyToBuffer(writeBuffer)
      // presumably the number of bytes actually written -- confirm against SerialConnection.write
      val sent = connection.write(writeBuffer)
      if (ack != Serial.NoAck) sender() ! ack(sent)

    case Serial.Close =>
      client ! Serial.Closed
      context stop self

    // the watched client is gone, nothing left to serve
    case Terminated(`client`) =>
      context stop self

    // go down with reader thread
    case ReaderDied(ex) => throw ex

  }

  /** Closes the connection once the operator has stopped. */
  override def postStop(): Unit = {
    connection.close()
  }

}

private[flow] object SerialOperator {

  /**
    * Creates the `Props` used to instantiate a [[SerialOperator]].
    *
    * @param connection serial connection handed to the operator
    * @param bufferSize size passed to the operator for allocating its read and write buffers
    * @param client actor passed to the operator as its client
    * @return props describing a `SerialOperator` constructed with the given arguments
    */
  def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef): Props =
    Props(classOf[SerialOperator], connection, bufferSize, client)
}