diff options
author | hyukjinkwon <gurwls223@gmail.com> | 2016-06-20 21:55:34 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-06-20 21:55:34 -0700 |
commit | 4f7f1c436205630ab77d3758d7210cc1a2f0d04a (patch) | |
tree | 5444ef9df4fee331c71d5a2ce78593d879448264 /sql | |
parent | 18a8a9b1f4114211cd108efda5672f2bd2c6e5cd (diff) | |
download | spark-4f7f1c436205630ab77d3758d7210cc1a2f0d04a.tar.gz spark-4f7f1c436205630ab77d3758d7210cc1a2f0d04a.tar.bz2 spark-4f7f1c436205630ab77d3758d7210cc1a2f0d04a.zip |
[SPARK-16044][SQL] input_file_name() returns empty strings in data sources based on NewHadoopRDD
## What changes were proposed in this pull request?
This PR makes `input_file_name()` function return the file paths not empty strings for external data sources based on `NewHadoopRDD`, such as [spark-redshift](https://github.com/databricks/spark-redshift/blob/cba5eee1ab79ae8f0fa9e668373a54d2b5babf6b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala#L149) and [spark-xml](https://github.com/databricks/spark-xml/blob/master/src/main/scala/com/databricks/spark/xml/util/XmlFile.scala#L39-L47).
The codes with the external data sources below:
```scala
df.select(input_file_name).show()
```
will produce
- **Before**
```
+-----------------+
|input_file_name()|
+-----------------+
| |
+-----------------+
```
- **After**
```
+--------------------+
| input_file_name()|
+--------------------+
|file:/private/var...|
+--------------------+
```
## How was this patch tested?
Unit tests in `ColumnExpressionSuite`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #13759 from HyukjinKwon/SPARK-16044.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala | 34 |
1 files changed, 32 insertions, 2 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala index e89fa32b15..a66c83dea0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat} import org.scalatest.Matchers._ import org.apache.spark.sql.catalyst.expressions.NamedExpression @@ -592,7 +594,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { ) } - test("input_file_name") { + test("input_file_name - FileScanRDD") { withTempPath { dir => val data = sparkContext.parallelize(0 to 10).toDF("id") data.write.parquet(dir.getCanonicalPath) @@ -604,6 +606,35 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { } } + test("input_file_name - HadoopRDD") { + withTempPath { dir => + val data = sparkContext.parallelize((0 to 10).map(_.toString)).toDF() + data.write.text(dir.getCanonicalPath) + val df = spark.sparkContext.textFile(dir.getCanonicalPath).toDF() + val answer = df.select(input_file_name()).head.getString(0) + assert(answer.contains(dir.getCanonicalPath)) + + checkAnswer(data.select(input_file_name()).limit(1), Row("")) + } + } + + test("input_file_name - NewHadoopRDD") { + withTempPath { dir => + val data = sparkContext.parallelize((0 to 10).map(_.toString)).toDF() + data.write.text(dir.getCanonicalPath) + val rdd = spark.sparkContext.newAPIHadoopFile( + dir.getCanonicalPath, + classOf[NewTextInputFormat], + classOf[LongWritable], + classOf[Text]) + val df = rdd.map(pair => pair._2.toString).toDF() + val answer = df.select(input_file_name()).head.getString(0) + assert(answer.contains(dir.getCanonicalPath)) + + checkAnswer(data.select(input_file_name()).limit(1), Row("")) + } + } + test("columns can be compared") { assert('key.desc == 'key.desc) assert('key.desc != 'key.asc) @@ -707,5 +738,4 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { testData2.select($"a".bitwiseXOR($"b").bitwiseXOR(39)), testData2.collect().toSeq.map(r => Row(r.getInt(0) ^ r.getInt(1) ^ 39))) } - } |