summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormichelou <michelou@epfl.ch>2006-06-25 17:41:39 +0000
committermichelou <michelou@epfl.ch>2006-06-25 17:41:39 +0000
commitef34b6a65b2f33abcd8295726ba026ab97d488eb (patch)
treea46f0b5e5c1809bfdcdbde8ae69620d2810bfd8a
parentb9e7cf28ee0fe24c71e4b2d647d842d81e1c4f41 (diff)
downloadscala-ef34b6a65b2f33abcd8295726ba026ab97d488eb.tar.gz
scala-ef34b6a65b2f33abcd8295726ba026ab97d488eb.tar.bz2
scala-ef34b6a65b2f33abcd8295726ba026ab97d488eb.zip
cleaned up import statements in src/actors/ files
-rw-r--r--src/actors/scala/actors/distributed/JavaSerializer.scala6
-rw-r--r--src/actors/scala/actors/distributed/NetKernel.scala63
-rw-r--r--src/actors/scala/actors/distributed/RemoteActor.scala46
-rw-r--r--src/actors/scala/actors/distributed/TcpSerializerComb.scala21
-rw-r--r--src/actors/scala/actors/distributed/TcpService.scala43
-rw-r--r--src/actors/scala/actors/distributed/TcpServiceWorker.scala8
-rw-r--r--src/actors/scala/actors/distributed/Util.scala3
7 files changed, 88 insertions, 102 deletions
diff --git a/src/actors/scala/actors/distributed/JavaSerializer.scala b/src/actors/scala/actors/distributed/JavaSerializer.scala
index e9eb4b2dee..a4774503cf 100644
--- a/src/actors/scala/actors/distributed/JavaSerializer.scala
+++ b/src/actors/scala/actors/distributed/JavaSerializer.scala
@@ -11,11 +11,9 @@
package scala.actors.distributed
import java.io._
-import scala.collection.mutable._
import scala.actors.distributed.picklers.BytePickle.SPU
-import scala.actors.distributed.picklers._
-import scala.actors.multi._
+import scala.actors.multi.Pid
[serializable]
class JavaSerializer(serv: Service) extends Serializer(serv) {
@@ -32,7 +30,7 @@ class JavaSerializer(serv: Service) extends Serializer(serv) {
bos.toByteArray()
}
- def deserialize(bytes: Array[byte]): AnyRef = {
+ def deserialize(bytes: Array[Byte]): AnyRef = {
val bis = new ByteArrayInputStream(bytes)
val in = new ObjectInputStream(bis)
in.readObject()
diff --git a/src/actors/scala/actors/distributed/NetKernel.scala b/src/actors/scala/actors/distributed/NetKernel.scala
index 9f433d03e7..8d479df377 100644
--- a/src/actors/scala/actors/distributed/NetKernel.scala
+++ b/src/actors/scala/actors/distributed/NetKernel.scala
@@ -10,15 +10,12 @@
package scala.actors.distributed
-import java.io.{StringReader,StringWriter}
-import java.util.logging._
-
-import scala.collection.mutable.{HashMap,HashSet}
-
-import java.net.UnknownHostException
-import java.io.IOException
+import java.io.{IOException,StringReader,StringWriter}
import java.lang.SecurityException
+import java.net.UnknownHostException
+import java.util.logging.{ConsoleHandler,Level,Logger}
+import scala.collection.mutable.{HashMap,HashSet}
import scala.actors.multi.{Actor,ExcHandlerDesc}
case class RA(a: RemoteActor)
@@ -35,15 +32,15 @@ class NetKernel(service: Service) {
// contains constructors
private val ptable =
- new HashMap[String, () => RemoteActor];
+ new HashMap[String, () => RemoteActor]
// maps local ids to scala.actors
private val rtable =
- new HashMap[int, RemoteActor];
+ new HashMap[int, RemoteActor]
// maps scala.actors to their RemotePid
private val pidTable =
- new HashMap[RemoteActor, RemotePid];
+ new HashMap[RemoteActor, RemotePid]
private var running = true;
@@ -57,7 +54,7 @@ class NetKernel(service: Service) {
//start // start NetKernel
/** only called if destDesc is local. */
- def handleExc(destDesc: ExcHandlerDesc, e: Throwable) = {
+ def handleExc(destDesc: ExcHandlerDesc, e: Throwable) =
destDesc.pid match {
case rpid: RemotePid =>
(rtable get rpid.localId) match {
@@ -67,9 +64,8 @@ class NetKernel(service: Service) {
error("exc desc refers to non-registered actor")
}
}
- }
- def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = {
+ def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) =
// locality check (handler local to this node?)
destDesc.pid match {
case rpid: RemotePid =>
@@ -78,7 +74,6 @@ class NetKernel(service: Service) {
else
sendToNode(rpid.node, ForwardExc(destDesc, e))
}
- }
def sendToNode(node: Node, msg: AnyRef) = {
//val sw = new StringWriter
@@ -105,7 +100,7 @@ class NetKernel(service: Service) {
service.disconnectNode(n)
}
- def getLocalRef(locId: int): RemoteActor =
+ def getLocalRef(locId: Int): RemoteActor =
rtable.get(locId) match {
case None =>
error("" + locId + " is not registered at " + this)
@@ -113,7 +108,7 @@ class NetKernel(service: Service) {
remoteActor
}
- def localSend(localId: int, msg: AnyRef): unit = synchronized {
+ def localSend(localId: Int, msg: AnyRef): Unit = synchronized {
rtable.get(localId) match {
case None =>
error("" + localId + " is not registered at " + this)
@@ -123,15 +118,15 @@ class NetKernel(service: Service) {
}
}
- def localSend(pid: RemotePid, msg: AnyRef): unit =
- localSend(pid.localId, msg);
+ def localSend(pid: RemotePid, msg: AnyRef): Unit =
+ localSend(pid.localId, msg)
def remoteSend(pid: RemotePid, msg: AnyRef) = synchronized {
//Console.println("NetKernel: Remote msg delivery to " + pid)
service.remoteSend(pid, msg)
}
- def namedSend(name: Name, msg: AnyRef): unit = {
+ def namedSend(name: Name, msg: AnyRef): Unit =
if (name.node == this.node) {
// look-up name
nameTable.get(name.sym) match {
@@ -151,15 +146,14 @@ class NetKernel(service: Service) {
sendToNode(name.node, NamedSend(name.sym, bytes))
//sendToNode(name.node, NamedSend(name.sym, sw.toString()))
}
- }
- val nameTable = new HashMap[Symbol, int]
+ val nameTable = new HashMap[Symbol, Int]
- def registerName(name: Symbol, pid: RemotePid): unit = synchronized {
+ def registerName(name: Symbol, pid: RemotePid): Unit = synchronized {
nameTable += name -> pid.localId
}
- def registerName(name: Symbol, a: RemoteActor): unit = synchronized {
+ def registerName(name: Symbol, a: RemoteActor): Unit = synchronized {
val pid = register(a)
registerName(name, pid)
a.start
@@ -247,7 +241,7 @@ class NetKernel(service: Service) {
service.send(remoteNode, sw.toString())
}*/
- def processMsg(msg: AnyRef): unit = synchronized {
+ def processMsg(msg: AnyRef): Unit = synchronized {
msg match {
case Spawn(reply: RemotePid, pname) =>
val newPid = spawn(pname)
@@ -358,7 +352,7 @@ class NetKernel(service: Service) {
}
// assume this.node != node
- def spawn(replyTo: RemotePid, node: Node, a: RemoteActor): unit = {
+ def spawn(replyTo: RemotePid, node: Node, a: RemoteActor): Unit = {
val ra = RA(a)
//val rsw = new StringWriter
//service.serializer.serialize(ra, rsw)
@@ -389,7 +383,7 @@ class NetKernel(service: Service) {
/* Spawns a new actor (locally), executing "fun".
*/
- def spawn(fun: RemoteActor => unit): RemotePid = synchronized {
+ def spawn(fun: RemoteActor => Unit): RemotePid = synchronized {
val newProc = new RemoteActor {
override def run: unit =
fun(this);
@@ -427,7 +421,7 @@ class NetKernel(service: Service) {
// which of the local processes traps exit signals?
private val trapExits = new HashSet[int];
- def processFlag(pid: RemotePid, flag: Symbol, set: boolean) = synchronized {
+ def processFlag(pid: RemotePid, flag: Symbol, set: Boolean) = synchronized {
if (flag.name.equals("trapExit")) {
if (trapExits.contains(pid.localId) && !set)
trapExits -= pid.localId
@@ -437,7 +431,7 @@ class NetKernel(service: Service) {
}
// assume from.node == this.node
- private def unlinkFromLocal(from: RemotePid, to: RemotePid): unit =
+ private def unlinkFromLocal(from: RemotePid, to: RemotePid): Unit =
links.get(from.localId) match {
case None =>
// has no links -> ignore
@@ -450,7 +444,7 @@ class NetKernel(service: Service) {
unlinks bi-directional link
assume from.node == this.node
*/
- def unlink(from: RemotePid, to: RemotePid): unit = synchronized {
+ def unlink(from: RemotePid, to: RemotePid): Unit = synchronized {
unlinkFromLocal(from, to)
if (to.node == this.node)
unlinkFromLocal(to, from)
@@ -461,7 +455,7 @@ class NetKernel(service: Service) {
}
// assume from.node == this.node
- private def linkFromLocal(from: RemotePid, to: RemotePid): unit =
+ private def linkFromLocal(from: RemotePid, to: RemotePid): Unit =
// TODO: send Exit to from if to is invalid
links.get(from.localId) match {
case None =>
@@ -504,7 +498,7 @@ class NetKernel(service: Service) {
Assume pid is local.
*/
- def exit(pid: RemotePid, reason: Symbol): unit = synchronized {
+ def exit(pid: RemotePid, reason: Symbol): Unit = synchronized {
if (!(exitMarks contains pid)) {
exitMarks += pid // mark pid as exiting
//Console.println("" + pid + " is exiting (" + reason + ").")
@@ -548,9 +542,9 @@ class NetKernel(service: Service) {
}
private val monNodes =
- new HashMap[Node,HashMap[RemotePid,int]];
+ new HashMap[Node,HashMap[RemotePid,Int]]
- def monitorNode(client: RemotePid, mnode: Node, cond: boolean) = synchronized {
+ def monitorNode(client: RemotePid, mnode: Node, cond: Boolean) = synchronized {
monNodes.get(mnode) match {
case None =>
// nobody is monitoring this node
@@ -580,7 +574,7 @@ class NetKernel(service: Service) {
}
}
- def nodeDown(mnode: Node) = {
+ def nodeDown(mnode: Node) =
// send NodeDown msg to registered RemotePids
monNodes.get(mnode) match {
case None =>
@@ -596,6 +590,5 @@ class NetKernel(service: Service) {
}
}
}
- }
}
diff --git a/src/actors/scala/actors/distributed/RemoteActor.scala b/src/actors/scala/actors/distributed/RemoteActor.scala
index e12d4046af..51e6c6690a 100644
--- a/src/actors/scala/actors/distributed/RemoteActor.scala
+++ b/src/actors/scala/actors/distributed/RemoteActor.scala
@@ -21,6 +21,7 @@ case class TCP() extends ServiceName
* @author Philipp Haller
*/
class RemoteActor extends Actor {
+
override def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = {
// locality check (handler local to this actor?)
if (destDesc.pid == self)
@@ -29,7 +30,7 @@ class RemoteActor extends Actor {
kernel.forwardExc(destDesc, e)
}
- override def receive(f: PartialFunction[Message,unit]): scala.All = {
+ override def receive(f: PartialFunction[Message,Unit]): scala.All = {
if (isAlive) {
continuation = null
sent.dequeueFirst(f.isDefinedAt) match {
@@ -55,10 +56,9 @@ class RemoteActor extends Actor {
throw new Done
}
- var kernel: NetKernel = null;
+ var kernel: NetKernel = null
- def node =
- self.node
+ def node = self.node
def nodes = kernel.nodes
@@ -71,9 +71,9 @@ class RemoteActor extends Actor {
}
def serialize(index: String, rep: Serializer => AnyRef) =
- kernel.registerSerializer(index, rep);
+ kernel.registerSerializer(index, rep)
- def alive(s: ServiceName): unit = {
+ def alive(s: ServiceName): Unit = {
var service: Service = null
s match {
case TCP() =>
@@ -96,7 +96,8 @@ class RemoteActor extends Actor {
}
service = serv
serv.start()*/
- case _ => throw new Exception ("Unknown Service in RemoteActor")
+ case _ =>
+ throw new Exception ("Unknown Service in RemoteActor")
}
// create RemotePid
selfCached = service.kernel.register(this)
@@ -108,26 +109,27 @@ class RemoteActor extends Actor {
}
def disconnectNode(node: Node) =
- kernel.disconnectNode(node);
+ kernel.disconnectNode(node)
+
//does not call start def of Actor
- def register(name: Symbol, pid: RemotePid): unit =
- kernel.registerName(name, pid);
+ def register(name: Symbol, pid: RemotePid): Unit =
+ kernel.registerName(name, pid)
//calls start def of Actor
- def register(name: Symbol, a: RemoteActor): unit =
- kernel.registerName(name, a);
+ def register(name: Symbol, a: RemoteActor): Unit =
+ kernel.registerName(name, a)
def name(node: Node, sym: Symbol): Name =
Name(node, sym, kernel)
def spawn(node: Node, name: String): RemotePid =
- kernel.spawn(self, node, name);
+ kernel.spawn(self, node, name)
- def spawn(node: Node, a: RemoteActor): unit =
- kernel.spawn(self, node, a);
+ def spawn(node: Node, a: RemoteActor): Unit =
+ kernel.spawn(self, node, a)
- def spawn(fun: RemoteActor => unit): RemotePid =
- kernel.spawn(fun);
+ def spawn(fun: RemoteActor => Unit): RemotePid =
+ kernel.spawn(fun)
def spawn(a: RemoteActor): RemotePid = {
val pid = kernel.register(a)
@@ -138,23 +140,23 @@ class RemoteActor extends Actor {
def spawnLink(fun: RemoteActor => unit): RemotePid =
kernel.spawnLink(self, fun)
- def monitorNode(node: Node, cond: boolean) =
+ def monitorNode(node: Node, cond: Boolean) =
kernel.monitorNode(self, node, cond)
// this should be:
// self.link(pid)
// if self is RemotePid it will invoke NetKernel
- def link(pid: RemotePid): unit =
+ def link(pid: RemotePid): Unit =
kernel.link(self, pid)
- def unlink(pid: RemotePid): unit =
+ def unlink(pid: RemotePid): Unit =
kernel.unlink(self, pid)
- override def exit(reason: Symbol): unit =
+ override def exit(reason: Symbol): Unit =
kernel.exit(self, reason)
- override def processFlag(flag: Symbol, set: boolean) =
+ override def processFlag(flag: Symbol, set: Boolean) =
kernel.processFlag(self, flag, set)
override def die(reason: Symbol) =
diff --git a/src/actors/scala/actors/distributed/TcpSerializerComb.scala b/src/actors/scala/actors/distributed/TcpSerializerComb.scala
index 8963a82751..7c7ab87eee 100644
--- a/src/actors/scala/actors/distributed/TcpSerializerComb.scala
+++ b/src/actors/scala/actors/distributed/TcpSerializerComb.scala
@@ -10,14 +10,13 @@
package scala.actors.distributed
-import java.io._
-import scala.collection.mutable._
+import java.io.Reader
import scala.actors.distributed.picklers.BytePickle._
-
import scala.actors.distributed.MessagesComb._
import scala.actors.distributed.NodeComb._
-import scala.actors.multi._
+import scala.actors.multi.Pid
+import scala.collection.mutable.HashMap
/**
* @author Philipp Haller
@@ -96,26 +95,26 @@ class TcpSerializerComb(serv: Service) extends Serializer(serv) {
}
def addRep(name: String, repCons: Serializer => AnyRef) =
- table.update(name, repCons(this));
+ table.update(name, repCons(this))
def +=(name: String) =
- new InternalMapTo(name);
+ new InternalMapTo(name)
class InternalMapTo(name: String) {
def ->(repCons: Serializer => AnyRef): unit =
- table.update(name, repCons(TcpSerializerComb.this));
+ table.update(name, repCons(TcpSerializerComb.this))
}
def serialize(o: AnyRef): Array[byte] = {
- log.info("pickling value of type " + Util.baseName(o));
- val op = table.get(Util.baseName(o));
+ log.info("pickling value of type " + Util.baseName(o))
+ val op = table.get(Util.baseName(o))
op match {
case None => error("No type representation for " + Util.baseName(o) + " found. Cannot serialize.");
case Some(rep) =>
// first write type name
val bytes = pickle(string, Util.baseName(o))
- val repr = rep.asInstanceOf[SPU[AnyRef]];
- log.info("using type representation " + repr);
+ val repr = rep.asInstanceOf[SPU[AnyRef]]
+ log.info("using type representation " + repr)
val res = repr.appP(o, new PicklerState(bytes, new PicklerEnv)).stream
res
}
diff --git a/src/actors/scala/actors/distributed/TcpService.scala b/src/actors/scala/actors/distributed/TcpService.scala
index 6d2c5dfa19..a1efa01d9b 100644
--- a/src/actors/scala/actors/distributed/TcpService.scala
+++ b/src/actors/scala/actors/distributed/TcpService.scala
@@ -10,16 +10,14 @@
package scala.actors.distributed
-import java.net._
-import java.io._
-
-import java.util.logging._
+import java.io.IOException
+import java.net.{InetAddress,ServerSocket,Socket,UnknownHostException}
/**
* @author Philipp Haller
*/
object TcpService {
- val random = new java.util.Random(0)
+ val random = new java.util.Random(System.currentTimeMillis())
def generatePort: int = {
var portnum = 0
@@ -41,8 +39,8 @@ object TcpService {
}
object TestPorts {
- def main(args: Array[String]): unit = {
- val random = new java.util.Random(0)
+ def main(args: Array[String]): Unit = {
+ val random = new java.util.Random(System.currentTimeMillis())
val socket = new ServerSocket(8000 + random.nextInt(500))
Console.println(TcpService.generatePort)
}
@@ -92,7 +90,7 @@ class TcpService(port: Int) extends Thread with Service {
}
}
- override def run(): unit = {
+ override def run(): Unit =
try {
val socket = new ServerSocket(port);
Console.println("Tcp Service started: " + node);
@@ -113,20 +111,19 @@ class TcpService(port: Int) extends Thread with Service {
}
}
catch {
- case ioe: IOException => {
+ case ioe: IOException =>
// do nothing
- }
- case sec: SecurityException => {
+ case sec: SecurityException =>
// do nothing
- }
}
- }
// connection management
- private val connections = new scala.collection.mutable.HashMap[TcpNode,TcpServiceWorker];
+ private val connections =
+ new scala.collection.mutable.HashMap[TcpNode,TcpServiceWorker]
- def nodes:List[Node] = throw new Exception ("nodes need to be implemented in TcpService!")
+ def nodes: List[Node] =
+ throw new Exception ("nodes need to be implemented in TcpService!")
def addConnection(n: TcpNode, w: TcpServiceWorker) = synchronized {
connections += n -> w
@@ -136,18 +133,16 @@ class TcpService(port: Int) extends Thread with Service {
connections.get(n)
}
- def isConnected(n: Node): boolean = synchronized {
+ def isConnected(n: Node): Boolean = synchronized {
n match {
case tnode: TcpNode =>
- connections.get(tnode) match {
- case None => false
- case Some(x) => true
- }
- case _ => false
+ ! connections.get(tnode).isEmpty
+ case _ =>
+ false
}
}
- def connect(n: Node): unit = synchronized {
+ def connect(n: Node): Unit = synchronized {
n match {
case tnode: TcpNode =>
connect(tnode)
@@ -202,9 +197,9 @@ class TcpService(port: Int) extends Thread with Service {
false
}
- def getRoundTripTimeMillis(node: Node): long = 0
+ def getRoundTripTimeMillis(node: Node): Long = 0
- def nodeDown(mnode: TcpNode): unit = synchronized {
+ def nodeDown(mnode: TcpNode): Unit = synchronized {
kernel nodeDown mnode
connections -= mnode
}
diff --git a/src/actors/scala/actors/distributed/TcpServiceWorker.scala b/src/actors/scala/actors/distributed/TcpServiceWorker.scala
index 64bc73059e..78190c7d1c 100644
--- a/src/actors/scala/actors/distributed/TcpServiceWorker.scala
+++ b/src/actors/scala/actors/distributed/TcpServiceWorker.scala
@@ -10,8 +10,8 @@
package scala.actors.distributed
-import java.net._
import java.io._
+import java.net.Socket
/**
* @author Philipp Haller
@@ -29,18 +29,18 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
val log = new Debug("TcpServiceWorker")
log.level = 2
- def transmit(msg: Send): unit = synchronized {
+ def transmit(msg: Send): Unit = synchronized {
val data = parent.serializer.serialize(msg)
transmit(data)
}
- def transmit(data: String): unit = synchronized {
+ def transmit(data: String): Unit = synchronized {
log.info("Transmitting " + data)
writer.write(data)
writer.flush()
}
- def transmit(data: Array[byte]): unit = synchronized {
+ def transmit(data: Array[byte]): Unit = synchronized {
log.info("Transmitting " + data)
dataout.writeInt(data.length)
dataout.write(data)
diff --git a/src/actors/scala/actors/distributed/Util.scala b/src/actors/scala/actors/distributed/Util.scala
index 82f29d0c19..07ca4fb417 100644
--- a/src/actors/scala/actors/distributed/Util.scala
+++ b/src/actors/scala/actors/distributed/Util.scala
@@ -10,8 +10,7 @@
package scala.actors.distributed
-import java.io._
-import scala.collection.mutable._
+import scala.collection.mutable.{ArrayBuffer,Buffer}
/**
* @author Philipp Haller