author    Sean Owen <sowen@cloudera.com>  2016-08-25 09:45:49 +0100
committer Sean Owen <sowen@cloudera.com>  2016-08-25 09:45:49 +0100
commit    2bcd5d5ce3eaf0eb1600a12a2b55ddb40927533b
tree      616d1cbf4bcd8fab750ab0a39f9d5c0187009ac4
parent    5f02d2e5b4d37f554629cbd0e488e856fffd7b6b
[SPARK-17193][CORE] HadoopRDD NPE at DEBUG log level when getLocationInfo == null
## What changes were proposed in this pull request?

Handle a null return from Hadoop's getLocationInfo directly instead of catching (and logging) the resulting exception.

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #14760 from srowen/SPARK-17193.
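The heart of the change is replacing exception-driven handling of a possibly-null `getLocationInfo` result with an `Option(...)` wrapper. A minimal sketch of that pattern, with hypothetical values standing in for the reflective Hadoop calls:

```scala
// Sketch only: `infos` stands in for the result of Hadoop's
// getLocationInfo, which may legitimately be null.
val infos: Array[AnyRef] = null

// Before: Some(convert(infos)) forced the helper to walk a null array,
// throwing a NullPointerException that was caught and logged at DEBUG.
// After: Option(null) is None, so the mapping body simply never runs.
val locations: Option[Seq[String]] = Option(infos).map(_.map(_.toString).toSeq)
assert(locations.isEmpty)
```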
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala    | 26
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala |  2
2 files changed, 13 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index fd3a14bd48..4640b5dc2f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -22,7 +22,6 @@ import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.immutable.Map
-import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag
import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -317,7 +316,7 @@ class HadoopRDD[K, V](
try {
val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
- Some(HadoopRDD.convertSplitLocationInfo(infos))
+ HadoopRDD.convertSplitLocationInfo(infos)
} catch {
case e: Exception =>
logDebug("Failed to use InputSplitWithLocations.", e)
@@ -419,21 +418,20 @@ private[spark] object HadoopRDD extends Logging {
None
}
- private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Seq[String] = {
- val out = ListBuffer[String]()
- infos.foreach { loc =>
- val locationStr = HadoopRDD.SPLIT_INFO_REFLECTIONS.get.
- getLocation.invoke(loc).asInstanceOf[String]
+ private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Option[Seq[String]] = {
+ Option(infos).map(_.flatMap { loc =>
+ val reflections = HadoopRDD.SPLIT_INFO_REFLECTIONS.get
+ val locationStr = reflections.getLocation.invoke(loc).asInstanceOf[String]
if (locationStr != "localhost") {
- if (HadoopRDD.SPLIT_INFO_REFLECTIONS.get.isInMemory.
- invoke(loc).asInstanceOf[Boolean]) {
- logDebug("Partition " + locationStr + " is cached by Hadoop.")
- out += new HDFSCacheTaskLocation(locationStr).toString
+ if (reflections.isInMemory.invoke(loc).asInstanceOf[Boolean]) {
+ logDebug(s"Partition $locationStr is cached by Hadoop.")
+ Some(HDFSCacheTaskLocation(locationStr).toString)
} else {
- out += new HostTaskLocation(locationStr).toString
+ Some(HostTaskLocation(locationStr).toString)
}
+ } else {
+ None
}
- }
- out.seq
+ })
}
}
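For readers less used to the `Option(...).map(_.flatMap ...)` shape introduced above, here is a standalone sketch of the same filter-and-transform, with plain strings standing in (hypothetically) for the reflective `SplitLocationInfo` objects and an illustrative `"executor_"` prefix in place of the real `TaskLocation` encodings:

```scala
// Hypothetical stand-ins; the real code obtains the location string and
// the in-memory flag via reflection on SplitLocationInfo.
val raw: Array[AnyRef] = Array("host1", "localhost", "host2")

val converted: Option[Seq[String]] = Option(raw).map { arr =>
  arr.flatMap { loc =>
    val locationStr = loc.asInstanceOf[String]
    if (locationStr != "localhost") Some("executor_" + locationStr) // keep real hosts
    else None                                  // drop the uninformative "localhost"
  }.toSeq
}
// converted == Some(Seq("executor_host1", "executor_host2")), while
// Option(null: Array[AnyRef]).map(...) would be None, with no NPE.
```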
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index be919e6587..1c7aec919b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -255,7 +255,7 @@ class NewHadoopRDD[K, V](
case Some(c) =>
try {
val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
- Some(HadoopRDD.convertSplitLocationInfo(infos))
+ HadoopRDD.convertSplitLocationInfo(infos)
} catch {
case e : Exception =>
logDebug("Failed to use InputSplit#getLocationInfo.", e)