From 1fc969d39de5a2f7da76b8474fe848eafa2fc0fb Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sat, 15 Mar 2014 23:35:51 +0100 Subject: update files --- .../scala/com/github/jodersky/flow/Serial.scala | 41 +++++++--------------- .../com/github/jodersky/flow/SerialManager.scala | 27 ++++---------- .../com/github/jodersky/flow/SerialOperator.scala | 36 ++++++------------- 3 files changed, 30 insertions(+), 74 deletions(-) diff --git a/flow/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow/src/main/scala/com/github/jodersky/flow/Serial.scala index abc8f39..e5cf1d6 100644 --- a/flow/src/main/scala/com/github/jodersky/flow/Serial.scala +++ b/flow/src/main/scala/com/github/jodersky/flow/Serial.scala @@ -7,11 +7,13 @@ import akka.util.ByteString /** Defines messages used by flow's serial IO layer. */ object Serial extends ExtensionKey[SerialExt] { + sealed trait Message + /** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to flow's API. */ - trait Command + trait Command extends Message /** A message extending this trait is to be viewed as an event, an in-bound message issued by flow to the client. */ - trait Event + trait Event extends Message /** A command has failed. */ case class CommandFailed(command: Command, reason: Throwable) extends Event @@ -26,40 +28,23 @@ object Serial extends ExtensionKey[SerialExt] { * In case the port is successfully opened, the operator will respond with an `Opened` message. * In case the port cannot be opened, the manager will respond with a `CommandFailed` message. * - * @param settings settings of serial port to open + * @param port name of serial port + * @param baud baud rate to use with serial port + * @param characterSize size of a character of the data sent through the serial port + * @param twoStopBits set to use two stop bits instead of one + * @param parity type of parity to use with serial port */ - case class Open(settings: SerialSettings) extends Command + case class Open(port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Parity.Parity) extends Command /** * A port has been successfully opened. * * Event sent by a port operator, indicating that a serial port was successfully opened. The sender - * of this message is the operator associated to the given serial port. Furthermore, an additional reference - * to the operator is provided in this class' `operator` field. + * of this message is the operator associated to the given serial port. * - * @param settings settings of port that was opened - * @param operator operator associated with the serial port + * @param port name of opened serial port */ - case class Opened(settings: SerialSettings, operator: ActorRef) extends Event - - /** - * Register an actor to receive events. - * - * Send this command to a serial operator to register an actor for notification on the reception of data on the operator's associated port. - * Upon reception, data will be sent by the operator to registered actors in form of `Received` events. - * - * @param receiver actor to register - */ - case class Register(receiver: ActorRef) extends Command - - /** - * Unregister an actor from receiving events. - * - * Send this command to a serial operator to unregister an actor for notification on the reception of data on the operator's associated port. - * - * @param receiver actor to unregister - */ - case class Unregister(receiver: ActorRef) extends Command + case class Opened(port: String) extends Event /** * Data has been received. diff --git a/flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala index b3128ac..e884225 100644 --- a/flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala +++ b/flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala @@ -1,20 +1,15 @@ package com.github.jodersky.flow -import java.io.IOException - import scala.util.Failure import scala.util.Success import scala.util.Try import com.github.jodersky.flow.internal.InternalSerial -import Serial._ +import Serial.CommandFailed +import Serial.Open import akka.actor.Actor import akka.actor.ActorLogging -import akka.actor.OneForOneStrategy -import akka.actor.Props -import akka.actor.SupervisorStrategy.Escalate -import akka.actor.SupervisorStrategy.Stop import akka.actor.actorRef2Scala /** @@ -26,20 +21,12 @@ class SerialManager extends Actor with ActorLogging { import SerialManager._ import context._ - override val supervisorStrategy = - OneForOneStrategy() { - case _: IOException => Stop - case _: Exception => Escalate - } - def receive = { - case c @ Open(s) => Try { InternalSerial.open(s.port, s.baud, s.characterSize, s.twoStopBits, s.parity.id) } match { - case Failure(t) => sender ! CommandFailed(c, t) - case Success(serial) => { - val operator = context.actorOf(SerialOperator(serial), name = escapePortString(s.port)) - val settings = SerialSettings(serial.port, serial.baud, serial.characterSize, serial.twoStopBits, Parity(serial.parity)) - sender.tell(Opened(settings, operator), operator) - } + case open @ Open(port, baud, characterSize, twoStopBits, parity) => Try { + InternalSerial.open(port, baud, characterSize, twoStopBits, parity.id) + } match { + case Success(internal) => context.actorOf(SerialOperator(internal, sender), name = escapePortString(internal.port)) + case Failure(err) => sender ! CommandFailed(open, err) } } diff --git a/flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala index 3ac50c0..69db4df 100644 --- a/flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala +++ b/flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala @@ -16,25 +16,17 @@ import akka.actor.Props * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. * @see SerialManager */ -class SerialOperator(serial: InternalSerial) extends Actor with ActorLogging { +class SerialOperator(serial: InternalSerial, client: ActorRef) extends Actor with ActorLogging { import SerialOperator._ import context._ - private val receivers = new HashSet[ActorRef] - private val receiversLock = new Object - private def tellAllReceivers(msg: Any) = receiversLock.synchronized { - receivers.foreach { receiver => - receiver ! msg - } - } - private object Reader extends Thread { def readLoop() = { var continueReading = true while (continueReading) { try { val data = ByteString(serial.read()) - tellAllReceivers(Received(data)) + client ! Received(data) } catch { //port is closing, stop thread gracefully @@ -51,17 +43,17 @@ class SerialOperator(serial: InternalSerial) extends Actor with ActorLogging { } } - def name = this.getName() - override def run() { this.setName("flow-reader " + serial.port) readLoop() } } - - override def preStart() = { - Reader.start() - } + + + client ! Opened(serial.port) + context.watch(client) + Reader.start() + override def postStop = { serial.close() @@ -69,21 +61,13 @@ class SerialOperator(serial: InternalSerial) extends Actor with ActorLogging { def receive: Receive = { - case Register(actor) => receiversLock.synchronized { - receivers += actor - } - - case Unregister(actor) => receiversLock.synchronized { - receivers -= actor - } - case Write(data, ack) => { val sent = serial.write(data.toArray) if (ack != NoAck) sender ! ack } case Close => { - tellAllReceivers(Closed) + client ! Closed context stop self } @@ -97,5 +81,5 @@ class SerialOperator(serial: InternalSerial) extends Actor with ActorLogging { object SerialOperator { private case class ReadException(ex: Exception) - def apply(serial: InternalSerial) = Props(classOf[SerialOperator], serial) + def apply(serial: InternalSerial, client: ActorRef) = Props(classOf[SerialOperator], serial, client) } \ No newline at end of file -- cgit v1.2.3