aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2015-03-02 19:31:55 -0800
committerMichael Armbrust <michael@databricks.com>2015-03-02 19:31:55 -0800
commit12599942e69e4d73040f3a8611661a0862514ffc (patch)
tree2376d48c6c0644211633a632ade2fca9eb777c56 /sql/core
parent9eb22ece115c69899d100cecb8a5e20b3a268649 (diff)
downloadspark-12599942e69e4d73040f3a8611661a0862514ffc.tar.gz
spark-12599942e69e4d73040f3a8611661a0862514ffc.tar.bz2
spark-12599942e69e4d73040f3a8611661a0862514ffc.zip
[SPARK-5950][SQL]Insert array into a metastore table saved as parquet should work when using datasource api
This PR contains the following changes: 1. Add a new method, `DataType.equalsIgnoreCompatibleNullability`, which is the middle ground between DataType's equality check and `DataType.equalsIgnoreNullability`. For two data types `from` and `to`, it does `equalsIgnoreNullability` as well as if the nullability of `from` is compatible with that of `to`. For example, the nullability of `ArrayType(IntegerType, containsNull = false)` is compatible with that of `ArrayType(IntegerType, containsNull = true)` (for an array without null values, we can always say it may contain null values). However, the nullability of `ArrayType(IntegerType, containsNull = true)` is incompatible with that of `ArrayType(IntegerType, containsNull = false)` (for an array that may have null values, we cannot say it does not have null values). 2. For the `resolved` field of `InsertIntoTable`, use `equalsIgnoreCompatibleNullability` to replace the equality check of the data types. 3. For our data source write path, when appending data, we always use the schema of existing table to write the data. This is important for parquet, since nullability direct impacts the way to encode/decode values. If we do not do this, we may see corrupted values when reading values from a set of parquet files generated with different nullability settings. 4. When generating a new parquet table, we always set nullable/containsNull/valueContainsNull to true. So, we will not face situations that we cannot append data because containsNull/valueContainsNull in an Array/Map column of the existing table has already been set to `false`. This change makes the whole data pipeline more robust. 5. Update the equality check of JSON relation. Since JSON does not really cares nullability, `equalsIgnoreNullability` seems a better choice to compare schemata from to JSON tables. JIRA: https://issues.apache.org/jira/browse/SPARK-5950 Thanks viirya for the initial work in #4729. cc marmbrus liancheng Author: Yin Huai <yhuai@databricks.com> Closes #4826 from yhuai/insertNullabilityCheck and squashes the following commits: 3b61a04 [Yin Huai] Revert change on equals. 80e487e [Yin Huai] asNullable in UDT. 587d88b [Yin Huai] Make methods private. 0cb7ea2 [Yin Huai] marmbrus's comments. 3cec464 [Yin Huai] Cheng's comments. 486ed08 [Yin Huai] Merge remote-tracking branch 'upstream/master' into insertNullabilityCheck d3747d1 [Yin Huai] Remove unnecessary change. 8360817 [Yin Huai] Merge remote-tracking branch 'upstream/master' into insertNullabilityCheck 8a3f237 [Yin Huai] Use equalsIgnoreNullability instead of equality check. 0eb5578 [Yin Huai] Fix tests. f6ed813 [Yin Huai] Update old parquet path. e4f397c [Yin Huai] Unit tests. b2c06f8 [Yin Huai] Ignore nullability in JSON relation's equality check. 8bd008b [Yin Huai] nullable, containsNull, and valueContainsNull will be always true for parquet data. bf50d73 [Yin Huai] When appending data, we use the schema of the existing table instead of the schema of the new data. 0a703e7 [Yin Huai] Test failed again since we cannot read correct content. 9a26611 [Yin Huai] Make InsertIntoTable happy. 8f19fe5 [Yin Huai] equalsIgnoreCompatibleNullability 4ec17fd [Yin Huai] Failed test.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala9
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala10
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala2
8 files changed, 31 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 3b68b7c275..f9d0ba2241 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
private[sql] class DefaultSource
@@ -131,7 +131,7 @@ private[sql] case class JSONRelation(
override def equals(other: Any): Boolean = other match {
case that: JSONRelation =>
- (this.path == that.path) && (this.schema == that.schema)
+ (this.path == that.path) && this.schema.sameType(that.schema)
case _ => false
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index a0d1005c0c..fd161bae12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -23,6 +23,7 @@ import java.util.logging.Level
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.FsAction
+import org.apache.spark.sql.types.{StructType, DataType}
import parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
import parquet.hadoop.metadata.CompressionCodecName
import parquet.schema.MessageType
@@ -172,9 +173,13 @@ private[sql] object ParquetRelation {
sqlContext.conf.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED)
.name())
ParquetRelation.enableLogForwarding()
- ParquetTypesConverter.writeMetaData(attributes, path, conf)
+ // This is a hack. We always set nullable/containsNull/valueContainsNull to true
+ // for the schema of a parquet data.
+ val schema = StructType.fromAttributes(attributes).asNullable
+ val newAttributes = schema.toAttributes
+ ParquetTypesConverter.writeMetaData(newAttributes, path, conf)
new ParquetRelation(path.toString, Some(conf), sqlContext) {
- override val output = attributes
+ override val output = newAttributes
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 225ec6db7d..62813a981e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -278,7 +278,10 @@ private[sql] case class InsertIntoParquetTable(
ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
val conf = ContextUtil.getConfiguration(job)
- RowWriteSupport.setSchema(relation.output, conf)
+ // This is a hack. We always set nullable/containsNull/valueContainsNull to true
+ // for the schema of a parquet data.
+ val schema = StructType.fromAttributes(relation.output).asNullable
+ RowWriteSupport.setSchema(schema.toAttributes, conf)
val fspath = new Path(relation.path)
val fs = fspath.getFileSystem(conf)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 6d56be3ab8..8d95858493 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -115,9 +115,15 @@ private[sql] class DefaultSource
}
val relation = if (doInsertion) {
+ // This is a hack. We always set nullable/containsNull/valueContainsNull to true
+ // for the schema of a parquet data.
+ val df =
+ sqlContext.createDataFrame(
+ data.queryExecution.toRdd,
+ data.schema.asNullable)
val createdRelation =
- createRelation(sqlContext, parameters, data.schema).asInstanceOf[ParquetRelation2]
- createdRelation.insert(data, overwrite = mode == SaveMode.Overwrite)
+ createRelation(sqlContext, parameters, df.schema).asInstanceOf[ParquetRelation2]
+ createdRelation.insert(df, overwrite = mode == SaveMode.Overwrite)
createdRelation
} else {
// If the save mode is Ignore, we will just create the relation based on existing data.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index c9cd0e6e93..0e540dad81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.sources
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{LogicalRDD, RunnableCommand}
+import org.apache.spark.sql.execution.RunnableCommand
private[sql] case class InsertIntoDataSource(
logicalRelation: LogicalRelation,
@@ -29,7 +29,10 @@ private[sql] case class InsertIntoDataSource(
override def run(sqlContext: SQLContext) = {
val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
- relation.insert(DataFrame(sqlContext, query), overwrite)
+ val data = DataFrame(sqlContext, query)
+ // Apply the schema of the existing table to the new data.
+ val df = sqlContext.createDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
+ relation.insert(df, overwrite)
// Invalidate the cache.
sqlContext.cacheManager.invalidateCache(logicalRelation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index 8440581074..cfa58f1442 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -56,7 +56,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
child: LogicalPlan) = {
val newChildOutput = expectedOutput.zip(child.output).map {
case (expected, actual) =>
- val needCast = !DataType.equalsIgnoreNullability(expected.dataType, actual.dataType)
+ val needCast = !expected.dataType.sameType(actual.dataType)
// We want to make sure the filed names in the data to be inserted exactly match
// names in the schema.
val needRename = expected.name != actual.name
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala
index eb045e37bf..c11d0ae5bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala
@@ -59,4 +59,6 @@ private[sql] class ExamplePointUDT extends UserDefinedType[ExamplePoint] {
}
override def userClass: Class[ExamplePoint] = classOf[ExamplePoint]
+
+ private[spark] override def asNullable: ExamplePointUDT = this
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index 47fdb55432..23f424c0bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -62,6 +62,8 @@ private[sql] class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
}
override def userClass = classOf[MyDenseVector]
+
+ private[spark] override def asNullable: MyDenseVectorUDT = this
}
class UserDefinedTypeSuite extends QueryTest {