aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorxutingjun <xutingjun@huawei.com>2015-09-22 11:01:32 -0700
committerReynold Xin <rxin@databricks.com>2015-09-22 11:01:32 -0700
commit2ea0f2e11b82ef4817c7e6a162ea23da7860b893 (patch)
treeefa97a1abc213f23ab51795104457b84fede5493 /core
parent7104ee0e5dc1290b8b845a0a8ddcdb1875cfd060 (diff)
downloadspark-2ea0f2e11b82ef4817c7e6a162ea23da7860b893.tar.gz
spark-2ea0f2e11b82ef4817c7e6a162ea23da7860b893.tar.bz2
spark-2ea0f2e11b82ef4817c7e6a162ea23da7860b893.zip
[SPARK-9585] Delete the input format caching because some input format are non thread safe
If we cache the InputFormat, all tasks on the same executor will share it. Some InputFormat is thread safety, but some are not, such as HiveHBaseTableInputFormat. If tasks share a non thread safe InputFormat, unexpected error may be occurs. To avoid it, I think we should delete the input format caching. Author: xutingjun <xutingjun@huawei.com> Author: meiyoula <1039320815@qq.com> Author: Xutingjun <xutingjun@huawei.com> Closes #7918 from XuTingjun/cached_inputFormat.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala6
1 files changed, 0 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 8f2655d63b..77b57132b9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -182,17 +182,11 @@ class HadoopRDD[K, V](
}
protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
- if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
- return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
- }
- // Once an InputFormat for this RDD is created, cache it so that only one reflection call is
- // done in each local process.
val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
.asInstanceOf[InputFormat[K, V]]
if (newInputFormat.isInstanceOf[Configurable]) {
newInputFormat.asInstanceOf[Configurable].setConf(conf)
}
- HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
newInputFormat
}