diff options
author | xutingjun <xutingjun@huawei.com> | 2015-09-22 11:01:32 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-09-22 11:01:32 -0700 |
commit | 2ea0f2e11b82ef4817c7e6a162ea23da7860b893 (patch) | |
tree | efa97a1abc213f23ab51795104457b84fede5493 /core/src | |
parent | 7104ee0e5dc1290b8b845a0a8ddcdb1875cfd060 (diff) | |
download | spark-2ea0f2e11b82ef4817c7e6a162ea23da7860b893.tar.gz spark-2ea0f2e11b82ef4817c7e6a162ea23da7860b893.tar.bz2 spark-2ea0f2e11b82ef4817c7e6a162ea23da7860b893.zip |
[SPARK-9585] Delete the input format caching because some input format are non thread safe
If we cache the InputFormat, all tasks on the same executor will share it.
Some InputFormat is thread safety, but some are not, such as HiveHBaseTableInputFormat. If tasks share a non thread safe InputFormat, unexpected error may be occurs.
To avoid it, I think we should delete the input format caching.
Author: xutingjun <xutingjun@huawei.com>
Author: meiyoula <1039320815@qq.com>
Author: Xutingjun <xutingjun@huawei.com>
Closes #7918 from XuTingjun/cached_inputFormat.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 6 |
1 files changed, 0 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 8f2655d63b..77b57132b9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -182,17 +182,11 @@ class HadoopRDD[K, V]( } protected def getInputFormat(conf: JobConf): InputFormat[K, V] = { - if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) { - return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]] - } - // Once an InputFormat for this RDD is created, cache it so that only one reflection call is - // done in each local process. val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) .asInstanceOf[InputFormat[K, V]] if (newInputFormat.isInstanceOf[Configurable]) { newInputFormat.asInstanceOf[Configurable].setConf(conf) } - HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat) newInputFormat } |