author     xin Wu <xinwu@us.ibm.com>          2015-11-16 08:10:48 -0800
committer  Yin Huai <yhuai@databricks.com>    2015-11-16 08:10:48 -0800
commit     0e79604aed116bdcb40e03553a2d103b5b1cdbae (patch)
tree       8f020affa639f3e82e97bb645b068cc04f4d2794 /core
parent     e388b39d10fc269cdd3d630ea7d4ae80fd0efa97 (diff)
[SPARK-11522][SQL] input_file_name() returns "" for external tables
When computing a partition for a non-Parquet relation, `HadoopRDD.compute` is used, but it does not set the thread-local variable `inputFileName` the way `SqlNewHadoopRDD.compute` does. Yet when the value is read back, `SqlNewHadoopRDD.inputFileName` is expected to have been set, so it is empty. Setting `inputFileName` in `HadoopRDD.compute` resolves the issue.

Author: xin Wu <xinwu@us.ibm.com>

Closes #9542 from xwu0226/SPARK-11522.
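For illustration, a minimal sketch of how the bug surfaces at the SQL level. The table and column names are hypothetical; only `input_file_name()` itself is from the report:

    // Hypothetical reproduction sketch (table and column names are made up).
    // Before this patch, scanning a non-Parquet external table went through
    // HadoopRDD.compute, which never set the thread-local file name, so
    // input_file_name() returned the empty string for every row.
    val df = sqlContext.sql("SELECT input_file_name(), key FROM my_external_text_table")
    df.show()
    // before the fix: the input_file_name() column is ""
    // after the fix:  e.g. hdfs://.../my_external_text_table/part-00000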
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 7 +++++++
1 file changed, 7 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 0453614f6a..7db5834687 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,6 +213,12 @@ class HadoopRDD[K, V](
       val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+      // Sets the thread local variable for the file's name
+      split.inputSplit.value match {
+        case fs: FileSplit => SqlNewHadoopRDD.setInputFileName(fs.getPath.toString)
+        case _ => SqlNewHadoopRDD.unsetInputFileName()
+      }
+
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
       val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
@@ -250,6 +256,7 @@ class HadoopRDD[K, V](
       override def close() {
         if (reader != null) {
+          SqlNewHadoopRDD.unsetInputFileName()
           // Close the reader and release it. Note: it's very important that we don't close the
           // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
           // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
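For reference, the holder that `setInputFileName`/`unsetInputFileName` manipulate follows a standard thread-local pattern. A simplified sketch under that assumption; names and the default value are illustrative, and the real Spark object may differ in detail (e.g. it may store a `UTF8String` rather than a `String`):

    // Simplified sketch of the pattern behind SqlNewHadoopRDD.setInputFileName /
    // unsetInputFileName: a per-thread slot holding the file currently being read.
    object InputFileNameHolder {
      private val inputFileName = new ThreadLocal[String] {
        override protected def initialValue(): String = ""  // empty when no file is being read
      }

      def getInputFileName(): String = inputFileName.get()
      def setInputFileName(file: String): Unit = inputFileName.set(file)
      def unsetInputFileName(): Unit = inputFileName.remove()
    }

Because each task reads a single split on a single thread, setting the value at the start of `compute` and clearing it in `close()` is enough for `input_file_name()` to report the correct path for every row.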