diff options
Diffstat (limited to 'flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala')
-rw-r--r-- | flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala | 78 |
1 files changed, 42 insertions, 36 deletions
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala index 1c24729..3ac50c0 100644 --- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala +++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala @@ -10,48 +10,19 @@ import akka.actor.Terminated import akka.actor.actorRef2Scala import akka.util.ByteString import scala.collection.mutable.HashSet +import akka.actor.Props -/** Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. */ +/** + * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. + * @see SerialManager + */ class SerialOperator(serial: InternalSerial) extends Actor with ActorLogging { import SerialOperator._ import context._ - override def preStart() = { - Reader.start() - } - - override def postStop = { - serial.close() - } - - def receive: Receive = { - - case Register(actor) => receiversLock.synchronized{ - receivers += actor - } - - case Unregister(actor) => receiversLock.synchronized{ - receivers -= actor - } - - case Write(data, ack) => { - val sent = serial.write(data.toArray) - if (ack != NoAck) sender ! ack - } - - case Close => { - sendAllReceivers(Closed) - context stop self - } - - //go down with reader thread - case ReadException(ex) => throw ex - - } - private val receivers = new HashSet[ActorRef] private val receiversLock = new Object - private def sendAllReceivers(msg: Any) = receiversLock.synchronized { + private def tellAllReceivers(msg: Any) = receiversLock.synchronized { receivers.foreach { receiver => receiver ! msg } @@ -63,7 +34,7 @@ class SerialOperator(serial: InternalSerial) extends Actor with ActorLogging { while (continueReading) { try { val data = ByteString(serial.read()) - sendAllReceivers(Received(data)) + tellAllReceivers(Received(data)) } catch { //port is closing, stop thread gracefully @@ -88,8 +59,43 @@ class SerialOperator(serial: InternalSerial) extends Actor with ActorLogging { } } + override def preStart() = { + Reader.start() + } + + override def postStop = { + serial.close() + } + + def receive: Receive = { + + case Register(actor) => receiversLock.synchronized { + receivers += actor + } + + case Unregister(actor) => receiversLock.synchronized { + receivers -= actor + } + + case Write(data, ack) => { + val sent = serial.write(data.toArray) + if (ack != NoAck) sender ! ack + } + + case Close => { + tellAllReceivers(Closed) + context stop self + } + + //go down with reader thread + case ReadException(ex) => throw ex + + } + } object SerialOperator { private case class ReadException(ex: Exception) + + def apply(serial: InternalSerial) = Props(classOf[SerialOperator], serial) }
\ No newline at end of file |