author     animesh <animesh@apache.spark>               2015-07-06 16:39:49 -0700
committer  Michael Armbrust <michael@databricks.com>    2015-07-06 16:39:49 -0700
commit     09a06418debc25da0191d98798f7c5016d39be91 (patch)
tree       4b7ee269ad58c8879fe0c2c78c388b37a61af362 /sql
parent     7b467cc9348fa910e445ad08914a72f8ed4fc249 (diff)
[SPARK-8072] [SQL] Better AnalysisException for writing DataFrame with identically named columns
Adds a checkConstraints function that checks the constraints imposed on a DataFrame / DataFrame schema. The function is called before the DataFrame is stored to external storage and is added to the corresponding data source API.

cc rxin marmbrus

Author: animesh <animesh@apache.spark>

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>

Closes #7013 from animeshbaranawal/8072 and squashes the following commits:

f70dd0e [animesh] Change IOException to AnalysisException
fd45e1b [animesh] 8072: Fix style issues
a8a964f [animesh] 8072: Improving on previous commits
3cc4d2c [animesh] Fix style issues
1a89115 [animesh] Fix style issues
98b4399 [animesh] 8072: Moved the exception handling to ResolvedDataSource specific to parquet format
7c3d928 [animesh] 8072: Adding check to DataFrameWriter.scala
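For context, a minimal usage sketch of the behaviour this patch introduces. The duplicate column names and the "temp" output path mirror the new DataFrameSuite test; the SQLContext (here `sqlContext`) and its implicits are assumed to be in scope.

// Usage sketch (not part of the patch itself): saving a DataFrame whose
// schema repeats a column name now fails fast with an AnalysisException
// instead of surfacing a lower-level error later.
import org.apache.spark.sql.AnalysisException
import sqlContext.implicits._

val df = Seq((1, 2, 3), (2, 3, 4)).toDF("column1", "column2", "column1")
try {
  df.write.format("parquet").save("temp")  // illustrative path, as in the test
} catch {
  case e: AnalysisException =>
    // Message names the offending column(s), e.g.
    // Duplicate column(s) : "column1" found, cannot save to parquet format
    println(e.getMessage)
}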
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala   | 31
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala  | 19
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala      | 24
3 files changed, 73 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 69bf13e1e5..2361d3bf52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -22,6 +22,7 @@ import java.io.IOException
import org.apache.hadoop.fs.Path
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
@@ -37,6 +38,17 @@ private[sql] class DefaultSource
parameters.getOrElse("path", sys.error("'path' must be specified for json data."))
}
+ /** Constraints to be imposed on dataframe to be stored. */
+ private def checkConstraints(data: DataFrame): Unit = {
+ if (data.schema.fieldNames.length != data.schema.fieldNames.distinct.length) {
+ val duplicateColumns = data.schema.fieldNames.groupBy(identity).collect {
+ case (x, ys) if ys.length > 1 => "\"" + x + "\""
+ }.mkString(", ")
+ throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
+ s"cannot save to JSON format")
+ }
+ }
+
/** Returns a new base relation with the parameters. */
override def createRelation(
sqlContext: SQLContext,
@@ -63,6 +75,10 @@ private[sql] class DefaultSource
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
+ // check if dataframe satisfies the constraints
+ // before moving forward
+ checkConstraints(data)
+
val path = checkPath(parameters)
val filesystemPath = new Path(path)
val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
@@ -130,6 +146,17 @@ private[sql] class JSONRelation(
samplingRatio,
userSpecifiedSchema)(sqlContext)
+ /** Constraints to be imposed on dataframe to be stored. */
+ private def checkConstraints(data: DataFrame): Unit = {
+ if (data.schema.fieldNames.length != data.schema.fieldNames.distinct.length) {
+ val duplicateColumns = data.schema.fieldNames.groupBy(identity).collect {
+ case (x, ys) if ys.length > 1 => "\"" + x + "\""
+ }.mkString(", ")
+ throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
+ s"cannot save to JSON format")
+ }
+ }
+
private val useJacksonStreamingAPI: Boolean = sqlContext.conf.useJacksonStreamingAPI
override val needConversion: Boolean = false
@@ -178,6 +205,10 @@ private[sql] class JSONRelation(
}
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+ // check if dataframe satisfies constraints
+ // before moving forward
+ checkConstraints(data)
+
val filesystemPath = path match {
case Some(p) => new Path(p)
case None =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 5ac3e9a44e..6bc69c6ad0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -164,7 +164,24 @@ private[sql] class ParquetRelation2(
}
}
- override def dataSchema: StructType = maybeDataSchema.getOrElse(metadataCache.dataSchema)
+ /** Constraints on schema of dataframe to be stored. */
+ private def checkConstraints(schema: StructType): Unit = {
+ if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
+ val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
+ case (x, ys) if ys.length > 1 => "\"" + x + "\""
+ }.mkString(", ")
+ throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
+ s"cannot save to parquet format")
+ }
+ }
+
+ override def dataSchema: StructType = {
+ val schema = maybeDataSchema.getOrElse(metadataCache.dataSchema)
+ // check if schema satisfies the constraints
+ // before moving forward
+ checkConstraints(schema)
+ schema
+ }
override private[sql] def refresh(): Unit = {
super.refresh()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index afb1cf5f8d..f592a9934d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -737,4 +737,28 @@ class DataFrameSuite extends QueryTest {
df.col("")
df.col("t.``")
}
+
+ test("SPARK-8072: Better Exception for Duplicate Columns") {
+ // only one duplicate column present
+ val e = intercept[org.apache.spark.sql.AnalysisException] {
+ val df1 = Seq((1, 2, 3), (2, 3, 4), (3, 4, 5)).toDF("column1", "column2", "column1")
+ .write.format("parquet").save("temp")
+ }
+ assert(e.getMessage.contains("Duplicate column(s)"))
+ assert(e.getMessage.contains("parquet"))
+ assert(e.getMessage.contains("column1"))
+ assert(!e.getMessage.contains("column2"))
+
+ // multiple duplicate columns present
+ val f = intercept[org.apache.spark.sql.AnalysisException] {
+ val df2 = Seq((1, 2, 3, 4, 5), (2, 3, 4, 5, 6), (3, 4, 5, 6, 7))
+ .toDF("column1", "column2", "column3", "column1", "column3")
+ .write.format("json").save("temp")
+ }
+ assert(f.getMessage.contains("Duplicate column(s)"))
+ assert(f.getMessage.contains("JSON"))
+ assert(f.getMessage.contains("column1"))
+ assert(f.getMessage.contains("column3"))
+ assert(!f.getMessage.contains("column2"))
+ }
}