Diffstat (limited to 'core')
 core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala | 24 +++++++++++++
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala        | 21 ++++++++++--
 core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala   | 37 ++++++++++++++++++
 3 files changed, 80 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
new file mode 100644
index 0000000000..f6e46ae9a4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.apache.spark.SparkException
+
+private[spark]
+case class BlockFetchException(messages: String, throwable: Throwable)
+  extends SparkException(messages, throwable)
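
Because BlockFetchException is a case class, callers can pattern-match on it to tell "every location failed" apart from other SparkExceptions. A minimal sketch of such a handler follows; fetchRemote and recompute are hypothetical stand-ins for illustration only, not part of this change:

    import org.apache.spark.storage.BlockFetchException

    object FallbackSketch {
      // Hypothetical stand-ins, only for illustration; not part of this change.
      def fetchRemote(blockId: String): Array[Byte] = ???
      def recompute(blockId: String): Array[Byte] = ???

      def getWithFallback(blockId: String): Array[Byte] =
        try {
          fetchRemote(blockId)
        } catch {
          case BlockFetchException(_, _) =>
            // Every registered location failed; fall back to recomputing the block.
            recompute(blockId)
        }
    }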
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index fefaef0ab8..d31aa68eb6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -23,6 +23,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{ExecutionContext, Await, Future}
 import scala.concurrent.duration._
+import scala.util.control.NonFatal
 import scala.util.Random
 
 import sun.nio.ch.DirectBuffer
@@ -600,10 +601,26 @@ private[spark] class BlockManager(
   private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
     require(blockId != null, "BlockId is null")
     val locations = Random.shuffle(master.getLocations(blockId))
+    var numFetchFailures = 0
     for (loc <- locations) {
       logDebug(s"Getting remote block $blockId from $loc")
-      val data = blockTransferService.fetchBlockSync(
-        loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
+      val data = try {
+        blockTransferService.fetchBlockSync(
+          loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
+      } catch {
+        case NonFatal(e) =>
+          numFetchFailures += 1
+          if (numFetchFailures == locations.size) {
+            // An exception was thrown while fetching this block from every location; give up
+            throw new BlockFetchException(s"Failed to fetch block from" +
+              s" ${locations.size} locations. Most recent failure cause:", e)
+          } else {
+            // This location failed, so we retry the fetch from a different one by returning null here
+            logWarning(s"Failed to fetch remote block $blockId " +
+              s"from $loc (failed attempt $numFetchFailures)", e)
+            null
+          }
+      }
 
       if (data != null) {
         if (asBlockResult) {
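
The retry logic above keeps the original for loop intact: a NonFatal fetch failure either yields null, so the loop simply advances to the next shuffled location, or, once numFetchFailures equals locations.size, escalates to a BlockFetchException carrying the most recent cause. The same retry-until-exhausted shape in isolation, as a sketch with a generic fetch function standing in for blockTransferService.fetchBlockSync:

    import scala.util.control.NonFatal

    object RetrySketch {
      // Try each location in turn; rethrow only when every location has failed.
      def fetchFirstAvailable[T](locations: Seq[String])(fetch: String => T): Option[T] = {
        var failures = 0
        for (loc <- locations) {
          try {
            return Some(fetch(loc))
          } catch {
            case NonFatal(e) =>
              failures += 1
              if (failures == locations.size) {
                // All locations failed; surface the most recent cause,
                // as BlockFetchException does in the hunk above.
                throw new RuntimeException(s"Failed to fetch from ${locations.size} locations", e)
              }
              // Otherwise fall through and let the loop try the next location.
          }
        }
        None // reached only when locations is empty
      }
    }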
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f480fd107a..e5b54d66c8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -47,6 +47,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
   private val conf = new SparkConf(false)
   var store: BlockManager = null
   var store2: BlockManager = null
+  var store3: BlockManager = null
   var rpcEnv: RpcEnv = null
   var master: BlockManagerMaster = null
   conf.set("spark.authenticate", "false")
@@ -99,6 +100,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
       store2.stop()
       store2 = null
     }
+    if (store3 != null) {
+      store3.stop()
+      store3 = null
+    }
     rpcEnv.shutdown()
     rpcEnv.awaitTermination()
     rpcEnv = null
@@ -443,6 +448,38 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
   }
 
+  test("SPARK-9591: getRemoteBytes from another location when an exception is thrown") {
+    val origTimeoutOpt = conf.getOption("spark.network.timeout")
+    try {
+      conf.set("spark.network.timeout", "2s")
+      store = makeBlockManager(8000, "executor1")
+      store2 = makeBlockManager(8000, "executor2")
+      store3 = makeBlockManager(8000, "executor3")
+      val list1 = List(new Array[Byte](4000))
+      store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      var list1Get = store.getRemoteBytes("list1")
+      assert(list1Get.isDefined, "list1Get expected to be fetched")
+      // One of the two block managers holding the block exits
+      store2.stop()
+      store2 = null
+      list1Get = store.getRemoteBytes("list1")
+      // The `list1` block is still fetched from the remaining location
+      assert(list1Get.isDefined, "list1Get expected to be fetched")
+      store3.stop()
+      store3 = null
+      // An exception is thrown because no live locations remain
+      intercept[BlockFetchException] {
+        list1Get = store.getRemoteBytes("list1")
+      }
+    } finally {
+      origTimeoutOpt match {
+        case Some(t) => conf.set("spark.network.timeout", t)
+        case None => conf.remove("spark.network.timeout")
+      }
+    }
+  }
+
   test("in-memory LRU storage") {
     store = makeBlockManager(12000)
     val a1 = new Array[Byte](4000)
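
The test temporarily lowers spark.network.timeout so that fetches from stopped block managers fail quickly, then restores the original value in a finally block. That save-and-restore idiom generalizes to a small loan-pattern helper; this is a sketch derived from the test body, not part of this change:

    import org.apache.spark.SparkConf

    object ConfSketch {
      // Sketch of the test's try/finally idiom as a reusable loan pattern.
      def withConfValue[T](conf: SparkConf, key: String, value: String)(body: => T): T = {
        val orig = conf.getOption(key) // remember whatever was set before
        conf.set(key, value)
        try {
          body
        } finally {
          orig match {
            case Some(v) => conf.set(key, v) // restore the previous value
            case None => conf.remove(key)    // or clear it if the key was unset
          }
        }
      }
    }

With such a helper, the test body could be written as withConfValue(conf, "spark.network.timeout", "2s") { ... } and the restore logic would no longer need to be repeated per test.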