diff options
author | Jakob Odersky <jodersky@gmail.com> | 2013-07-23 22:00:18 +0200 |
---|---|---|
committer | Jakob Odersky <jodersky@gmail.com> | 2013-07-23 22:01:12 +0200 |
commit | 0e90e8868ae11284e1fe8926e2a7490195267cc7 (patch) | |
tree | 5852f126736a08af43a1041aa0fea1b0a50fa3c8 /flow-main | |
parent | 5f61cd57119108dc15a5cef112a36649017204b0 (diff) | |
download | akka-serial-0e90e8868ae11284e1fe8926e2a7490195267cc7.tar.gz akka-serial-0e90e8868ae11284e1fe8926e2a7490195267cc7.tar.bz2 akka-serial-0e90e8868ae11284e1fe8926e2a7490195267cc7.zip |
make serial messaging more akka-io like
Diffstat (limited to 'flow-main')
3 files changed, 63 insertions, 72 deletions
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala index 46842f9..f8305c2 100644 --- a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala +++ b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala @@ -12,6 +12,8 @@ object Serial extends ExtensionKey[SerialExt] { /** A message extending this trait is to be viewed as an event, that is an in-bound message. */ trait Event + + case class CommandFailed(command: Command, reason: Throwable) extends Event /** * Open a new serial port. Send this command to the serial manager to request the opening of a serial port. The manager will @@ -36,17 +38,10 @@ object Serial extends ExtensionKey[SerialExt] { * @param parity type of parity to use with serial port */ case class Opened(port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Parity.Parity) extends Event - - /** - * Event sent from the serial manager, indicating that a serial port could not be opened. - * @param reason throwable containing reason to why the requested port could not be opened - * @param port name of serial port - * @param baud baud rate to use with serial port - * @param characterSize size of a character of the data sent through the serial port - * @param twoStopBits set to use two stop bits instead of one - * @param parity type of parity to use with serial port - */ - case class OpenFailed(reason: Throwable, port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Parity.Parity) extends Event + + + case class Register(receiver: ActorRef) extends Command + case class Unregister(receiver: ActorRef) extends Command /** * Event sent by operator, indicating that data was received from the operator's serial port. @@ -68,9 +63,8 @@ object Serial extends ExtensionKey[SerialExt] { case object Close extends Command /** - * Event sent from operator, indicating that its port has been closed. An optional reason explains the error (if any) that caused the closing of the port. - * @param reason Some(exception) explaining the exception that caused the closing, None if the port was closed by sending a `Close` message. + * Event sent from operator, indicating that its port has been closed. */ - case class Closed(reason: Option[Exception]) extends Event + case object Closed extends Event } diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala index e47de31..ce75cc0 100644 --- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala +++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala @@ -8,8 +8,7 @@ import scala.util.Try import com.github.jodersky.flow.internal.InternalSerial -import Serial.Open -import Serial.OpenFailed +import Serial._ import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.OneForOneStrategy @@ -33,9 +32,13 @@ class SerialManager extends Actor with ActorLogging { } def receive = { - case Open(port, baud, cs, tsb, parity) => Try { InternalSerial.open(port, baud, cs, tsb, parity.id) } match { - case Failure(t) => sender ! OpenFailed(t, port, baud, cs, tsb, parity) - case Success(serial) => context.actorOf(Props(classOf[SerialOperator], sender, serial), name = escapePortString(port)) + case c @ Open(port, baud, cs, tsb, parity) => Try { InternalSerial.open(port, baud, cs, tsb, parity.id) } match { + case Failure(t) => sender ! CommandFailed(c, t) + case Success(serial) => { + val operator = context.actorOf(Props(classOf[SerialOperator], serial), name = escapePortString(port)) + val opened = Opened(serial.port, serial.baud, serial.characterSize, serial.twoStopBits, Parity(serial.parity)) + sender.tell(opened, operator) + } } } diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala index 6000778..1c24729 100644 --- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala +++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala @@ -1,35 +1,69 @@ package com.github.jodersky.flow import java.io.IOException - import com.github.jodersky.flow.internal.InternalSerial - -import Serial.Close -import Serial.Closed -import Serial.Opened -import Serial.Received -import Serial.NoAck -import Serial.Write +import Serial._ import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef import akka.actor.Terminated import akka.actor.actorRef2Scala import akka.util.ByteString +import scala.collection.mutable.HashSet /** Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. */ -class SerialOperator(handler: ActorRef, serial: InternalSerial) extends Actor with ActorLogging { +class SerialOperator(serial: InternalSerial) extends Actor with ActorLogging { import SerialOperator._ import context._ - private object Reader extends Thread { + override def preStart() = { + Reader.start() + } + + override def postStop = { + serial.close() + } - def enterReadLoop() = { + def receive: Receive = { + + case Register(actor) => receiversLock.synchronized{ + receivers += actor + } + + case Unregister(actor) => receiversLock.synchronized{ + receivers -= actor + } + + case Write(data, ack) => { + val sent = serial.write(data.toArray) + if (ack != NoAck) sender ! ack + } + + case Close => { + sendAllReceivers(Closed) + context stop self + } + + //go down with reader thread + case ReadException(ex) => throw ex + + } + + private val receivers = new HashSet[ActorRef] + private val receiversLock = new Object + private def sendAllReceivers(msg: Any) = receiversLock.synchronized { + receivers.foreach { receiver => + receiver ! msg + } + } + + private object Reader extends Thread { + def readLoop() = { var continueReading = true while (continueReading) { try { val data = ByteString(serial.read()) - handler ! Received(data) + sendAllReceivers(Received(data)) } catch { //port is closing, stop thread gracefully @@ -50,48 +84,8 @@ class SerialOperator(handler: ActorRef, serial: InternalSerial) extends Actor wi override def run() { this.setName("flow-reader " + serial.port) - enterReadLoop() + readLoop() } - - } - - override def preStart() = { - context watch handler - handler ! Opened(serial.port, serial.baud, serial.characterSize, serial.twoStopBits, Parity(serial.parity)) - Reader.start() - } - - override def postStop = { - serial.close() - } - - def receive: Receive = { - - case Write(data, ack) => { - try { - val sent = serial.write(data.toArray) - if (ack != NoAck) sender ! ack - } catch { - case ex: IOException => { - handler ! Closed(Some(ex)) - context stop self - } - } - } - - case Close => { - handler ! Closed(None) - context stop self - } - - case Terminated(`handler`) => context.stop(self) - - //go down with reader thread - case ReadException(ex) => { - handler ! Closed(Some(ex)) - context stop self - } - } } |