aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2014-11-01 14:37:45 -0700
committerReynold Xin <rxin@databricks.com>2014-11-01 14:37:45 -0700
commitf55218aeb1e9d638df6229b36a59a15ce5363482 (patch)
tree84e4454c224b3f14b7fcbe8259c90d06b6fd969b /core
parent1d4f3552037cb667971bea2e5078d8b3ce6c2eae (diff)
downloadspark-f55218aeb1e9d638df6229b36a59a15ce5363482.tar.gz
spark-f55218aeb1e9d638df6229b36a59a15ce5363482.tar.bz2
spark-f55218aeb1e9d638df6229b36a59a15ce5363482.zip
[SPARK-3796] Create external service which can serve shuffle files
This patch introduces the tooling necessary to construct an external shuffle service which is independent of Spark executors, and then use this service inside Spark. An example (just for the sake of this PR) of the service creation can be found in Worker, and the service itself is used by plugging in the StandaloneShuffleClient as Spark's ShuffleClient (setup in BlockManager). This PR continues the work from #2753, which extracted out the transport layer of Spark's block transfer into an independent package within Spark. A new package was created which contains the Spark business logic necessary to retrieve the actual shuffle data, which is completely independent of the transport layer introduced in the previous patch. Similar to the transport layer, this package must not depend on Spark as we anticipate plugging this service as a lightweight process within, say, the YARN NodeManager, and do not wish to include Spark's dependencies (including Scala itself). There are several outstanding tasks which must be complete before this PR can be merged: - [x] Complete unit testing of network/shuffle package. - [x] Performance and correctness testing on a real cluster. - [x] Remove example service instantiation from Worker.scala. There are even more shortcomings of this PR which should be addressed in followup patches: - Don't use Java serializer for RPC layer! It is not cross-version compatible. - Handle shuffle file cleanup for dead executors once the application terminates or the ContextCleaner triggers. - Documentation of the feature in the Spark docs. - Improve behavior if the shuffle service itself goes down (right now we don't blacklist it, and new executors cannot spawn on that machine). - SSL and SASL integration - Nice to have: Handle shuffle file consolidation (this would requires changes to Spark's implementation). Author: Aaron Davidson <aaron@databricks.com> Closes #3001 from aarondav/shuffle-service and squashes the following commits: 4d1f8c1 [Aaron Davidson] Remove changes to Worker 705748f [Aaron Davidson] Rename Standalone* to External* fd3928b [Aaron Davidson] Do not unregister executor outputs unduly 9883918 [Aaron Davidson] Make suggested build changes 3d62679 [Aaron Davidson] Add Spark integration test 7fe51d5 [Aaron Davidson] Fix SBT integration 56caa50 [Aaron Davidson] Address comments c8d1ac3 [Aaron Davidson] Add unit tests 2f70c0c [Aaron Davidson] Fix unit tests 5483e96 [Aaron Davidson] Fix unit tests 46a70bf [Aaron Davidson] Whoops, bracket 5ea4df6 [Aaron Davidson] [SPARK-3796] Create external service which can serve shuffle files
Diffstat (limited to 'core')
-rw-r--r--core/pom.xml5
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala2
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/network/BlockTransferService.scala55
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/NettyBlockFetcher.scala95
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala26
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala (renamed from core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala)30
-rw-r--r--core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala41
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Stage.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockId.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala71
-rw-r--r--core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala25
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/DistributedSuite.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala76
-rw-r--r--core/src/test/scala/org/apache/spark/HashShuffleSuite.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/ShuffleSuite.scala34
-rw-r--r--core/src/test/scala/org/apache/spark/SortShuffleSuite.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala25
32 files changed, 327 insertions, 272 deletions
diff --git a/core/pom.xml b/core/pom.xml
index 6963ce4777..41296e0eca 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -50,6 +50,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-network-shuffle_2.10</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</dependency>
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 4cb0bd4142..7d96962c4a 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -178,6 +178,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
}
} else {
+ logError("Missing all output locations for shuffle " + shuffleId)
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
}
@@ -348,7 +349,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
new ConcurrentHashMap[Int, Array[MapStatus]]
}
-private[spark] object MapOutputTracker {
+private[spark] object MapOutputTracker extends Logging {
// Serialize an array of map output locations into an efficient byte format so that we can send
// it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
@@ -381,6 +382,7 @@ private[spark] object MapOutputTracker {
statuses.map {
status =>
if (status == null) {
+ logError("Missing an output location for shuffle " + shuffleId)
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
} else {
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 16c5d6648d..e2f13accdf 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -32,7 +32,7 @@ import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.BlockTransferService
-import org.apache.spark.network.netty.{NettyBlockTransferService}
+import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.Serializer
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index c4a8ec2e5e..f1f66d0903 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -186,11 +186,11 @@ private[spark] class Worker(
private def retryConnectToMaster() {
Utils.tryOrExit {
connectionAttemptCount += 1
- logInfo(s"Attempting to connect to master (attempt # $connectionAttemptCount")
if (registered) {
registrationRetryTimer.foreach(_.cancel())
registrationRetryTimer = None
} else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
+ logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
tryRegisterAllMasters()
if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
registrationRetryTimer.foreach(_.cancel())
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2889f59e33..c78e0ffca2 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -78,7 +78,7 @@ private[spark] class Executor(
val executorSource = new ExecutorSource(this, executorId)
// Initialize Spark environment (using system properties read above)
- conf.set("spark.executor.id", "executor." + executorId)
+ conf.set("spark.executor.id", executorId)
private val env = {
if (!isLocal) {
val port = conf.getInt("spark.executor.port", 0)
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index b083f46533..210a581db4 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -20,16 +20,16 @@ package org.apache.spark.network
import java.io.Closeable
import java.nio.ByteBuffer
-import scala.concurrent.{Await, Future}
+import scala.concurrent.{Promise, Await, Future}
import scala.concurrent.duration.Duration
import org.apache.spark.Logging
import org.apache.spark.network.buffer.{NioManagedBuffer, ManagedBuffer}
-import org.apache.spark.storage.{BlockId, StorageLevel}
-import org.apache.spark.util.Utils
+import org.apache.spark.network.shuffle.{ShuffleClient, BlockFetchingListener}
+import org.apache.spark.storage.{BlockManagerId, BlockId, StorageLevel}
private[spark]
-abstract class BlockTransferService extends Closeable with Logging {
+abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
/**
* Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
@@ -60,10 +60,11 @@ abstract class BlockTransferService extends Closeable with Logging {
* return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
* the data of a block is fetched, rather than waiting for all blocks to be fetched.
*/
- def fetchBlocks(
- hostName: String,
+ override def fetchBlocks(
+ host: String,
port: Int,
- blockIds: Seq[String],
+ execId: String,
+ blockIds: Array[String],
listener: BlockFetchingListener): Unit
/**
@@ -81,43 +82,23 @@ abstract class BlockTransferService extends Closeable with Logging {
*
* It is also only available after [[init]] is invoked.
*/
- def fetchBlockSync(hostName: String, port: Int, blockId: String): ManagedBuffer = {
+ def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
// A monitor for the thread to wait on.
- val lock = new Object
- @volatile var result: Either[ManagedBuffer, Throwable] = null
- fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener {
- override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
- lock.synchronized {
- result = Right(exception)
- lock.notify()
+ val result = Promise[ManagedBuffer]()
+ fetchBlocks(host, port, execId, Array(blockId),
+ new BlockFetchingListener {
+ override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
+ result.failure(exception)
}
- }
- override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
- lock.synchronized {
+ override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
val ret = ByteBuffer.allocate(data.size.toInt)
ret.put(data.nioByteBuffer())
ret.flip()
- result = Left(new NioManagedBuffer(ret))
- lock.notify()
+ result.success(new NioManagedBuffer(ret))
}
- }
- })
+ })
- // Sleep until result is no longer null
- lock.synchronized {
- while (result == null) {
- try {
- lock.wait()
- } catch {
- case e: InterruptedException =>
- }
- }
- }
-
- result match {
- case Left(data) => data
- case Right(e) => throw e
- }
+ Await.result(result.future, Duration.Inf)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockFetcher.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockFetcher.scala
deleted file mode 100644
index 8c5ffd8da6..0000000000
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockFetcher.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty
-
-import java.nio.ByteBuffer
-import java.util
-
-import org.apache.spark.{SparkConf, Logging}
-import org.apache.spark.network.BlockFetchingListener
-import org.apache.spark.network.netty.NettyMessages._
-import org.apache.spark.serializer.{JavaSerializer, Serializer}
-import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.network.client.{RpcResponseCallback, ChunkReceivedCallback, TransportClient}
-import org.apache.spark.storage.BlockId
-import org.apache.spark.util.Utils
-
-/**
- * Responsible for holding the state for a request for a single set of blocks. This assumes that
- * the chunks will be returned in the same order as requested, and that there will be exactly
- * one chunk per block.
- *
- * Upon receipt of any block, the listener will be called back. Upon failure part way through,
- * the listener will receive a failure callback for each outstanding block.
- */
-class NettyBlockFetcher(
- serializer: Serializer,
- client: TransportClient,
- blockIds: Seq[String],
- listener: BlockFetchingListener)
- extends Logging {
-
- require(blockIds.nonEmpty)
-
- private val ser = serializer.newInstance()
-
- private var streamHandle: ShuffleStreamHandle = _
-
- private val chunkCallback = new ChunkReceivedCallback {
- // On receipt of a chunk, pass it upwards as a block.
- def onSuccess(chunkIndex: Int, buffer: ManagedBuffer): Unit = Utils.logUncaughtExceptions {
- listener.onBlockFetchSuccess(blockIds(chunkIndex), buffer)
- }
-
- // On receipt of a failure, fail every block from chunkIndex onwards.
- def onFailure(chunkIndex: Int, e: Throwable): Unit = {
- blockIds.drop(chunkIndex).foreach { blockId =>
- listener.onBlockFetchFailure(blockId, e);
- }
- }
- }
-
- /** Begins the fetching process, calling the listener with every block fetched. */
- def start(): Unit = {
- // Send the RPC to open the given set of blocks. This will return a ShuffleStreamHandle.
- client.sendRpc(ser.serialize(OpenBlocks(blockIds.map(BlockId.apply))).array(),
- new RpcResponseCallback {
- override def onSuccess(response: Array[Byte]): Unit = {
- try {
- streamHandle = ser.deserialize[ShuffleStreamHandle](ByteBuffer.wrap(response))
- logTrace(s"Successfully opened block set: $streamHandle! Preparing to fetch chunks.")
-
- // Immediately request all chunks -- we expect that the total size of the request is
- // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
- for (i <- 0 until streamHandle.numChunks) {
- client.fetchChunk(streamHandle.streamId, i, chunkCallback)
- }
- } catch {
- case e: Exception =>
- logError("Failed while starting block fetches", e)
- blockIds.foreach(blockId => Utils.tryLog(listener.onBlockFetchFailure(blockId, e)))
- }
- }
-
- override def onFailure(e: Throwable): Unit = {
- logError("Failed while starting block fetches", e)
- blockIds.foreach(blockId => Utils.tryLog(listener.onBlockFetchFailure(blockId, e)))
- }
- })
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index 02c657e1d6..1950e7bd63 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -19,39 +19,41 @@ package org.apache.spark.network.netty
import java.nio.ByteBuffer
+import scala.collection.JavaConversions._
+
import org.apache.spark.Logging
import org.apache.spark.network.BlockDataManager
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager}
+import org.apache.spark.network.shuffle.ShuffleStreamHandle
import org.apache.spark.serializer.Serializer
-import org.apache.spark.network.buffer.{NioManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.client.{TransportClient, RpcResponseCallback}
-import org.apache.spark.network.server.{DefaultStreamManager, RpcHandler}
-import org.apache.spark.storage.{StorageLevel, BlockId}
-
-import scala.collection.JavaConversions._
+import org.apache.spark.storage.{BlockId, StorageLevel}
object NettyMessages {
-
/** Request to read a set of blocks. Returns [[ShuffleStreamHandle]] to identify the stream. */
case class OpenBlocks(blockIds: Seq[BlockId])
/** Request to upload a block with a certain StorageLevel. Returns nothing (empty byte array). */
case class UploadBlock(blockId: BlockId, blockData: Array[Byte], level: StorageLevel)
-
- /** Identifier for a fixed number of chunks to read from a stream created by [[OpenBlocks]]. */
- case class ShuffleStreamHandle(streamId: Long, numChunks: Int)
}
/**
* Serves requests to open blocks by simply registering one chunk per block requested.
+ * Handles opening and uploading arbitrary BlockManager blocks.
+ *
+ * Opened blocks are registered with the "one-for-one" strategy, meaning each Transport-layer Chunk
+ * is equivalent to one Spark-level shuffle block.
*/
class NettyBlockRpcServer(
serializer: Serializer,
- streamManager: DefaultStreamManager,
blockManager: BlockDataManager)
extends RpcHandler with Logging {
import NettyMessages._
+ private val streamManager = new OneForOneStreamManager()
+
override def receive(
client: TransportClient,
messageBytes: Array[Byte],
@@ -73,4 +75,6 @@ class NettyBlockRpcServer(
responseContext.onSuccess(new Array[Byte](0))
}
}
+
+ override def getStreamManager(): StreamManager = streamManager
}
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 38a3e94515..ec3000e722 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -17,15 +17,15 @@
package org.apache.spark.network.netty
-import scala.concurrent.{Promise, Future}
+import scala.concurrent.{Future, Promise}
import org.apache.spark.SparkConf
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.network.client.{RpcResponseCallback, TransportClient, TransportClientFactory}
-import org.apache.spark.network.netty.NettyMessages.UploadBlock
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClientFactory}
+import org.apache.spark.network.netty.NettyMessages.{OpenBlocks, UploadBlock}
import org.apache.spark.network.server._
-import org.apache.spark.network.util.{ConfigProvider, TransportConf}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.util.Utils
@@ -37,30 +37,29 @@ class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService {
// TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
val serializer = new JavaSerializer(conf)
- // Create a TransportConfig using SparkConf.
- private[this] val transportConf = new TransportConf(
- new ConfigProvider { override def get(name: String) = conf.get(name) })
-
private[this] var transportContext: TransportContext = _
private[this] var server: TransportServer = _
private[this] var clientFactory: TransportClientFactory = _
override def init(blockDataManager: BlockDataManager): Unit = {
- val streamManager = new DefaultStreamManager
- val rpcHandler = new NettyBlockRpcServer(serializer, streamManager, blockDataManager)
- transportContext = new TransportContext(transportConf, streamManager, rpcHandler)
+ val rpcHandler = new NettyBlockRpcServer(serializer, blockDataManager)
+ transportContext = new TransportContext(SparkTransportConf.fromSparkConf(conf), rpcHandler)
clientFactory = transportContext.createClientFactory()
server = transportContext.createServer()
+ logInfo("Server created on " + server.getPort)
}
override def fetchBlocks(
- hostname: String,
+ host: String,
port: Int,
- blockIds: Seq[String],
+ execId: String,
+ blockIds: Array[String],
listener: BlockFetchingListener): Unit = {
+ logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
try {
- val client = clientFactory.createClient(hostname, port)
- new NettyBlockFetcher(serializer, client, blockIds, listener).start()
+ val client = clientFactory.createClient(host, port)
+ new OneForOneBlockFetcher(client, blockIds.toArray, listener)
+ .start(OpenBlocks(blockIds.map(BlockId.apply)))
} catch {
case e: Exception =>
logError("Exception while beginning fetchBlocks", e)
diff --git a/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index 645793fde8..9fa4fa77b8 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -15,28 +15,18 @@
* limitations under the License.
*/
-package org.apache.spark.network
-
-import java.util.EventListener
-
-import org.apache.spark.network.buffer.ManagedBuffer
+package org.apache.spark.network.netty
+import org.apache.spark.SparkConf
+import org.apache.spark.network.util.{TransportConf, ConfigProvider}
/**
- * Listener callback interface for [[BlockTransferService.fetchBlocks]].
+ * Utility for creating a [[TransportConf]] from a [[SparkConf]].
*/
-private[spark]
-trait BlockFetchingListener extends EventListener {
-
- /**
- * Called once per successfully fetched block. After this call returns, data will be released
- * automatically. If the data will be passed to another thread, the receiver should retain()
- * and release() the buffer on their own, or copy the data to a new buffer.
- */
- def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit
-
- /**
- * Called at least once per block upon failures.
- */
- def onBlockFetchFailure(blockId: String, exception: Throwable): Unit
+object SparkTransportConf {
+ def fromSparkConf(conf: SparkConf): TransportConf = {
+ new TransportConf(new ConfigProvider {
+ override def get(name: String): String = conf.get(name)
+ })
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
index 11793ea92a..f56d165dab 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
import org.apache.spark.network._
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.shuffle.BlockFetchingListener
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -79,13 +80,14 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
}
override def fetchBlocks(
- hostName: String,
+ host: String,
port: Int,
- blockIds: Seq[String],
+ execId: String,
+ blockIds: Array[String],
listener: BlockFetchingListener): Unit = {
checkInit()
- val cmId = new ConnectionManagerId(hostName, port)
+ val cmId = new ConnectionManagerId(host, port)
val blockMessageArray = new BlockMessageArray(blockIds.map { blockId =>
BlockMessage.fromGetBlock(GetBlock(BlockId(blockId)))
})
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f81fa6d808..af17b5d5d2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -124,6 +124,9 @@ class DAGScheduler(
/** If enabled, we may run certain actions like take() and first() locally. */
private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false)
+ /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
+ private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
+
private def initializeEventProcessActor() {
// blocking the thread until supervisor is started, which ensures eventProcessActor is
// not null before any job is submitted
@@ -1064,7 +1067,9 @@ class DAGScheduler(
runningStages -= failedStage
}
- if (failedStages.isEmpty && eventProcessActor != null) {
+ if (disallowStageRetryForTest) {
+ abortStage(failedStage, "Fetch failure will not retry stage due to testing config")
+ } else if (failedStages.isEmpty && eventProcessActor != null) {
// Don't schedule an event to resubmit failed stages if failed isn't empty, because
// in that case the event will already have been scheduled. eventProcessActor may be
// null during unit tests.
@@ -1086,7 +1091,7 @@ class DAGScheduler(
// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
- handleExecutorLost(bmAddress.executorId, Some(task.epoch))
+ handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
}
case ExceptionFailure(className, description, stackTrace, metrics) =>
@@ -1106,25 +1111,35 @@ class DAGScheduler(
* Responds to an executor being lost. This is called inside the event loop, so it assumes it can
* modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
*
+ * We will also assume that we've lost all shuffle blocks associated with the executor if the
+ * executor serves its own blocks (i.e., we're not using external shuffle) OR a FetchFailed
+ * occurred, in which case we presume all shuffle data related to this executor to be lost.
+ *
* Optionally the epoch during which the failure was caught can be passed to avoid allowing
* stray fetch failures from possibly retriggering the detection of a node as lost.
*/
- private[scheduler] def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) {
+ private[scheduler] def handleExecutorLost(
+ execId: String,
+ fetchFailed: Boolean,
+ maybeEpoch: Option[Long] = None) {
val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
failedEpoch(execId) = currentEpoch
logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
blockManagerMaster.removeExecutor(execId)
- // TODO: This will be really slow if we keep accumulating shuffle map stages
- for ((shuffleId, stage) <- shuffleToMapStage) {
- stage.removeOutputsOnExecutor(execId)
- val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
- mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
- }
- if (shuffleToMapStage.isEmpty) {
- mapOutputTracker.incrementEpoch()
+
+ if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) {
+ // TODO: This will be really slow if we keep accumulating shuffle map stages
+ for ((shuffleId, stage) <- shuffleToMapStage) {
+ stage.removeOutputsOnExecutor(execId)
+ val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
+ mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
+ }
+ if (shuffleToMapStage.isEmpty) {
+ mapOutputTracker.incrementEpoch()
+ }
+ clearCacheLocs()
}
- clearCacheLocs()
} else {
logDebug("Additional executor lost message for " + execId +
"(epoch " + currentEpoch + ")")
@@ -1382,7 +1397,7 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule
dagScheduler.handleExecutorAdded(execId, host)
case ExecutorLost(execId) =>
- dagScheduler.handleExecutorLost(execId)
+ dagScheduler.handleExecutorLost(execId, fetchFailed = false)
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 071568cdfb..cc13f57a49 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -102,6 +102,11 @@ private[spark] class Stage(
}
}
+ /**
+ * Removes all shuffle outputs associated with this executor. Note that this will also remove
+ * outputs which are served by an external shuffle server (if one exists), as they are still
+ * registered with this execId.
+ */
def removeOutputsOnExecutor(execId: String) {
var becameUnavailable = false
for (partition <- 0 until numPartitions) {
@@ -131,4 +136,9 @@ private[spark] class Stage(
override def toString = "Stage " + id
override def hashCode(): Int = id
+
+ override def equals(other: Any): Boolean = other match {
+ case stage: Stage => stage != null && stage.id == id
+ case _ => false
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a6c23fc85a..376821f89c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -687,10 +687,11 @@ private[spark] class TaskSetManager(
addPendingTask(index, readding=true)
}
- // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage.
+ // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
+ // and we are not using an external shuffle server which could serve the shuffle outputs.
// The reason is the next stage wouldn't be able to fetch the data from this dead executor
// so we would need to rerun these tasks on other executors.
- if (tasks(0).isInstanceOf[ShuffleMapTask]) {
+ if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
if (successful(index)) {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index 1fb5b2c454..f03e8e4bf1 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -62,7 +62,8 @@ private[spark] trait ShuffleWriterGroup {
* each block stored in each file. In order to find the location of a shuffle block, we search the
* files within a ShuffleFileGroups associated with the block's reducer.
*/
-
+// Note: Changes to the format in this file should be kept in sync with
+// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData().
private[spark]
class FileShuffleBlockManager(conf: SparkConf)
extends ShuffleBlockManager with Logging {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index e9805c9c13..a48f0c9ece 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -35,6 +35,8 @@ import org.apache.spark.storage._
* as the filename postfix for data file, and ".index" as the filename postfix for index file.
*
*/
+// Note: Changes to the format in this file should be kept in sync with
+// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
private[spark]
class IndexShuffleBlockManager extends ShuffleBlockManager {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index 6cf9305977..f49917b7fe 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -74,7 +74,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
val blockFetcherItr = new ShuffleBlockFetcherIterator(
context,
- SparkEnv.get.blockTransferService,
+ SparkEnv.get.blockManager.shuffleClient,
blockManager,
blocksByAddress,
serializer,
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 746ed33b54..183a30373b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -107,7 +107,7 @@ private[spark] class HashShuffleWriter[K, V](
writer.commitAndClose()
writer.fileSegment().length
}
- MapStatus(blockManager.blockManagerId, sizes)
+ MapStatus(blockManager.shuffleServerId, sizes)
}
private def revertWrites(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 927481b72c..d75f9d7311 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -70,7 +70,7 @@ private[spark] class SortShuffleWriter[K, V, C](
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
- mapStatus = MapStatus(blockManager.blockManagerId, partitionLengths)
+ mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
/** Close this writer, passing along whether the map completed */
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 8df5ec6bde..1f012941c8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -53,6 +53,8 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
def name = "rdd_" + rddId + "_" + splitIndex
}
+// Format of the shuffle block ids (including data and index) should be kept in sync with
+// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getBlockData().
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 58510d7232..1f8de28961 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -21,9 +21,9 @@ import java.io.{BufferedOutputStream, ByteArrayOutputStream, File, InputStream,
import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
-import scala.concurrent.{Await, Future}
import scala.util.Random
import akka.actor.{ActorSystem, Props}
@@ -34,8 +34,13 @@ import org.apache.spark.executor._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.netty.{SparkTransportConf, NettyBlockTransferService}
+import org.apache.spark.network.shuffle.{ExecutorShuffleInfo, ExternalShuffleClient}
+import org.apache.spark.network.util.{ConfigProvider, TransportConf}
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleManager
+import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.util._
private[spark] sealed trait BlockValues
@@ -85,9 +90,38 @@ private[spark] class BlockManager(
new TachyonStore(this, tachyonBlockManager)
}
+ private[spark]
+ val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+ private val externalShuffleServicePort = conf.getInt("spark.shuffle.service.port", 7337)
+ // Check that we're not using external shuffle service with consolidated shuffle files.
+ if (externalShuffleServiceEnabled
+ && conf.getBoolean("spark.shuffle.consolidateFiles", false)
+ && shuffleManager.isInstanceOf[HashShuffleManager]) {
+ throw new UnsupportedOperationException("Cannot use external shuffle service with consolidated"
+ + " shuffle files in hash-based shuffle. Please disable spark.shuffle.consolidateFiles or "
+ + " switch to sort-based shuffle.")
+ }
+
val blockManagerId = BlockManagerId(
executorId, blockTransferService.hostName, blockTransferService.port)
+ // Address of the server that serves this executor's shuffle files. This is either an external
+ // service, or just our own Executor's BlockManager.
+ private[spark] val shuffleServerId = if (externalShuffleServiceEnabled) {
+ BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
+ } else {
+ blockManagerId
+ }
+
+ // Client to read other executors' shuffle files. This is either an external service, or just the
+ // standard BlockTranserService to directly connect to other Executors.
+ private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
+ val appId = conf.get("spark.app.id", "unknown-app-id")
+ new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), appId)
+ } else {
+ blockTransferService
+ }
+
// Whether to compress broadcast variables that are stored
private val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
// Whether to compress shuffle output that are stored
@@ -143,10 +177,41 @@ private[spark] class BlockManager(
/**
* Initialize the BlockManager. Register to the BlockManagerMaster, and start the
- * BlockManagerWorker actor.
+ * BlockManagerWorker actor. Additionally registers with a local shuffle service if configured.
*/
private def initialize(): Unit = {
master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
+
+ // Register Executors' configuration with the local shuffle service, if one should exist.
+ if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
+ registerWithExternalShuffleServer()
+ }
+ }
+
+ private def registerWithExternalShuffleServer() {
+ logInfo("Registering executor with local external shuffle service.")
+ val shuffleConfig = new ExecutorShuffleInfo(
+ diskBlockManager.localDirs.map(_.toString),
+ diskBlockManager.subDirsPerLocalDir,
+ shuffleManager.getClass.getName)
+
+ val MAX_ATTEMPTS = 3
+ val SLEEP_TIME_SECS = 5
+
+ for (i <- 1 to MAX_ATTEMPTS) {
+ try {
+ // Synchronous and will throw an exception if we cannot connect.
+ shuffleClient.asInstanceOf[ExternalShuffleClient].registerWithShuffleServer(
+ shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
+ return
+ } catch {
+ case e: Exception if i < MAX_ATTEMPTS =>
+ val attemptsRemaining =
+ logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}}"
+ + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
+ Thread.sleep(SLEEP_TIME_SECS * 1000)
+ }
+ }
}
/**
@@ -506,7 +571,7 @@ private[spark] class BlockManager(
for (loc <- locations) {
logDebug(s"Getting remote block $blockId from $loc")
val data = blockTransferService.fetchBlockSync(
- loc.host, loc.port, blockId.toString).nioByteBuffer()
+ loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
if (data != null) {
if (asBlockResult) {
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 99e925328a..58fba54710 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -38,12 +38,13 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
extends Logging {
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
- private val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64)
+ private[spark]
+ val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64)
/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
* having really large inodes at the top level. */
- val localDirs: Array[File] = createLocalDirs(conf)
+ private[spark] val localDirs: Array[File] = createLocalDirs(conf)
if (localDirs.isEmpty) {
logError("Failed to create any local dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
@@ -52,6 +53,9 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
addShutdownHook()
+ /** Looks up a file by hashing it into one of our local subdirectories. */
+ // This method should be kept in sync with
+ // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile().
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
@@ -159,13 +163,16 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
/** Cleanup local dirs and stop shuffle sender. */
private[spark] def stop() {
- localDirs.foreach { localDir =>
- if (localDir.isDirectory() && localDir.exists()) {
- try {
- if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
- } catch {
- case e: Exception =>
- logError(s"Exception while deleting local spark dir: $localDir", e)
+ // Only perform cleanup if an external service is not serving our shuffle files.
+ if (!blockManager.externalShuffleServiceEnabled) {
+ localDirs.foreach { localDir =>
+ if (localDir.isDirectory() && localDir.exists()) {
+ try {
+ if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
+ } catch {
+ case e: Exception =>
+ logError(s"Exception while deleting local spark dir: $localDir", e)
+ }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 0d6f3bf003..ee89c7e521 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -22,7 +22,8 @@ import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
import org.apache.spark.{Logging, TaskContext}
-import org.apache.spark.network.{BlockFetchingListener, BlockTransferService}
+import org.apache.spark.network.BlockTransferService
+import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.{CompletionIterator, Utils}
@@ -38,8 +39,8 @@ import org.apache.spark.util.{CompletionIterator, Utils}
* using too much memory.
*
* @param context [[TaskContext]], used for metrics update
- * @param blockTransferService [[BlockTransferService]] for fetching remote blocks
- * @param blockManager [[BlockManager]] for reading local blocks
+ * @param shuffleClient [[ShuffleClient]] for fetching remote blocks
+ * @param blockManager [[BlockManager]] for reading local blocks
* @param blocksByAddress list of blocks to fetch grouped by the [[BlockManagerId]].
* For each block we also require the size (in bytes as a long field) in
* order to throttle the memory usage.
@@ -49,7 +50,7 @@ import org.apache.spark.util.{CompletionIterator, Utils}
private[spark]
final class ShuffleBlockFetcherIterator(
context: TaskContext,
- blockTransferService: BlockTransferService,
+ shuffleClient: ShuffleClient,
blockManager: BlockManager,
blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
serializer: Serializer,
@@ -140,7 +141,8 @@ final class ShuffleBlockFetcherIterator(
val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
val blockIds = req.blocks.map(_._1.toString)
- blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
+ val address = req.address
+ shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
new BlockFetchingListener {
override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
// Only add the buffer to results queue if the iterator is not zombie,
@@ -179,7 +181,7 @@ final class ShuffleBlockFetcherIterator(
var totalBlocks = 0
for ((address, blockInfos) <- blocksByAddress) {
totalBlocks += blockInfos.size
- if (address == blockManager.blockManagerId) {
+ if (address.executorId == blockManager.blockManagerId.executorId) {
// Filter out zero-sized blocks
localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
numBlocksToFetch += localBlocks.size
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 063895d3c5..68d378f3a2 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1237,6 +1237,8 @@ private[spark] object Utils extends Logging {
}
// Handles idiosyncracies with hash (add more as required)
+ // This method should be kept in sync with
+ // org.apache.spark.network.util.JavaUtils#nonNegativeHash().
def nonNegativeHash(obj: AnyRef): Int = {
// Required ?
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 81b64c36dd..429199f207 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -202,7 +202,8 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
val blockManager = SparkEnv.get.blockManager
val blockTransfer = SparkEnv.get.blockTransferService
blockManager.master.getLocations(blockId).foreach { cmId =>
- val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, blockId.toString)
+ val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,
+ blockId.toString)
val deserialized = blockManager.dataDeserialize(blockId, bytes.nioByteBuffer())
.asInstanceOf[Iterator[Int]].toList
assert(deserialized === (1 to 100).toList)
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
new file mode 100644
index 0000000000..792b9cd8b6
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.server.TransportServer
+import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleClient}
+
+/**
+ * This suite creates an external shuffle server and routes all shuffle fetches through it.
+ * Note that failures in this suite may arise due to changes in Spark that invalidate expectations
+ * set up in [[ExternalShuffleBlockHandler]], such as changing the format of shuffle files or how
+ * we hash files into folders.
+ */
+class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
+ var server: TransportServer = _
+ var rpcHandler: ExternalShuffleBlockHandler = _
+
+ override def beforeAll() {
+ val transportConf = SparkTransportConf.fromSparkConf(conf)
+ rpcHandler = new ExternalShuffleBlockHandler()
+ val transportContext = new TransportContext(transportConf, rpcHandler)
+ server = transportContext.createServer()
+
+ conf.set("spark.shuffle.manager", "sort")
+ conf.set("spark.shuffle.service.enabled", "true")
+ conf.set("spark.shuffle.service.port", server.getPort.toString)
+ }
+
+ override def afterAll() {
+ server.close()
+ }
+
+ // This test ensures that the external shuffle service is actually in use for the other tests.
+ test("using external shuffle service") {
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+ sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])
+
+ val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _)
+
+ rdd.count()
+ rdd.count()
+
+ // Invalidate the registered executors, disallowing access to their shuffle blocks.
+ rpcHandler.clearRegisteredExecutors()
+
+ // Now Spark will receive FetchFailed, and not retry the stage due to "spark.test.noStageRetry"
+ // being set.
+ val e = intercept[SparkException] {
+ rdd.count()
+ }
+ e.getMessage should include ("Fetch failure will not retry stage due to testing config")
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
index 2acc02a54f..19180e88eb 100644
--- a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
@@ -24,10 +24,6 @@ class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
// This test suite should run all tests in ShuffleSuite with hash-based shuffle.
override def beforeAll() {
- System.setProperty("spark.shuffle.manager", "hash")
- }
-
- override def afterAll() {
- System.clearProperty("spark.shuffle.manager")
+ conf.set("spark.shuffle.manager", "hash")
}
}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
index 840d8273cb..d78c99c2e1 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
@@ -24,10 +24,6 @@ class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
// This test suite should run all tests in ShuffleSuite with Netty shuffle mode.
override def beforeAll() {
- System.setProperty("spark.shuffle.blockTransferService", "netty")
- }
-
- override def afterAll() {
- System.clearProperty("spark.shuffle.blockTransferService")
+ conf.set("spark.shuffle.blockTransferService", "netty")
}
}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 2bdd84ce69..cda942e15a 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -30,10 +30,14 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
val conf = new SparkConf(loadDefaults = false)
+ // Ensure that the DAGScheduler doesn't retry stages whose fetches fail, so that we accurately
+ // test that the shuffle works (rather than retrying until all blocks are local to one Executor).
+ conf.set("spark.test.noStageRetry", "true")
+
test("groupByKey without compression") {
try {
System.setProperty("spark.shuffle.compress", "false")
- sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test", conf)
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
val groups = pairs.groupByKey(4).collect()
assert(groups.size === 2)
@@ -47,7 +51,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
}
test("shuffle non-zero block size") {
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
val NUM_BLOCKS = 3
val a = sc.parallelize(1 to 10, 2)
@@ -73,7 +77,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("shuffle serializer") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
(x, new NonJavaSerializableClass(x * 2))
@@ -89,7 +93,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("zero sized blocks") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
// 10 partitions from 4 keys
val NUM_BLOCKS = 10
@@ -116,7 +120,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("zero sized blocks without kryo") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
// 10 partitions from 4 keys
val NUM_BLOCKS = 10
@@ -141,7 +145,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("shuffle on mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
@@ -154,7 +158,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("sorting on mutable pairs") {
// This is not in SortingSuite because of the local cluster setup.
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
val data = Array(p(1, 11), p(3, 33), p(100, 100), p(2, 22))
val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
@@ -168,7 +172,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("cogroup using mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
@@ -195,7 +199,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("subtract mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1), p(3, 33))
val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"))
@@ -209,11 +213,8 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("sort with Java non serializable class - Kryo") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- val conf = new SparkConf()
- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .setAppName("test")
- .setMaster("local-cluster[2,1,512]")
- sc = new SparkContext(conf)
+ val myConf = conf.clone().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", myConf)
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
(new NonJavaSerializableClass(x), x)
@@ -226,10 +227,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("sort with Java non serializable class - Java") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- val conf = new SparkConf()
- .setAppName("test")
- .setMaster("local-cluster[2,1,512]")
- sc = new SparkContext(conf)
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
(new NonJavaSerializableClass(x), x)
diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
index 639e56c488..63358172ea 100644
--- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
@@ -24,10 +24,6 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
// This test suite should run all tests in ShuffleSuite with sort-based shuffle.
override def beforeAll() {
- System.setProperty("spark.shuffle.manager", "sort")
- }
-
- override def afterAll() {
- System.clearProperty("spark.shuffle.manager")
+ conf.set("spark.shuffle.manager", "sort")
}
}
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index 3925f0ccbd..bbdc9568a6 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -121,7 +121,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod
}
val appId = "testId"
- val executorId = "executor.1"
+ val executorId = "1"
conf.set("spark.app.id", appId)
conf.set("spark.executor.id", executorId)
@@ -138,7 +138,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod
override val metricRegistry = new MetricRegistry()
}
- val executorId = "executor.1"
+ val executorId = "1"
conf.set("spark.executor.id", executorId)
val instanceName = "executor"
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 4e502cf65e..28f766570e 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -21,22 +21,19 @@ import java.util.concurrent.Semaphore
import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global
-import org.apache.spark.{TaskContextImpl, TaskContext}
-import org.apache.spark.network.{BlockFetchingListener, BlockTransferService}
-import org.mockito.Mockito._
import org.mockito.Matchers.{any, eq => meq}
+import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
-
import org.scalatest.FunSuite
-import org.apache.spark.{SparkConf, TaskContext}
+import org.apache.spark.{SparkConf, TaskContextImpl}
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.BlockFetchingListener
import org.apache.spark.serializer.TestSerializer
-
class ShuffleBlockFetcherIteratorSuite extends FunSuite {
// Some of the tests are quite tricky because we are testing the cleanup behavior
// in the presence of faults.
@@ -44,10 +41,10 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
/** Creates a mock [[BlockTransferService]] that returns data from the given map. */
private def createMockTransfer(data: Map[BlockId, ManagedBuffer]): BlockTransferService = {
val transfer = mock(classOf[BlockTransferService])
- when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
+ when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock): Unit = {
- val blocks = invocation.getArguments()(2).asInstanceOf[Seq[String]]
- val listener = invocation.getArguments()(3).asInstanceOf[BlockFetchingListener]
+ val blocks = invocation.getArguments()(3).asInstanceOf[Array[String]]
+ val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
for (blockId <- blocks) {
if (data.contains(BlockId(blockId))) {
@@ -118,7 +115,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
// 3 local blocks, and 2 remote blocks
// (but from the same block manager so one call to fetchBlocks)
verify(blockManager, times(3)).getBlockData(any())
- verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any())
+ verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any())
}
test("release current unexhausted buffer in case the task completes early") {
@@ -138,9 +135,9 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
val sem = new Semaphore(0)
val transfer = mock(classOf[BlockTransferService])
- when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
+ when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock): Unit = {
- val listener = invocation.getArguments()(3).asInstanceOf[BlockFetchingListener]
+ val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
future {
// Return the first two blocks, and wait till task completion before returning the 3rd one
listener.onBlockFetchSuccess(
@@ -201,9 +198,9 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
val sem = new Semaphore(0)
val transfer = mock(classOf[BlockTransferService])
- when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
+ when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock): Unit = {
- val listener = invocation.getArguments()(3).asInstanceOf[BlockFetchingListener]
+ val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
future {
// Return the first block, and then fail.
listener.onBlockFetchSuccess(