aboutsummaryrefslogtreecommitdiff
path: root/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
diff options
context:
space:
mode:
Diffstat (limited to 'flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala')
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala100
1 files changed, 47 insertions, 53 deletions
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
index 6000778..1c24729 100644
--- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
@@ -1,35 +1,69 @@
package com.github.jodersky.flow
import java.io.IOException
-
import com.github.jodersky.flow.internal.InternalSerial
-
-import Serial.Close
-import Serial.Closed
-import Serial.Opened
-import Serial.Received
-import Serial.NoAck
-import Serial.Write
+import Serial._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Terminated
import akka.actor.actorRef2Scala
import akka.util.ByteString
+import scala.collection.mutable.HashSet
/** Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. */
-class SerialOperator(handler: ActorRef, serial: InternalSerial) extends Actor with ActorLogging {
+class SerialOperator(serial: InternalSerial) extends Actor with ActorLogging {
import SerialOperator._
import context._
- private object Reader extends Thread {
+ override def preStart() = {
+ Reader.start()
+ }
+
+ override def postStop = {
+ serial.close()
+ }
- def enterReadLoop() = {
+ def receive: Receive = {
+
+ case Register(actor) => receiversLock.synchronized{
+ receivers += actor
+ }
+
+ case Unregister(actor) => receiversLock.synchronized{
+ receivers -= actor
+ }
+
+ case Write(data, ack) => {
+ val sent = serial.write(data.toArray)
+ if (ack != NoAck) sender ! ack
+ }
+
+ case Close => {
+ sendAllReceivers(Closed)
+ context stop self
+ }
+
+ //go down with reader thread
+ case ReadException(ex) => throw ex
+
+ }
+
+ private val receivers = new HashSet[ActorRef]
+ private val receiversLock = new Object
+ private def sendAllReceivers(msg: Any) = receiversLock.synchronized {
+ receivers.foreach { receiver =>
+ receiver ! msg
+ }
+ }
+
+ private object Reader extends Thread {
+ def readLoop() = {
var continueReading = true
while (continueReading) {
try {
val data = ByteString(serial.read())
- handler ! Received(data)
+ sendAllReceivers(Received(data))
} catch {
//port is closing, stop thread gracefully
@@ -50,48 +84,8 @@ class SerialOperator(handler: ActorRef, serial: InternalSerial) extends Actor wi
override def run() {
this.setName("flow-reader " + serial.port)
- enterReadLoop()
+ readLoop()
}
-
- }
-
- override def preStart() = {
- context watch handler
- handler ! Opened(serial.port, serial.baud, serial.characterSize, serial.twoStopBits, Parity(serial.parity))
- Reader.start()
- }
-
- override def postStop = {
- serial.close()
- }
-
- def receive: Receive = {
-
- case Write(data, ack) => {
- try {
- val sent = serial.write(data.toArray)
- if (ack != NoAck) sender ! ack
- } catch {
- case ex: IOException => {
- handler ! Closed(Some(ex))
- context stop self
- }
- }
- }
-
- case Close => {
- handler ! Closed(None)
- context stop self
- }
-
- case Terminated(`handler`) => context.stop(self)
-
- //go down with reader thread
- case ReadException(ex) => {
- handler ! Closed(Some(ex))
- context stop self
- }
-
}
}