aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-08-28 14:08:07 -0700
committerReynold Xin <rxin@apache.org>2014-08-28 14:08:07 -0700
commitbe53c54b5c685e1d04d49bd554e05029a5a106e1 (patch)
tree9ca20f367ebcb6176453321cbf0617c31843697e /core/src/main
parent41dc5987d9abeca6fc0f5935c780d48f517cdf95 (diff)
downloadspark-be53c54b5c685e1d04d49bd554e05029a5a106e1.tar.gz
spark-be53c54b5c685e1d04d49bd554e05029a5a106e1.tar.bz2
spark-be53c54b5c685e1d04d49bd554e05029a5a106e1.zip
[SPARK-3281] Remove Netty specific code in BlockManager / shuffle
Netty functionality will be added back in subsequent PRs by using the BlockTransferService interface. Author: Reynold Xin <rxin@apache.org> Closes #2181 from rxin/SPARK-3281 and squashes the following commits: 5494b0e [Reynold Xin] Fix extra port. ff6d1e1 [Reynold Xin] [SPARK-3281] Remove Netty specific code in BlockManager.
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala73
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala41
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala6
4 files changed, 13 insertions, 127 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index ca60ec78b6..4ab8ec8f0f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -33,16 +33,8 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.util.Utils
/**
- * A block fetcher iterator interface. There are two implementations:
- *
- * BasicBlockFetcherIterator: uses a custom-built NIO communication layer.
- * NettyBlockFetcherIterator: uses Netty (OIO) as the communication layer.
- *
- * Eventually we would like the two to converge and use a single NIO-based communication layer,
- * but extensive tests show that under some circumstances (e.g. large shuffles with lots of cores),
- * NIO would perform poorly and thus the need for the Netty OIO one.
+ * A block fetcher iterator interface for fetching shuffle blocks.
*/
-
private[storage]
trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
def initialize()
@@ -262,67 +254,4 @@ object BlockFetcherIterator {
}
}
// End of BasicBlockFetcherIterator
-
- class NettyBlockFetcherIterator(
- blockManager: BlockManager,
- blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
- serializer: Serializer,
- readMetrics: ShuffleReadMetrics)
- extends BasicBlockFetcherIterator(blockManager, blocksByAddress, serializer, readMetrics) {
-
- override protected def sendRequest(req: FetchRequest) {
- logDebug("Sending request for %d blocks (%s) from %s".format(
- req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
- val cmId = new ConnectionManagerId(req.address.host, req.address.port)
-
- bytesInFlight += req.size
- val sizeMap = req.blocks.toMap // so we can look up the size of each blockID
-
- // This could throw a TimeoutException. In that case we will just retry the task.
- val client = blockManager.nettyBlockClientFactory.createClient(
- cmId.host, req.address.nettyPort)
- val blocks = req.blocks.map(_._1.toString)
-
- client.fetchBlocks(
- blocks,
- new BlockClientListener {
- override def onFetchFailure(blockId: String, errorMsg: String): Unit = {
- logError(s"Could not get block(s) from $cmId with error: $errorMsg")
- for ((blockId, size) <- req.blocks) {
- results.put(new FetchResult(blockId, -1, null))
- }
- }
-
- override def onFetchSuccess(blockId: String, data: ReferenceCountedBuffer): Unit = {
- // Increment the reference count so the buffer won't be recycled.
- // TODO: This could result in memory leaks when the task is stopped due to exception
- // before the iterator is exhausted.
- data.retain()
- val buf = data.byteBuffer()
- val blockSize = buf.remaining()
- val bid = BlockId(blockId)
-
- // TODO: remove code duplication between here and BlockManager.dataDeserialization.
- results.put(new FetchResult(bid, sizeMap(bid), () => {
- def createIterator: Iterator[Any] = {
- val stream = blockManager.wrapForCompression(bid, data.inputStream())
- serializer.newInstance().deserializeStream(stream).asIterator
- }
- new LazyInitIterator(createIterator) {
- // Release the buffer when we are done traversing it.
- override def close(): Unit = data.release()
- }
- }))
-
- readMetrics.synchronized {
- readMetrics.remoteBytesRead += blockSize
- readMetrics.remoteBlocksFetched += 1
- }
- logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
- }
- }
- )
- }
- }
- // End of NettyBlockFetcherIterator
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 12a92d44f4..1eb622c12a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -32,8 +32,6 @@ import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
-import org.apache.spark.network.netty.client.BlockFetchingClientFactory
-import org.apache.spark.network.netty.server.BlockServer
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.util._
@@ -90,27 +88,8 @@ private[spark] class BlockManager(
new TachyonStore(this, tachyonBlockManager)
}
- private val useNetty = conf.getBoolean("spark.shuffle.use.netty", false)
-
- // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
- private[storage] val nettyBlockClientFactory: BlockFetchingClientFactory = {
- if (useNetty) new BlockFetchingClientFactory(conf) else null
- }
-
- private val nettyBlockServer: BlockServer = {
- if (useNetty) {
- val server = new BlockServer(conf, this)
- logInfo(s"Created NettyBlockServer binding to port: ${server.port}")
- server
- } else {
- null
- }
- }
-
- private val nettyPort: Int = if (useNetty) nettyBlockServer.port else 0
-
val blockManagerId = BlockManagerId(
- executorId, connectionManager.id.host, connectionManager.id.port, nettyPort)
+ executorId, connectionManager.id.host, connectionManager.id.port)
// Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
// for receiving shuffle outputs)
@@ -572,14 +551,8 @@ private[spark] class BlockManager(
blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
serializer: Serializer,
readMetrics: ShuffleReadMetrics): BlockFetcherIterator = {
- val iter =
- if (conf.getBoolean("spark.shuffle.use.netty", false)) {
- new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer,
- readMetrics)
- } else {
- new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer,
- readMetrics)
- }
+ val iter = new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer,
+ readMetrics)
iter.initialize()
iter
}
@@ -1092,14 +1065,6 @@ private[spark] class BlockManager(
connectionManager.stop()
shuffleBlockManager.stop()
diskBlockManager.stop()
-
- if (nettyBlockClientFactory != null) {
- nettyBlockClientFactory.stop()
- }
- if (nettyBlockServer != null) {
- nettyBlockServer.stop()
- }
-
actorSystem.stop(slaveActor)
blockInfo.clear()
memoryStore.clear()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index b1585bd819..b7bcb2d85d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -36,11 +36,10 @@ import org.apache.spark.util.Utils
class BlockManagerId private (
private var executorId_ : String,
private var host_ : String,
- private var port_ : Int,
- private var nettyPort_ : Int
+ private var port_ : Int
) extends Externalizable {
- private def this() = this(null, null, 0, 0) // For deserialization only
+ private def this() = this(null, null, 0) // For deserialization only
def executorId: String = executorId_
@@ -60,32 +59,28 @@ class BlockManagerId private (
def port: Int = port_
- def nettyPort: Int = nettyPort_
-
override def writeExternal(out: ObjectOutput) {
out.writeUTF(executorId_)
out.writeUTF(host_)
out.writeInt(port_)
- out.writeInt(nettyPort_)
}
override def readExternal(in: ObjectInput) {
executorId_ = in.readUTF()
host_ = in.readUTF()
port_ = in.readInt()
- nettyPort_ = in.readInt()
}
@throws(classOf[IOException])
private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
- override def toString = "BlockManagerId(%s, %s, %d, %d)".format(executorId, host, port, nettyPort)
+ override def toString = s"BlockManagerId($executorId, $host, $port)"
- override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port + nettyPort
+ override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port
override def equals(that: Any) = that match {
case id: BlockManagerId =>
- executorId == id.executorId && port == id.port && host == id.host && nettyPort == id.nettyPort
+ executorId == id.executorId && port == id.port && host == id.host
case _ =>
false
}
@@ -100,11 +95,10 @@ private[spark] object BlockManagerId {
* @param execId ID of the executor.
* @param host Host name of the block manager.
* @param port Port of the block manager.
- * @param nettyPort Optional port for the Netty-based shuffle sender.
* @return A new [[org.apache.spark.storage.BlockManagerId]].
*/
- def apply(execId: String, host: String, port: Int, nettyPort: Int) =
- getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort))
+ def apply(execId: String, host: String, port: Int) =
+ getCachedBlockManagerId(new BlockManagerId(execId, host, port))
def apply(in: ObjectInput) = {
val obj = new BlockManagerId()
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index db7384705f..a7543454ec 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -295,8 +295,7 @@ private[spark] object JsonProtocol {
def blockManagerIdToJson(blockManagerId: BlockManagerId): JValue = {
("Executor ID" -> blockManagerId.executorId) ~
("Host" -> blockManagerId.host) ~
- ("Port" -> blockManagerId.port) ~
- ("Netty Port" -> blockManagerId.nettyPort)
+ ("Port" -> blockManagerId.port)
}
def jobResultToJson(jobResult: JobResult): JValue = {
@@ -644,8 +643,7 @@ private[spark] object JsonProtocol {
val executorId = (json \ "Executor ID").extract[String]
val host = (json \ "Host").extract[String]
val port = (json \ "Port").extract[Int]
- val nettyPort = (json \ "Netty Port").extract[Int]
- BlockManagerId(executorId, host, port, nettyPort)
+ BlockManagerId(executorId, host, port)
}
def jobResultFromJson(json: JValue): JobResult = {