about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-08-12 19:40:37 +0200
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-08-12 19:40:37 +0200
commite17ed9a21d47f37ff409afa917e3891a7372be86 (patch)
tree2b5439ed06dd89f1e28e6a68741fa9e320455af6 /core
parentad8a7612a471a247126e736933b6271c03fd2a18 (diff)
downloadspark-e17ed9a21d47f37ff409afa917e3891a7372be86.tar.gz
spark-e17ed9a21d47f37ff409afa917e3891a7372be86.tar.bz2
spark-e17ed9a21d47f37ff409afa917e3891a7372be86.zip
Switch to Akka futures in connection manager.
It's still not good because each Future ends up waiting on a lock, but it seems to work better than Scala Actors, and more importantly it allows us to use onComplete and other listeners on futures.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/network/ConnectionManager.scala | 22
-rw-r--r-- core/src/main/scala/spark/storage/BlockManager.scala      | 13
2 files changed, 17 insertions, 18 deletions
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index f680a6419b..03fad4dc05 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -2,20 +2,19 @@ package spark.network
import spark._
-import scala.actors.Future
-import scala.actors.Futures.future
+import java.nio._
+import java.nio.channels._
+import java.nio.channels.spi._
+import java.net._
+import java.util.concurrent.Executors
+
import scala.collection.mutable.HashMap
import scala.collection.mutable.SynchronizedMap
import scala.collection.mutable.SynchronizedQueue
import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
-import java.io._
-import java.nio._
-import java.nio.channels._
-import java.nio.channels.spi._
-import java.net._
-import java.util.concurrent.Executors
+import akka.dispatch.{ExecutionContext, Future}
case class ConnectionManagerId(host: String, port: Int) {
def toSocketAddress() = new InetSocketAddress(host, port)
@@ -44,6 +43,9 @@ class ConnectionManager(port: Int) extends Logging {
val connectionRequests = new SynchronizedQueue[SendingConnection]
val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
val sendMessageRequests = new Queue[(Message, SendingConnection)]
+
+ implicit val futureExecContext = ExecutionContext.fromExecutor(
+ Executors.newCachedThreadPool(DaemonThreadFactory))
var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
@@ -312,9 +314,9 @@ class ConnectionManager(port: Int) extends Logging {
messageStatuses += ((message.id, messageStatus))
}
sendMessage(connectionManagerId, message)
- future {
+ Future {
messageStatus.synchronized {
- if (!messageStatus.attempted) {
+ while (!messageStatus.attempted) {
logTrace("Waiting, " + messageStatuses.size + " statuses" )
messageStatus.wait()
logTrace("Done waiting")
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index b79addb6c8..c1f3d3294a 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -9,12 +9,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.LinkedBlockingQueue
import java.util.Collections
-import scala.actors._
-import scala.actors.Actor._
-import scala.actors.Future
-import scala.actors.Futures.future
-import scala.actors.remote._
-import scala.actors.remote.RemoteActor._
+import akka.dispatch.{Await, Future}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
@@ -30,6 +25,7 @@ import spark.SparkException
import spark.Utils
import spark.util.ByteBufferInputStream
import spark.network._
+import akka.util.Duration
class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
def this() = this(null, 0)
@@ -81,6 +77,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
val connectionManager = new ConnectionManager(0)
+ implicit val futureExecContext = connectionManager.futureExecContext
val connectionManagerId = connectionManager.id
val blockManagerId = new BlockManagerId(connectionManagerId.host, connectionManagerId.port)
@@ -439,7 +436,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Initiate the replication before storing it locally. This is faster as
// data is already serialized and ready for sending
val replicationFuture = if (level.replication > 1) {
- future {
+ Future {
replicate(blockId, bytes, level)
}
} else {
@@ -475,7 +472,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
if (replicationFuture == null) {
throw new Exception("Unexpected")
}
- replicationFuture()
+ Await.ready(replicationFuture, Duration.Inf)
}
val finishTime = System.currentTimeMillis