author    hyukjinkwon <gurwls223@gmail.com>    2016-06-20 21:55:34 -0700
committer Reynold Xin <rxin@databricks.com>    2016-06-20 21:55:34 -0700
commit    4f7f1c436205630ab77d3758d7210cc1a2f0d04a (patch)
tree      5444ef9df4fee331c71d5a2ce78593d879448264 /core
parent    18a8a9b1f4114211cd108efda5672f2bd2c6e5cd (diff)
[SPARK-16044][SQL] input_file_name() returns empty strings in data sources based on NewHadoopRDD
## What changes were proposed in this pull request?

This PR makes the `input_file_name()` function return file paths rather than empty strings for external data sources based on `NewHadoopRDD`, such as [spark-redshift](https://github.com/databricks/spark-redshift/blob/cba5eee1ab79ae8f0fa9e668373a54d2b5babf6b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala#L149) and [spark-xml](https://github.com/databricks/spark-xml/blob/master/src/main/scala/com/databricks/spark/xml/util/XmlFile.scala#L39-L47).

With one of these external data sources, the code below:

```scala
df.select(input_file_name).show()
```

will produce:

- **Before**

  ```
  +-----------------+
  |input_file_name()|
  +-----------------+
  |                 |
  +-----------------+
  ```

- **After**

  ```
  +--------------------+
  |   input_file_name()|
  +--------------------+
  |file:/private/var...|
  +--------------------+
  ```

## How was this patch tested?

Unit tests in `ColumnExpressionSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #13759 from HyukjinKwon/SPARK-16044.
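For orientation, here is a minimal end-to-end sketch of the behavior described above, assuming the spark-xml package is on the classpath; the file path and `rowTag` value are illustrative, not taken from the patch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name

val spark = SparkSession.builder().appName("input-file-name-demo").getOrCreate()

// spark-xml reads files through NewHadoopRDD, so before this patch
// input_file_name() evaluated to "" for every row of this DataFrame.
val df = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "book")      // illustrative assumption
  .load("/tmp/books.xml")        // illustrative assumption

// With the patch, each row shows the path of the file it was read from.
df.select(input_file_name()).show(false)
```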
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala | 2 +-
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala        | 7 +++++++
2 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
index 108e9d2558..f40d4c8e0a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
@@ -21,7 +21,7 @@ import org.apache.spark.unsafe.types.UTF8String
/**
* This holds file names of the current Spark task. This is used in HadoopRDD,
- * FileScanRDD and InputFileName function in Spark SQL.
+ * FileScanRDD, NewHadoopRDD and InputFileName function in Spark SQL.
*/
private[spark] object InputFileNameHolder {
/**
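The hunk above shows only the updated scaladoc; for context, the holder itself is a small thread-local wrapper around a `UTF8String`. A reconstructed sketch of its shape (simplified, not the verbatim file, which the diff elides):

```scala
import org.apache.spark.unsafe.types.UTF8String

private[spark] object InputFileNameHolder {
  // Each task runs on a single thread, so a ThreadLocal gives every running
  // task its own "current file name" slot, defaulting to the empty string.
  private[this] val inputFileName: ThreadLocal[UTF8String] =
    new ThreadLocal[UTF8String] {
      override def initialValue(): UTF8String = UTF8String.fromString("")
    }

  def getInputFileName(): UTF8String = inputFileName.get()

  private[spark] def setInputFileName(file: String): Unit =
    inputFileName.set(UTF8String.fromString(file))

  private[spark] def unsetInputFileName(): Unit =
    inputFileName.set(UTF8String.fromString(""))
}
```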
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 189dc7b331..b086baa084 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -135,6 +135,12 @@ class NewHadoopRDD[K, V](
val inputMetrics = context.taskMetrics().inputMetrics
val existingBytesRead = inputMetrics.bytesRead
+ // Sets the thread local variable for the file's name
+ split.serializableHadoopSplit.value match {
+ case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
+ case _ => InputFileNameHolder.unsetInputFileName()
+ }
+
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match {
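On the consuming side, Spark SQL's `input_file_name()` expression reads this thread-local once per row; a much-simplified sketch of that link (the real Catalyst expression carries more boilerplate, and the holder is `private[spark]`, so this is illustrative only):

```scala
import org.apache.spark.rdd.InputFileNameHolder
import org.apache.spark.unsafe.types.UTF8String

// Simplified: per row, the expression evaluates to whatever the
// current task's thread-local holder contains at that moment.
def evalInputFileName(): UTF8String = InputFileNameHolder.getInputFileName()
```

Note the fallback in the hunk above: for splits that are not a `FileSplit` there is no single underlying file, so the holder is explicitly cleared rather than left holding a stale value.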
@@ -201,6 +207,7 @@ class NewHadoopRDD[K, V](
private def close() {
if (reader != null) {
+ InputFileNameHolder.unsetInputFileName()
// Close the reader and release it. Note: it's very important that we don't close the
// reader more than once, since that exposes us to MAPREDUCE-5918 when running against
// Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
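The reset in `close()` matters because executor threads are pooled and reused across tasks, so a value left behind by one task would otherwise leak into the next task scheduled on the same thread. A self-contained sketch of that hazard, with no Spark dependency and illustrative names:

```scala
import java.util.concurrent.Executors

object ThreadLocalReuseDemo {
  // Stand-in for InputFileNameHolder's slot: one value per thread.
  private val currentFile = new ThreadLocal[String] {
    override def initialValue(): String = ""
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(1) // one reused worker, like an executor thread

    // "Task 1" sets the value but never resets it.
    pool.submit(new Runnable {
      override def run(): Unit = currentFile.set("file:/tmp/part-00000")
    })

    // "Task 2" runs on the same thread and observes the stale value.
    pool.submit(new Runnable {
      override def run(): Unit = println(s"task 2 sees: ${currentFile.get()}")
    })

    pool.shutdown()
  }
}
```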