diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-08-12 19:40:37 +0200 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-08-12 19:40:37 +0200 |
commit | e17ed9a21d47f37ff409afa917e3891a7372be86 (patch) | |
tree | 2b5439ed06dd89f1e28e6a68741fa9e320455af6 /core | |
parent | ad8a7612a471a247126e736933b6271c03fd2a18 (diff) | |
download | spark-e17ed9a21d47f37ff409afa917e3891a7372be86.tar.gz spark-e17ed9a21d47f37ff409afa917e3891a7372be86.tar.bz2 spark-e17ed9a21d47f37ff409afa917e3891a7372be86.zip |
Switch to Akka futures in connection manager.
It's still not good because each Future ends up waiting on a lock, but
it seems to work better than Scala Actors, and more importantly it
allows us to use onComplete and other listeners on futures.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/network/ConnectionManager.scala | 22 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManager.scala | 13 |
2 files changed, 17 insertions, 18 deletions
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index f680a6419b..03fad4dc05 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -2,20 +2,19 @@ package spark.network import spark._ -import scala.actors.Future -import scala.actors.Futures.future +import java.nio._ +import java.nio.channels._ +import java.nio.channels.spi._ +import java.net._ +import java.util.concurrent.Executors + import scala.collection.mutable.HashMap import scala.collection.mutable.SynchronizedMap import scala.collection.mutable.SynchronizedQueue import scala.collection.mutable.Queue import scala.collection.mutable.ArrayBuffer -import java.io._ -import java.nio._ -import java.nio.channels._ -import java.nio.channels.spi._ -import java.net._ -import java.util.concurrent.Executors +import akka.dispatch.{ExecutionContext, Future} case class ConnectionManagerId(host: String, port: Int) { def toSocketAddress() = new InetSocketAddress(host, port) @@ -44,6 +43,9 @@ class ConnectionManager(port: Int) extends Logging { val connectionRequests = new SynchronizedQueue[SendingConnection] val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)] val sendMessageRequests = new Queue[(Message, SendingConnection)] + + implicit val futureExecContext = ExecutionContext.fromExecutor( + Executors.newCachedThreadPool(DaemonThreadFactory)) var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null @@ -312,9 +314,9 @@ class ConnectionManager(port: Int) extends Logging { messageStatuses += ((message.id, messageStatus)) } sendMessage(connectionManagerId, message) - future { + Future { messageStatus.synchronized { - if (!messageStatus.attempted) { + while (!messageStatus.attempted) { logTrace("Waiting, " + messageStatuses.size + " statuses" ) messageStatus.wait() logTrace("Done waiting") diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index b79addb6c8..c1f3d3294a 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -9,12 +9,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.LinkedBlockingQueue import java.util.Collections -import scala.actors._ -import scala.actors.Actor._ -import scala.actors.Future -import scala.actors.Futures.future -import scala.actors.remote._ -import scala.actors.remote.RemoteActor._ +import akka.dispatch.{Await, Future} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import scala.collection.JavaConversions._ @@ -30,6 +25,7 @@ import spark.SparkException import spark.Utils import spark.util.ByteBufferInputStream import spark.network._ +import akka.util.Duration class BlockManagerId(var ip: String, var port: Int) extends Externalizable { def this() = this(null, 0) @@ -81,6 +77,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) val connectionManager = new ConnectionManager(0) + implicit val futureExecContext = connectionManager.futureExecContext val connectionManagerId = connectionManager.id val blockManagerId = new BlockManagerId(connectionManagerId.host, connectionManagerId.port) @@ -439,7 +436,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // Initiate the replication before storing it locally. This is faster as // data is already serialized and ready for sending val replicationFuture = if (level.replication > 1) { - future { + Future { replicate(blockId, bytes, level) } } else { @@ -475,7 +472,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m if (replicationFuture == null) { throw new Exception("Unexpected") } - replicationFuture() + Await.ready(replicationFuture, Duration.Inf) } val finishTime = System.currentTimeMillis |