aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jodersky@gmail.com>2013-06-21 22:57:40 +0200
committerJakob Odersky <jodersky@gmail.com>2013-06-21 22:57:40 +0200
commitbe1e2330ab84cf449fae664fd0c9788e3ae4af1f (patch)
tree316c8e068d967b50b33b42138f98a9493fa0c6bf
parent5652600a7c6c9d8dbd715e077f3bca320d3e765b (diff)
downloadakka-serial-be1e2330ab84cf449fae664fd0c9788e3ae4af1f.tar.gz
akka-serial-be1e2330ab84cf449fae664fd0c9788e3ae4af1f.tar.bz2
akka-serial-be1e2330ab84cf449fae664fd0c9788e3ae4af1f.zip
improve thread safety in serial class
-rw-r--r--src/main/native/flow.c1
-rw-r--r--src/main/scala/com/github/jodersky/flow/SerialOperator.scala26
-rw-r--r--src/main/scala/com/github/jodersky/flow/low/Serial.scala10
3 files changed, 21 insertions, 16 deletions
diff --git a/src/main/native/flow.c b/src/main/native/flow.c
index 43dd750..c8927e9 100644
--- a/src/main/native/flow.c
+++ b/src/main/native/flow.c
@@ -163,6 +163,7 @@ int serial_close(struct serial_config* serial) {
}
free(serial);
+ return 0;
}
int serial_read(struct serial_config* serial, unsigned char* buffer, size_t size) {
diff --git a/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
index c9f38cd..951e9f9 100644
--- a/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
+++ b/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
@@ -13,34 +13,38 @@ import scala.concurrent._
class SerialOperator(serial: LowSerial, handler: ActorRef) extends Actor {
import context._
-
+
object Reader extends Thread {
- private var continueReading = true
-
+ private var continueReading = true
+
override def run() {
Thread.currentThread().setName("flow-reader " + serial.port)
while (continueReading) {
- println("beginning read")
- val data = ByteString(serial.read())
- println("return from read")
- handler ! Received(data)
+ try {
+ println("beginning read")
+ val data = ByteString(serial.read())
+ println("return from read")
+ handler ! Received(data)
+ } catch {
+ case ex: PortInterruptedException => continueReading = false
+ }
}
}
}
-
+
Reader.start()
-
+
context.watch(handler)
def receive = {
case c @ Write(data) => {
val writer = sender
- future{serial.write(data.toArray)}.onComplete {
+ future { serial.write(data.toArray) }.onComplete {
case Success(data) => writer ! Wrote(ByteString(data))
case Failure(t) => writer ! CommandFailed(c, t)
}
}
-
+
case Close => {
sender ! Closed
context.stop(self)
diff --git a/src/main/scala/com/github/jodersky/flow/low/Serial.scala b/src/main/scala/com/github/jodersky/flow/low/Serial.scala
index 916ab84..0a098c5 100644
--- a/src/main/scala/com/github/jodersky/flow/low/Serial.scala
+++ b/src/main/scala/com/github/jodersky/flow/low/Serial.scala
@@ -21,8 +21,8 @@ class Serial private (val port: String, private val pointer: Long) {
def close(): Unit = if (!closed.get()) {
closed.set(true)
requireSuccess(NativeSerial.interrupt(pointer), port)
- if (writing.get()) wait() // if reading, wait for read to finish
- if (reading.get()) wait()
+ if (writing.get()) synchronized{wait()} // if reading, wait for read to finish
+ if (reading.get()) synchronized{wait()}
requireSuccess(NativeSerial.close(pointer), port)
}
@@ -30,7 +30,7 @@ class Serial private (val port: String, private val pointer: Long) {
reading.set(true)
val buffer = new Array[Byte](100)
val readResult = NativeSerial.read(pointer, buffer)
- if (closed.get) notify(); //read was interrupted by close
+ if (closed.get) synchronized{notify()}; //read was interrupted by close
reading.set(false)
val n = requireSuccess(readResult, port)
buffer take n
@@ -41,7 +41,7 @@ class Serial private (val port: String, private val pointer: Long) {
def write(data: Array[Byte]): Array[Byte] = if (!closed.get) {
writing.set(true)
val writeResult = NativeSerial.write(pointer, data)
- if (closed.get) notify()
+ if (closed.get) synchronized{notify()}
writing.set(false)
val n = requireSuccess(writeResult, port)
data take n
@@ -54,7 +54,7 @@ class Serial private (val port: String, private val pointer: Long) {
object Serial {
import NativeSerial._
- def requireSuccess(result: Int, port: String): Int = result match {
+ private def requireSuccess(result: Int, port: String): Int = result match {
case E_IO => throw new IOException(port)
case E_ACCESS_DENIED => throw new AccessDeniedException(port)
case E_BUSY => throw new PortInUseException(port)