aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/BlockManager.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala49
1 files changed, 43 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e8bbd298c6..e67676950b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -25,16 +25,19 @@ import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random
-import akka.actor.{ActorSystem, Cancellable, Props}
+import akka.actor.{ActorSystem, Props}
import sun.nio.ch.DirectBuffer
import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
+import org.apache.spark.network.netty.client.BlockFetchingClientFactory
+import org.apache.spark.network.netty.server.BlockServer
import org.apache.spark.serializer.Serializer
import org.apache.spark.util._
+
private[spark] sealed trait BlockValues
private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
@@ -58,7 +61,7 @@ private[spark] class BlockManager(
val conf: SparkConf,
securityManager: SecurityManager,
mapOutputTracker: MapOutputTracker)
- extends Logging {
+ extends BlockDataProvider with Logging {
private val port = conf.getInt("spark.blockManager.port", 0)
val shuffleBlockManager = new ShuffleBlockManager(this)
@@ -86,13 +89,25 @@ private[spark] class BlockManager(
new TachyonStore(this, tachyonBlockManager)
}
+ private val useNetty = conf.getBoolean("spark.shuffle.use.netty", false)
+
// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
- private val nettyPort: Int = {
- val useNetty = conf.getBoolean("spark.shuffle.use.netty", false)
- val nettyPortConfig = conf.getInt("spark.shuffle.sender.port", 0)
- if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
+ private[storage] val nettyBlockClientFactory: BlockFetchingClientFactory = {
+ if (useNetty) new BlockFetchingClientFactory(conf) else null
}
+ private val nettyBlockServer: BlockServer = {
+ if (useNetty) {
+ val server = new BlockServer(conf, this)
+ logInfo(s"Created NettyBlockServer binding to port: ${server.port}")
+ server
+ } else {
+ null
+ }
+ }
+
+ private val nettyPort: Int = if (useNetty) nettyBlockServer.port else 0
+
val blockManagerId = BlockManagerId(
executorId, connectionManager.id.host, connectionManager.id.port, nettyPort)
@@ -216,6 +231,20 @@ private[spark] class BlockManager(
}
}
+ override def getBlockData(blockId: String): Either[FileSegment, ByteBuffer] = {
+ val bid = BlockId(blockId)
+ if (bid.isShuffle) {
+ Left(diskBlockManager.getBlockLocation(bid))
+ } else {
+ val blockBytesOpt = doGetLocal(bid, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
+ if (blockBytesOpt.isDefined) {
+ Right(blockBytesOpt.get)
+ } else {
+ throw new BlockNotFoundException(blockId)
+ }
+ }
+ }
+
/**
* Get the BlockStatus for the block identified by the given ID, if it exists.
* NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon.
@@ -1061,6 +1090,14 @@ private[spark] class BlockManager(
connectionManager.stop()
shuffleBlockManager.stop()
diskBlockManager.stop()
+
+ if (nettyBlockClientFactory != null) {
+ nettyBlockClientFactory.stop()
+ }
+ if (nettyBlockServer != null) {
+ nettyBlockServer.stop()
+ }
+
actorSystem.stop(slaveActor)
blockInfo.clear()
memoryStore.clear()