aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2015-03-02 19:31:55 -0800
committerMichael Armbrust <michael@databricks.com>2015-03-02 19:31:55 -0800
commit12599942e69e4d73040f3a8611661a0862514ffc (patch)
tree2376d48c6c0644211633a632ade2fca9eb777c56 /sql/hive
parent9eb22ece115c69899d100cecb8a5e20b3a268649 (diff)
downloadspark-12599942e69e4d73040f3a8611661a0862514ffc.tar.gz
spark-12599942e69e4d73040f3a8611661a0862514ffc.tar.bz2
spark-12599942e69e4d73040f3a8611661a0862514ffc.zip
[SPARK-5950][SQL]Insert array into a metastore table saved as parquet should work when using datasource api
This PR contains the following changes: 1. Add a new method, `DataType.equalsIgnoreCompatibleNullability`, which is the middle ground between DataType's equality check and `DataType.equalsIgnoreNullability`. For two data types `from` and `to`, it does `equalsIgnoreNullability` as well as if the nullability of `from` is compatible with that of `to`. For example, the nullability of `ArrayType(IntegerType, containsNull = false)` is compatible with that of `ArrayType(IntegerType, containsNull = true)` (for an array without null values, we can always say it may contain null values). However, the nullability of `ArrayType(IntegerType, containsNull = true)` is incompatible with that of `ArrayType(IntegerType, containsNull = false)` (for an array that may have null values, we cannot say it does not have null values). 2. For the `resolved` field of `InsertIntoTable`, use `equalsIgnoreCompatibleNullability` to replace the equality check of the data types. 3. For our data source write path, when appending data, we always use the schema of existing table to write the data. This is important for parquet, since nullability direct impacts the way to encode/decode values. If we do not do this, we may see corrupted values when reading values from a set of parquet files generated with different nullability settings. 4. When generating a new parquet table, we always set nullable/containsNull/valueContainsNull to true. So, we will not face situations that we cannot append data because containsNull/valueContainsNull in an Array/Map column of the existing table has already been set to `false`. This change makes the whole data pipeline more robust. 5. Update the equality check of JSON relation. Since JSON does not really cares nullability, `equalsIgnoreNullability` seems a better choice to compare schemata from to JSON tables. JIRA: https://issues.apache.org/jira/browse/SPARK-5950 Thanks viirya for the initial work in #4729. cc marmbrus liancheng Author: Yin Huai <yhuai@databricks.com> Closes #4826 from yhuai/insertNullabilityCheck and squashes the following commits: 3b61a04 [Yin Huai] Revert change on equals. 80e487e [Yin Huai] asNullable in UDT. 587d88b [Yin Huai] Make methods private. 0cb7ea2 [Yin Huai] marmbrus's comments. 3cec464 [Yin Huai] Cheng's comments. 486ed08 [Yin Huai] Merge remote-tracking branch 'upstream/master' into insertNullabilityCheck d3747d1 [Yin Huai] Remove unnecessary change. 8360817 [Yin Huai] Merge remote-tracking branch 'upstream/master' into insertNullabilityCheck 8a3f237 [Yin Huai] Use equalsIgnoreNullability instead of equality check. 0eb5578 [Yin Huai] Fix tests. f6ed813 [Yin Huai] Update old parquet path. e4f397c [Yin Huai] Unit tests. b2c06f8 [Yin Huai] Ignore nullability in JSON relation's equality check. 8bd008b [Yin Huai] nullable, containsNull, and valueContainsNull will be always true for parquet data. bf50d73 [Yin Huai] When appending data, we use the schema of the existing table instead of the schema of the new data. 0a703e7 [Yin Huai] Test failed again since we cannot read correct content. 9a26611 [Yin Huai] Make InsertIntoTable happy. 8f19fe5 [Yin Huai] equalsIgnoreCompatibleNullability 4ec17fd [Yin Huai] Failed test.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala5
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala33
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala71
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala29
4 files changed, 115 insertions, 23 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 74b4e767ca..86fc6548f9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -638,7 +638,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
p
} else if (childOutputDataTypes.size == tableOutputDataTypes.size &&
childOutputDataTypes.zip(tableOutputDataTypes)
- .forall { case (left, right) => DataType.equalsIgnoreNullability(left, right) }) {
+ .forall { case (left, right) => left.sameType(right) }) {
// If both types ignoring nullability of ArrayType, MapType, StructType are the same,
// use InsertIntoHiveTable instead of InsertIntoTable.
InsertIntoHiveTable(p.table, p.partition, p.child, p.overwrite)
@@ -686,8 +686,7 @@ private[hive] case class InsertIntoHiveTable(
override def output = child.output
override lazy val resolved = childrenResolved && child.output.zip(table.output).forall {
- case (childAttr, tableAttr) =>
- DataType.equalsIgnoreNullability(childAttr.dataType, tableAttr.dataType)
+ case (childAttr, tableAttr) => childAttr.dataType.sameType(tableAttr.dataType)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index ffaef8eef1..36bd3f8fe2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -169,6 +169,7 @@ case class CreateMetastoreDataSourceAsSelect(
options
}
+ var existingSchema = None: Option[StructType]
if (sqlContext.catalog.tableExists(Seq(tableName))) {
// Check if we need to throw an exception or just return.
mode match {
@@ -188,22 +189,7 @@ case class CreateMetastoreDataSourceAsSelect(
val createdRelation = LogicalRelation(resolved.relation)
EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match {
case l @ LogicalRelation(i: InsertableRelation) =>
- if (l.schema != createdRelation.schema) {
- val errorDescription =
- s"Cannot append to table $tableName because the schema of this " +
- s"DataFrame does not match the schema of table $tableName."
- val errorMessage =
- s"""
- |$errorDescription
- |== Schemas ==
- |${sideBySide(
- s"== Expected Schema ==" +:
- l.schema.treeString.split("\\\n"),
- s"== Actual Schema ==" +:
- createdRelation.schema.treeString.split("\\\n")).mkString("\n")}
- """.stripMargin
- throw new AnalysisException(errorMessage)
- } else if (i != createdRelation.relation) {
+ if (i != createdRelation.relation) {
val errorDescription =
s"Cannot append to table $tableName because the resolved relation does not " +
s"match the existing relation of $tableName. " +
@@ -221,6 +207,7 @@ case class CreateMetastoreDataSourceAsSelect(
""".stripMargin
throw new AnalysisException(errorMessage)
}
+ existingSchema = Some(l.schema)
case o =>
throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
}
@@ -234,15 +221,23 @@ case class CreateMetastoreDataSourceAsSelect(
createMetastoreTable = true
}
- val df = DataFrame(hiveContext, query)
+ val data = DataFrame(hiveContext, query)
+ val df = existingSchema match {
+ // If we are inserting into an existing table, just use the existing schema.
+ case Some(schema) => sqlContext.createDataFrame(data.queryExecution.toRdd, schema)
+ case None => data
+ }
// Create the relation based on the data of df.
- ResolvedDataSource(sqlContext, provider, mode, optionsWithPath, df)
+ val resolved = ResolvedDataSource(sqlContext, provider, mode, optionsWithPath, df)
if (createMetastoreTable) {
+ // We will use the schema of resolved.relation as the schema of the table (instead of
+ // the schema of df). It is important since the nullability may be changed by the relation
+ // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
hiveContext.catalog.createDataSourceTable(
tableName,
- Some(df.schema),
+ Some(resolved.relation.schema),
provider,
optionsWithPath,
isExternal)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 868c35f35f..5d6a6f3b64 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -34,6 +34,8 @@ import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.LogicalRelation
+import scala.collection.mutable.ArrayBuffer
+
/**
* Tests for persisting tables created though the data sources API into the metastore.
*/
@@ -581,7 +583,8 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
case LogicalRelation(p: ParquetRelation2) => // OK
case _ =>
fail(
- s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
+ "test_parquet_ctas should be converted to " +
+ s"${classOf[ParquetRelation2].getCanonicalName}")
}
// Clenup and reset confs.
@@ -592,6 +595,72 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
}
}
+ test("Pre insert nullability check (ArrayType)") {
+ val df1 =
+ createDataFrame(Tuple1(Seq(Int.box(1), null.asInstanceOf[Integer])) :: Nil).toDF("a")
+ val expectedSchema1 =
+ StructType(
+ StructField("a", ArrayType(IntegerType, containsNull = true), nullable = true) :: Nil)
+ assert(df1.schema === expectedSchema1)
+ df1.saveAsTable("arrayInParquet", "parquet", SaveMode.Overwrite)
+
+ val df2 =
+ createDataFrame(Tuple1(Seq(2, 3)) :: Nil).toDF("a")
+ val expectedSchema2 =
+ StructType(
+ StructField("a", ArrayType(IntegerType, containsNull = false), nullable = true) :: Nil)
+ assert(df2.schema === expectedSchema2)
+ df2.insertInto("arrayInParquet", overwrite = false)
+ createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a")
+ .saveAsTable("arrayInParquet", SaveMode.Append) // This one internally calls df2.insertInto.
+ createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a")
+ .saveAsTable("arrayInParquet", "parquet", SaveMode.Append)
+ refreshTable("arrayInParquet")
+
+ checkAnswer(
+ sql("SELECT a FROM arrayInParquet"),
+ Row(ArrayBuffer(1, null)) ::
+ Row(ArrayBuffer(2, 3)) ::
+ Row(ArrayBuffer(4, 5)) ::
+ Row(ArrayBuffer(6, null)) :: Nil)
+
+ sql("DROP TABLE arrayInParquet")
+ }
+
+ test("Pre insert nullability check (MapType)") {
+ val df1 =
+ createDataFrame(Tuple1(Map(1 -> null.asInstanceOf[Integer])) :: Nil).toDF("a")
+ val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = true)
+ val expectedSchema1 =
+ StructType(
+ StructField("a", mapType1, nullable = true) :: Nil)
+ assert(df1.schema === expectedSchema1)
+ df1.saveAsTable("mapInParquet", "parquet", SaveMode.Overwrite)
+
+ val df2 =
+ createDataFrame(Tuple1(Map(2 -> 3)) :: Nil).toDF("a")
+ val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = false)
+ val expectedSchema2 =
+ StructType(
+ StructField("a", mapType2, nullable = true) :: Nil)
+ assert(df2.schema === expectedSchema2)
+ df2.insertInto("mapInParquet", overwrite = false)
+ createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a")
+ .saveAsTable("mapInParquet", SaveMode.Append) // This one internally calls df2.insertInto.
+ createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a")
+ .saveAsTable("mapInParquet", "parquet", SaveMode.Append)
+ refreshTable("mapInParquet")
+
+ checkAnswer(
+ sql("SELECT a FROM mapInParquet"),
+ Row(Map(1 -> null)) ::
+ Row(Map(2 -> 3)) ::
+ Row(Map(4 -> 5)) ::
+ Row(Map(6 -> null)) :: Nil)
+
+ sql("DROP TABLE mapInParquet")
+ }
+
test("SPARK-6024 wide schema support") {
// We will need 80 splits for this schema if the threshold is 4000.
val schema = StructType((1 to 5000).map(i => StructField(s"c_${i}", StringType, true)))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index c8da8eea4e..89b943f008 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.types._
// The data where the partitioning key exists only in the directory structure.
case class ParquetData(intField: Int, stringField: String)
@@ -522,6 +523,34 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
super.afterAll()
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}
+
+ test("values in arrays and maps stored in parquet are always nullable") {
+ val df = createDataFrame(Tuple2(Map(2 -> 3), Seq(4, 5, 6)) :: Nil).toDF("m", "a")
+ val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = false)
+ val arrayType1 = ArrayType(IntegerType, containsNull = false)
+ val expectedSchema1 =
+ StructType(
+ StructField("m", mapType1, nullable = true) ::
+ StructField("a", arrayType1, nullable = true) :: Nil)
+ assert(df.schema === expectedSchema1)
+
+ df.saveAsTable("alwaysNullable", "parquet")
+
+ val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true)
+ val arrayType2 = ArrayType(IntegerType, containsNull = true)
+ val expectedSchema2 =
+ StructType(
+ StructField("m", mapType2, nullable = true) ::
+ StructField("a", arrayType2, nullable = true) :: Nil)
+
+ assert(table("alwaysNullable").schema === expectedSchema2)
+
+ checkAnswer(
+ sql("SELECT m, a FROM alwaysNullable"),
+ Row(Map(2 -> 3), Seq(4, 5, 6)))
+
+ sql("DROP TABLE alwaysNullable")
+ }
}
class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {