author    Xiu Guo <xguo27@gmail.com>         2016-01-03 20:48:56 -0800
committer Reynold Xin <rxin@databricks.com>  2016-01-03 20:48:56 -0800
commit    84f8492c1555bf8ab44c9818752278f61768eb16 (patch)
tree      698803d2233e8c50282c61af16f348c2e51f1f4f
parent    13dab9c3862cc454094cd9ba7b4504a2d095028f (diff)
[SPARK-12562][SQL] DataFrame.write.format(text) requires the column name to be called value
Author: Xiu Guo <xguo27@gmail.com>

Closes #10515 from xguo27/SPARK-12562.
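Before this patch, TextRelation hard-coded its data schema to a single column named "value", so writing a DataFrame whose column had any other name failed. A minimal sketch of the user-facing symptom (the paths and the column name "payload" are illustrative, not from the patch):

    val df = sqlContext.read.text("/tmp/input.txt")        // column: "value"
    val renamed = df.withColumnRenamed("value", "payload") // any other name
    renamed.write.format("text").save("/tmp/output")       // failed before this patch

With this change, TextRelation keeps the schema handed to createRelation instead of always substituting its own.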
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala | 9
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala     | 4
2 files changed, 7 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 248467abe9..fe69c72d28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -48,7 +48,7 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
partitionColumns: Option[StructType],
parameters: Map[String, String]): HadoopFsRelation = {
dataSchema.foreach(verifySchema)
- new TextRelation(None, partitionColumns, paths)(sqlContext)
+ new TextRelation(None, dataSchema, partitionColumns, paths)(sqlContext)
}
override def shortName(): String = "text"
@@ -68,15 +68,16 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
private[sql] class TextRelation(
val maybePartitionSpec: Option[PartitionSpec],
+ val textSchema: Option[StructType],
override val userDefinedPartitionColumns: Option[StructType],
override val paths: Array[String] = Array.empty[String],
parameters: Map[String, String] = Map.empty[String, String])
(@transient val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec, parameters) {
- /** Data schema is always a single column, named "value". */
- override def dataSchema: StructType = new StructType().add("value", StringType)
-
+ /** Data schema is a single column named "value" unless the original data source provides a schema. */
+ override def dataSchema: StructType =
+ textSchema.getOrElse(new StructType().add("value", StringType))
/** This is an internal data source that outputs internal row format. */
override val needConversion: Boolean = false
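The fallback above is the standard Option.getOrElse pattern: prefer the caller-supplied schema, otherwise default to the single "value" column. A self-contained sketch of the same logic (the method name resolveSchema is illustrative, not part of the patch):

    import org.apache.spark.sql.types.{StringType, StructType}

    // Use the schema passed in by the caller when present; otherwise
    // fall back to a single string column named "value".
    def resolveSchema(textSchema: Option[StructType]): StructType =
      textSchema.getOrElse(new StructType().add("value", StringType))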
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 914e516613..02c416af50 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -33,8 +33,8 @@ class TextSuite extends QueryTest with SharedSQLContext {
verifyFrame(sqlContext.read.text(testFile))
}
- test("writing") {
- val df = sqlContext.read.text(testFile)
+ test("SPARK-12562 verify write.text() can handle column name beyond `value`") {
+ val df = sqlContext.read.text(testFile).withColumnRenamed("value", "adwrasdf")
val tempFile = Utils.createTempDir()
tempFile.delete()
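The diff is truncated here; presumably the test goes on to write the renamed DataFrame and read it back. A hedged sketch of how that round trip could be verified (the assertion below is illustrative, not the suite's exact code):

    df.write.text(tempFile.getCanonicalPath)
    val readBack = sqlContext.read.text(tempFile.getCanonicalPath)
    // Reading text always yields a "value" column; compare contents only.
    assert(readBack.collect().map(_.getString(0)).sorted
      .sameElements(df.collect().map(_.getString(0)).sorted))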