summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-01-31 16:50:32 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-01-31 16:50:32 +0000
commit47fd02fe687c9ed19f82baf6322ef0b3ff8455c2 (patch)
tree005272d4584e04bb0991e38d57181ee89fba022c /src
parenta116937649f5a071f7030214f8aa1fafb430fee5 (diff)
downloadscala-47fd02fe687c9ed19f82baf6322ef0b3ff8455c2.tar.gz
scala-47fd02fe687c9ed19f82baf6322ef0b3ff8455c2.tar.bz2
scala-47fd02fe687c9ed19f82baf6322ef0b3ff8455c2.zip
scala.actors: fixed build on JDK 1.4
Diffstat (limited to 'src')
-rw-r--r--src/actors/scala/actors/Channel.scala54
-rw-r--r--src/actors/scala/actors/Scheduler.scala19
-rw-r--r--src/actors/scala/actors/ThreadPoolScheduler.scala (renamed from src/actors/scala/actors/JDK5Scheduler.scala)15
3 files changed, 67 insertions, 21 deletions
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index 700082f0dc..7449f9a4eb 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -43,7 +43,9 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
}
/**
- * Sends <code>msg</code> to this <code>Channel</code>.
+ * Sends a message to this <code>Channel</code>.
+ *
+ * @param msg the message to be sent
*/
def !(msg: Msg): unit = {
receiver ! scala.actors.!(this, msg)
@@ -57,34 +59,62 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
receiver forward scala.actors.!(this, msg)
}
+ /**
+ * Receives a message from this <code>Channel</code>.
+ *
+ * @param f a partial function with message patterns and actions
+ * @return result of processing the received value
+ */
def receive[R](f: PartialFunction[Any, R]): R = {
val C = this.asInstanceOf[Channel[Any]]
- // Martin: had to do this to get it to compiler after bug909 fix
receiver.receive {
case C ! msg if (f.isDefinedAt(msg)) => f(msg)
}
}
+ /**
+ * Receives a message from this <code>Channel</code> within a certain
+ * time span.
+ *
+ * @param msec the time span before timeout
+ * @param f a partial function with message patterns and actions
+ * @return result of processing the received value
+ */
def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = {
val C = this.asInstanceOf[Channel[Any]]
- // Martin: had to do this to get it to compiler after bug909 fix
receiver.receiveWithin(msec) {
case C ! msg if (f.isDefinedAt(msg)) => f(msg)
case TIMEOUT => f(TIMEOUT)
}
}
+ /**
+ * Receives a message from this <code>Channel</code>.
+ * <p>
+ * This method never returns. Therefore, the rest of the computation
+ * has to be contained in the actions of the partial function.
+ *
+ * @param f a partial function with message patterns and actions
+ */
def react(f: PartialFunction[Any, Unit]): Nothing = {
val C = this.asInstanceOf[Channel[Any]]
- // Martin: had to do this to get it to compiler after bug909 fix
receiver.react {
case C ! msg if (f.isDefinedAt(msg)) => f(msg)
}
}
+ /**
+ * Receives a message from this <code>Channel</code> within a certain
+ * time span.
+ * <p>
+ * This method never returns. Therefore, the rest of the computation
+ * has to be contained in the actions of the partial function.
+ *
+ * @param msec the time span before timeout
+ * @param f a partial function with message patterns and actions
+ */
def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = {
val C = this.asInstanceOf[Channel[Any]]
- // Martin: had to do this to get it to compiler after bug909 fix
receiver.reactWithin(msec) {
case C ! msg if (f.isDefinedAt(msg)) => f(msg)
case TIMEOUT => f(TIMEOUT)
@@ -92,8 +122,11 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
}
/**
- * Sends <code>msg</code> to this <code>Channel</code> and
+ * Sends a message to this <code>Channel</code> and
* awaits reply.
+ *
+ * @param msg the message to be sent
+ * @return the reply
*/
def !?(msg: Msg): Any = {
val replyChannel = Actor.self.freshReply()
@@ -103,6 +136,15 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
}
}
+ /**
+ * Sends a message to this <code>Channel</code> and
+ * awaits reply within a certain time span.
+ *
+ * @param msec the time span before timeout
+ * @param msg the message to be sent
+ * @return <code>None</code> in case of timeout, otherwise
+ * <code>Some(x)</code> where <code>x</code> is the reply
+ */
def !?(msec: long, msg: Msg): Option[Any] = {
val replyChannel = Actor.self.freshReply()
receiver ! scala.actors.!(this, msg)
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 30bb601645..15852fa7cb 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -29,7 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
object Scheduler {
private var sched: IScheduler =
{
- var s: Thread with IScheduler = null
+ var s: IScheduler = null
// Check for JDK version >= 1.5
var olderThanJDK5 = false
@@ -42,18 +42,8 @@ object Scheduler {
s = if (olderThanJDK5)
new TickedScheduler
- else {
- var corePoolSize = 4
- var maxPoolSize = 16
- val prop = java.lang.System.getProperty("actors.corePoolSize")
- if (null ne prop) {
- corePoolSize =
- Integer.parseInt(java.lang.System.getProperty("actors.corePoolSize"))
- maxPoolSize =
- Integer.parseInt(java.lang.System.getProperty("actors.maxPoolSize"))
- }
- new JDK5Scheduler(corePoolSize, maxPoolSize)
- }
+ else
+ Class.forName("scala.actors.ThreadPoolScheduler").newInstance().asInstanceOf[IScheduler]
s.start()
s
}
@@ -86,6 +76,7 @@ object Scheduler {
* @author Philipp Haller
*/
trait IScheduler {
+ def start(): unit
def start(task: Reaction): unit
def execute(task: Reaction): unit
def getTask(worker: WorkerThread): Runnable
@@ -115,6 +106,8 @@ trait IScheduler {
* @author Philipp Haller
*/
class SingleThreadedScheduler extends IScheduler {
+ def start() {}
+
def start(task: Reaction) {
// execute task immediately on same thread
task.run()
diff --git a/src/actors/scala/actors/JDK5Scheduler.scala b/src/actors/scala/actors/ThreadPoolScheduler.scala
index 8f5d9a611e..2174d90497 100644
--- a/src/actors/scala/actors/JDK5Scheduler.scala
+++ b/src/actors/scala/actors/ThreadPoolScheduler.scala
@@ -13,7 +13,7 @@ import java.util.concurrent.{ThreadPoolExecutor,
TimeUnit,
RejectedExecutionHandler}
-class TaskRejectedHandler(sched: JDK5Scheduler) extends RejectedExecutionHandler {
+class TaskRejectedHandler(sched: ThreadPoolScheduler) extends RejectedExecutionHandler {
def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) {
sched.pendReaction
r.run()
@@ -26,7 +26,18 @@ class TaskRejectedHandler(sched: JDK5Scheduler) extends RejectedExecutionHandler
* @version 0.9.2
* @author Philipp Haller
*/
-class JDK5Scheduler(initCoreSize: int, maxSize: int) extends Thread with IScheduler {
+class ThreadPoolScheduler extends Thread with IScheduler {
+
+ var initCoreSize = 4
+ var maxSize = 16
+
+ val prop = java.lang.System.getProperty("actors.corePoolSize")
+ if (null ne prop) {
+ initCoreSize =
+ Integer.parseInt(java.lang.System.getProperty("actors.corePoolSize"))
+ maxSize =
+ Integer.parseInt(java.lang.System.getProperty("actors.maxPoolSize"))
+ }
/* Note:
* When using an unbounded queue such as a