about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Michael Armbrust <michael@databricks.com> 2015-04-10 16:05:14 -0700
committer Michael Armbrust <michael@databricks.com> 2015-04-10 16:14:06 -0700
commit 767f582cf5a5e449fbbb7eae08456d62271b2839 (patch)
tree e165158d662b352b2d079b654ae9b65bc52539b5
parent 48321b83dbe3fb1f0c3b7d7c4b47c3b1ffa06d68 (diff)
download spark-767f582cf5a5e449fbbb7eae08456d62271b2839.tar.gz
spark-767f582cf5a5e449fbbb7eae08456d62271b2839.tar.bz2
spark-767f582cf5a5e449fbbb7eae08456d62271b2839.zip
[SPARK-6851][SQL] Create new instance for each converted parquet relation
Otherwise we end up rewriting predicates to be trivially equal (i.e. `a#1 = a#2` -> `a#3 = a#3`), at which point the query is no longer valid.

Author: Michael Armbrust <michael@databricks.com>

Closes #5458 from marmbrus/selfJoinParquet and squashes the following commits:

22df77c [Michael Armbrust] [SPARK-6851][SQL] Create new instance for each converted parquet relation

(cherry picked from commit 23d5f8864f7d665a74b1d38118700139854dbb1c)
Signed-off-by: Michael Armbrust <michael@databricks.com>

Conflicts:
    sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 4
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala 77
2 files changed, 80 insertions, 1 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c4da34ae64..a906c921df 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -279,7 +279,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
}
- if (metastoreRelation.hiveQlTable.isPartitioned) {
+ val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
val partitionColumnDataTypes = partitionSchema.map(_.dataType)
val partitions = metastoreRelation.hiveQlPartitions.map { p =>
@@ -314,6 +314,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
parquetRelation
}
+
+ result.newInstance()
}
override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = synchronized {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index efb57f7dcb..e177f290d5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -34,6 +34,17 @@ case class Nested3(f3: Int)
case class NestedArray2(b: Seq[Int])
case class NestedArray1(a: NestedArray2)
+case class Order(
+ id: Int,
+ make: String,
+ `type`: String,
+ price: Int,
+ pdate: String,
+ customer: String,
+ city: String,
+ state: String,
+ month: Int)
+
/**
* A collection of hive query tests where we generate the answers ourselves instead of depending on
* Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is
@@ -41,6 +52,72 @@ case class NestedArray1(a: NestedArray2)
*/
class SQLQuerySuite extends QueryTest {
+ test("SPARK-6851: Self-joined converted parquet tables") {
+ val orders = Seq(
+ Order(1, "Atlas", "MTB", 234, "2015-01-07", "John D", "Pacifica", "CA", 20151),
+ Order(3, "Swift", "MTB", 285, "2015-01-17", "John S", "Redwood City", "CA", 20151),
+ Order(4, "Atlas", "Hybrid", 303, "2015-01-23", "Jones S", "San Mateo", "CA", 20151),
+ Order(7, "Next", "MTB", 356, "2015-01-04", "Jane D", "Daly City", "CA", 20151),
+ Order(10, "Next", "YFlikr", 187, "2015-01-09", "John D", "Fremont", "CA", 20151),
+ Order(11, "Swift", "YFlikr", 187, "2015-01-23", "John D", "Hayward", "CA", 20151),
+ Order(2, "Next", "Hybrid", 324, "2015-02-03", "Jane D", "Daly City", "CA", 20152),
+ Order(5, "Next", "Street", 187, "2015-02-08", "John D", "Fremont", "CA", 20152),
+ Order(6, "Atlas", "Street", 154, "2015-02-09", "John D", "Pacifica", "CA", 20152),
+ Order(8, "Swift", "Hybrid", 485, "2015-02-19", "John S", "Redwood City", "CA", 20152),
+ Order(9, "Atlas", "Split", 303, "2015-02-28", "Jones S", "San Mateo", "CA", 20152))
+
+ val orderUpdates = Seq(
+ Order(1, "Atlas", "MTB", 434, "2015-01-07", "John D", "Pacifica", "CA", 20151),
+ Order(11, "Swift", "YFlikr", 137, "2015-01-23", "John D", "Hayward", "CA", 20151))
+
+ orders.toDF.registerTempTable("orders1")
+ orderUpdates.toDF.registerTempTable("orderupdates1")
+
+ sql(
+ """CREATE TABLE orders(
+ | id INT,
+ | make String,
+ | type String,
+ | price INT,
+ | pdate String,
+ | customer String,
+ | city String)
+ |PARTITIONED BY (state STRING, month INT)
+ |STORED AS PARQUET
+ """.stripMargin)
+
+ sql(
+ """CREATE TABLE orderupdates(
+ | id INT,
+ | make String,
+ | type String,
+ | price INT,
+ | pdate String,
+ | customer String,
+ | city String)
+ |PARTITIONED BY (state STRING, month INT)
+ |STORED AS PARQUET
+ """.stripMargin)
+
+ sql("set hive.exec.dynamic.partition.mode=nonstrict")
+ sql("INSERT INTO TABLE orders PARTITION(state, month) SELECT * FROM orders1")
+ sql("INSERT INTO TABLE orderupdates PARTITION(state, month) SELECT * FROM orderupdates1")
+
+ checkAnswer(
+ sql(
+ """
+ |select orders.state, orders.month
+ |from orders
+ |join (
+ | select distinct orders.state,orders.month
+ | from orders
+ | join orderupdates
+ | on orderupdates.id = orders.id) ao
+ | on ao.state = orders.state and ao.month = orders.month
+ """.stripMargin),
+ (1 to 6).map(_ => Row("CA", 20151)))
+ }
+
test("SPARK-5371: union with null and sum") {
val df = Seq((1, 1)).toDF("c1", "c2")
df.registerTempTable("table1")