diff options
Diffstat (limited to 'flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala')
-rw-r--r-- | flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala | 100 |
1 files changed, 47 insertions, 53 deletions
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala index 6000778..1c24729 100644 --- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala +++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala @@ -1,35 +1,69 @@ package com.github.jodersky.flow import java.io.IOException - import com.github.jodersky.flow.internal.InternalSerial - -import Serial.Close -import Serial.Closed -import Serial.Opened -import Serial.Received -import Serial.NoAck -import Serial.Write +import Serial._ import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef import akka.actor.Terminated import akka.actor.actorRef2Scala import akka.util.ByteString +import scala.collection.mutable.HashSet /** Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. */ -class SerialOperator(handler: ActorRef, serial: InternalSerial) extends Actor with ActorLogging { +class SerialOperator(serial: InternalSerial) extends Actor with ActorLogging { import SerialOperator._ import context._ - private object Reader extends Thread { + override def preStart() = { + Reader.start() + } + + override def postStop = { + serial.close() + } - def enterReadLoop() = { + def receive: Receive = { + + case Register(actor) => receiversLock.synchronized{ + receivers += actor + } + + case Unregister(actor) => receiversLock.synchronized{ + receivers -= actor + } + + case Write(data, ack) => { + val sent = serial.write(data.toArray) + if (ack != NoAck) sender ! ack + } + + case Close => { + sendAllReceivers(Closed) + context stop self + } + + //go down with reader thread + case ReadException(ex) => throw ex + + } + + private val receivers = new HashSet[ActorRef] + private val receiversLock = new Object + private def sendAllReceivers(msg: Any) = receiversLock.synchronized { + receivers.foreach { receiver => + receiver ! msg + } + } + + private object Reader extends Thread { + def readLoop() = { var continueReading = true while (continueReading) { try { val data = ByteString(serial.read()) - handler ! Received(data) + sendAllReceivers(Received(data)) } catch { //port is closing, stop thread gracefully @@ -50,48 +84,8 @@ class SerialOperator(handler: ActorRef, serial: InternalSerial) extends Actor wi override def run() { this.setName("flow-reader " + serial.port) - enterReadLoop() + readLoop() } - - } - - override def preStart() = { - context watch handler - handler ! Opened(serial.port, serial.baud, serial.characterSize, serial.twoStopBits, Parity(serial.parity)) - Reader.start() - } - - override def postStop = { - serial.close() - } - - def receive: Receive = { - - case Write(data, ack) => { - try { - val sent = serial.write(data.toArray) - if (ack != NoAck) sender ! ack - } catch { - case ex: IOException => { - handler ! Closed(Some(ex)) - context stop self - } - } - } - - case Close => { - handler ! Closed(None) - context stop self - } - - case Terminated(`handler`) => context.stop(self) - - //go down with reader thread - case ReadException(ex) => { - handler ! Closed(Some(ex)) - context stop self - } - } } |