aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jodersky@gmail.com>2013-07-23 22:00:18 +0200
committerJakob Odersky <jodersky@gmail.com>2013-07-23 22:01:12 +0200
commit0e90e8868ae11284e1fe8926e2a7490195267cc7 (patch)
tree5852f126736a08af43a1041aa0fea1b0a50fa3c8
parent5f61cd57119108dc15a5cef112a36649017204b0 (diff)
downloadakka-serial-0e90e8868ae11284e1fe8926e2a7490195267cc7.tar.gz
akka-serial-0e90e8868ae11284e1fe8926e2a7490195267cc7.tar.bz2
akka-serial-0e90e8868ae11284e1fe8926e2a7490195267cc7.zip
make serial messaging more akka-io like
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala22
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala13
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala100
-rw-r--r--flow-samples/terminal/src/main/scala/com/github/jodersky/flow/samples/terminal/Terminal.scala30
4 files changed, 85 insertions, 80 deletions
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala
index 46842f9..f8305c2 100644
--- a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala
@@ -12,6 +12,8 @@ object Serial extends ExtensionKey[SerialExt] {
/** A message extending this trait is to be viewed as an event, that is an in-bound message. */
trait Event
+
+ case class CommandFailed(command: Command, reason: Throwable) extends Event
/**
* Open a new serial port. Send this command to the serial manager to request the opening of a serial port. The manager will
@@ -36,17 +38,10 @@ object Serial extends ExtensionKey[SerialExt] {
* @param parity type of parity to use with serial port
*/
case class Opened(port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Parity.Parity) extends Event
-
- /**
- * Event sent from the serial manager, indicating that a serial port could not be opened.
- * @param reason throwable containing reason to why the requested port could not be opened
- * @param port name of serial port
- * @param baud baud rate to use with serial port
- * @param characterSize size of a character of the data sent through the serial port
- * @param twoStopBits set to use two stop bits instead of one
- * @param parity type of parity to use with serial port
- */
- case class OpenFailed(reason: Throwable, port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Parity.Parity) extends Event
+
+
+ case class Register(receiver: ActorRef) extends Command
+ case class Unregister(receiver: ActorRef) extends Command
/**
* Event sent by operator, indicating that data was received from the operator's serial port.
@@ -68,9 +63,8 @@ object Serial extends ExtensionKey[SerialExt] {
case object Close extends Command
/**
- * Event sent from operator, indicating that its port has been closed. An optional reason explains the error (if any) that caused the closing of the port.
- * @param reason Some(exception) explaining the exception that caused the closing, None if the port was closed by sending a `Close` message.
+ * Event sent from operator, indicating that its port has been closed.
*/
- case class Closed(reason: Option[Exception]) extends Event
+ case object Closed extends Event
}
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala
index e47de31..ce75cc0 100644
--- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala
@@ -8,8 +8,7 @@ import scala.util.Try
import com.github.jodersky.flow.internal.InternalSerial
-import Serial.Open
-import Serial.OpenFailed
+import Serial._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.OneForOneStrategy
@@ -33,9 +32,13 @@ class SerialManager extends Actor with ActorLogging {
}
def receive = {
- case Open(port, baud, cs, tsb, parity) => Try { InternalSerial.open(port, baud, cs, tsb, parity.id) } match {
- case Failure(t) => sender ! OpenFailed(t, port, baud, cs, tsb, parity)
- case Success(serial) => context.actorOf(Props(classOf[SerialOperator], sender, serial), name = escapePortString(port))
+ case c @ Open(port, baud, cs, tsb, parity) => Try { InternalSerial.open(port, baud, cs, tsb, parity.id) } match {
+ case Failure(t) => sender ! CommandFailed(c, t)
+ case Success(serial) => {
+ val operator = context.actorOf(Props(classOf[SerialOperator], serial), name = escapePortString(port))
+ val opened = Opened(serial.port, serial.baud, serial.characterSize, serial.twoStopBits, Parity(serial.parity))
+ sender.tell(opened, operator)
+ }
}
}
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
index 6000778..1c24729 100644
--- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
@@ -1,35 +1,69 @@
package com.github.jodersky.flow
import java.io.IOException
-
import com.github.jodersky.flow.internal.InternalSerial
-
-import Serial.Close
-import Serial.Closed
-import Serial.Opened
-import Serial.Received
-import Serial.NoAck
-import Serial.Write
+import Serial._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Terminated
import akka.actor.actorRef2Scala
import akka.util.ByteString
+import scala.collection.mutable.HashSet
/** Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. */
-class SerialOperator(handler: ActorRef, serial: InternalSerial) extends Actor with ActorLogging {
+class SerialOperator(serial: InternalSerial) extends Actor with ActorLogging {
import SerialOperator._
import context._
- private object Reader extends Thread {
+ override def preStart() = {
+ Reader.start()
+ }
+
+ override def postStop = {
+ serial.close()
+ }
- def enterReadLoop() = {
+ def receive: Receive = {
+
+ case Register(actor) => receiversLock.synchronized{
+ receivers += actor
+ }
+
+ case Unregister(actor) => receiversLock.synchronized{
+ receivers -= actor
+ }
+
+ case Write(data, ack) => {
+ val sent = serial.write(data.toArray)
+ if (ack != NoAck) sender ! ack
+ }
+
+ case Close => {
+ sendAllReceivers(Closed)
+ context stop self
+ }
+
+ //go down with reader thread
+ case ReadException(ex) => throw ex
+
+ }
+
+ private val receivers = new HashSet[ActorRef]
+ private val receiversLock = new Object
+ private def sendAllReceivers(msg: Any) = receiversLock.synchronized {
+ receivers.foreach { receiver =>
+ receiver ! msg
+ }
+ }
+
+ private object Reader extends Thread {
+ def readLoop() = {
var continueReading = true
while (continueReading) {
try {
val data = ByteString(serial.read())
- handler ! Received(data)
+ sendAllReceivers(Received(data))
} catch {
//port is closing, stop thread gracefully
@@ -50,48 +84,8 @@ class SerialOperator(handler: ActorRef, serial: InternalSerial) extends Actor wi
override def run() {
this.setName("flow-reader " + serial.port)
- enterReadLoop()
+ readLoop()
}
-
- }
-
- override def preStart() = {
- context watch handler
- handler ! Opened(serial.port, serial.baud, serial.characterSize, serial.twoStopBits, Parity(serial.parity))
- Reader.start()
- }
-
- override def postStop = {
- serial.close()
- }
-
- def receive: Receive = {
-
- case Write(data, ack) => {
- try {
- val sent = serial.write(data.toArray)
- if (ack != NoAck) sender ! ack
- } catch {
- case ex: IOException => {
- handler ! Closed(Some(ex))
- context stop self
- }
- }
- }
-
- case Close => {
- handler ! Closed(None)
- context stop self
- }
-
- case Terminated(`handler`) => context.stop(self)
-
- //go down with reader thread
- case ReadException(ex) => {
- handler ! Closed(Some(ex))
- context stop self
- }
-
}
}
diff --git a/flow-samples/terminal/src/main/scala/com/github/jodersky/flow/samples/terminal/Terminal.scala b/flow-samples/terminal/src/main/scala/com/github/jodersky/flow/samples/terminal/Terminal.scala
index ff22910..2997e77 100644
--- a/flow-samples/terminal/src/main/scala/com/github/jodersky/flow/samples/terminal/Terminal.scala
+++ b/flow-samples/terminal/src/main/scala/com/github/jodersky/flow/samples/terminal/Terminal.scala
@@ -12,6 +12,7 @@ import com.github.jodersky.flow.Parity
import akka.actor.Props
class Terminal(port: String, baud: Int, cs: Int, tsb: Boolean, parity: Parity.Parity) extends Actor with ActorLogging {
+ import Terminal._
import context._
val reader = actorOf(Props[ConsoleReader])
@@ -26,28 +27,36 @@ class Terminal(port: String, baud: Int, cs: Int, tsb: Boolean, parity: Parity.Pa
}
def receive = {
- case OpenFailed(reason, _, _, _, _, _) => {
+ case CommandFailed(cmd, reason) => {
log.error(s"Connection failed, stopping terminal. Reason: ${reason}")
context stop self
}
case Opened(port, _, _, _, _) => {
log.info(s"Port ${port} is now open.")
- context become opened(sender)
+ val operator = sender
+ context become opened(operator)
+ context watch operator
+ operator ! Register(self)
reader ! Read
}
}
def opened(operator: ActorRef): Receive = {
+
case Received(data) => {
log.info(s"Received data: ${formatData(data)} (${new String(data.toArray, "UTF-8")})")
}
+
case Wrote(data) => log.info(s"Wrote data: ${formatData(data)} (${new String(data.toArray, "UTF-8")})")
- case Closed(x) => {
- x match {
- case None => log.info("Operator closed normally, exiting terminal.")
- case Some(ex) => log.error("Operator crashed, exiting terminal.")
- }
+ case Closed => {
+ log.info("Operator closed normally, exiting terminal.")
+ context unwatch operator
+ context stop self
+ }
+
+ case Terminated(`operator`) => {
+ log.error("Operator crashed, exiting terminal.")
context stop self
}
@@ -57,11 +66,16 @@ class Terminal(port: String, baud: Int, cs: Int, tsb: Boolean, parity: Parity.Pa
}
case ConsoleInput(input) => {
- operator ! Write(ByteString(input.getBytes), true)
+ val data = ByteString(input.getBytes)
+ operator ! Write(data, Wrote(data))
reader ! Read
}
}
private def formatData(data: ByteString) = data.mkString("[", ",", "]")
+}
+
+object Terminal {
+ case class Wrote(data: ByteString) extends Event
} \ No newline at end of file