author    Yin Huai <yhuai@databricks.com>    2015-02-26 22:39:49 +0800
committer Cheng Lian <lian@databricks.com>   2015-02-26 22:39:49 +0800
commit    f02394d06473889d0d7897c4583239e6e136ff46 (patch)
tree      9176546713d273d05103c27b8b8c3c612023d6cc /sql
parent    51a6f9097bb475cb518ca766a46c7226640cf58e (diff)
[SPARK-6023][SQL] ParquetConversions fails to replace the destination MetastoreRelation of an InsertIntoTable node with ParquetRelation2
JIRA: https://issues.apache.org/jira/browse/SPARK-6023

Author: Yin Huai <yhuai@databricks.com>

Closes #4782 from yhuai/parquetInsertInto and squashes the following commits:

ae7e806 [Yin Huai] Convert MetastoreRelation in InsertIntoTable and InsertIntoHiveTable.
ba543cd [Yin Huai] More tests.
50b6d0f [Yin Huai] Update error messages.
346780c [Yin Huai] Failed test.
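For context, the failure mode this patch fixes: even with both spark.sql.hive.convertMetastoreParquet and the Parquet data source API enabled, an INSERT into a Metastore Parquet table still went through the Hive SerDe write path, because ParquetConversions only matched read-path plans. A minimal sketch of how the conversion can be observed, adapted from the tests added below (the jt temp table and test_insert_parquet table are set up exactly as in those tests):

    // Sketch, assuming a HiveContext with convertMetastoreParquet and the
    // Parquet data source API enabled, as configured in the suite below.
    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
    df.queryExecution.executedPlan match {
      // After this patch: the insert is planned through the data source path.
      case ExecutedCommand(InsertIntoDataSource(
          LogicalRelation(_: ParquetRelation2), _, _)) => println("converted")
      // Before this patch: the Hive SerDe write path was used instead.
      case _: InsertIntoHiveTable => println("not converted")
    }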
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala |  21
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala     | 138
2 files changed, 152 insertions(+), 7 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2cc8d65d3c..8af5a4848f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -440,6 +440,17 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val attributedRewrites = relation.output.zip(parquetRelation.output)
(relation, parquetRelation, attributedRewrites)
+ // Write path
+ case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
+ // Inserting into a partitioned table is not supported by the Parquet data source (yet).
+ if !relation.hiveQlTable.isPartitioned &&
+ hive.convertMetastoreParquet &&
+ hive.conf.parquetUseDataSourceApi &&
+ relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+ val parquetRelation = convertToParquetRelation(relation)
+ val attributedRewrites = relation.output.zip(parquetRelation.output)
+ (relation, parquetRelation, attributedRewrites)
+
// Read path
case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
if hive.convertMetastoreParquet &&
@@ -464,6 +475,16 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
withAlias
}
+ case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
+ if relationMap.contains(r) => {
+ val parquetRelation = relationMap(r)
+ InsertIntoTable(parquetRelation, partition, child, overwrite)
+ }
+ case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
+ if relationMap.contains(r) => {
+ val parquetRelation = relationMap(r)
+ InsertIntoTable(parquetRelation, partition, child, overwrite)
+ }
case other => other.transformExpressions {
case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
}
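Why two cases in the substitution above? By the time ParquetConversions runs, the analyzer may represent the insert either as the generic InsertIntoTable or as the Hive-specific logical InsertIntoHiveTable, so the rule matches both; either way the destination is rewritten to the converted relation under a plain InsertIntoTable, which then plans through the data source insertion path. A condensed sketch (simplified from the hunk above; relationMap maps each converted MetastoreRelation to its ParquetRelation2):

    // Both logical insert operators are funneled into InsertIntoTable over
    // the ParquetRelation2, so the Hive SerDe write path is bypassed.
    case InsertIntoTable(r: MetastoreRelation, part, child, overwrite)
        if relationMap.contains(r) =>
      InsertIntoTable(relationMap(r), part, child, overwrite)
    case InsertIntoHiveTable(r: MetastoreRelation, part, child, overwrite)
        if relationMap.contains(r) =>
      InsertIntoTable(relationMap(r), part, child, overwrite)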
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 653f4b4736..80fd5cda20 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -24,11 +24,11 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.{SQLConf, QueryTest}
import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.execution.PhysicalRDD
-import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveTable, HiveTableScan}
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.sources.LogicalRelation
+import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
// The data where the partitioning key exists only in the directory structure.
case class ParquetData(intField: Int, stringField: String)
@@ -93,6 +93,11 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
}
+ val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+ jsonRDD(rdd1).registerTempTable("jt")
+ val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
+ jsonRDD(rdd2).registerTempTable("jt_array")
+
setConf("spark.sql.hive.convertMetastoreParquet", "true")
}
@@ -100,6 +105,8 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
sql("DROP TABLE partitioned_parquet")
sql("DROP TABLE partitioned_parquet_with_key")
sql("DROP TABLE normal_parquet")
+ sql("DROP TABLE IF EXISTS jt")
+ sql("DROP TABLE IF EXISTS jt_array")
setConf("spark.sql.hive.convertMetastoreParquet", "false")
}
@@ -122,9 +129,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
override def beforeAll(): Unit = {
super.beforeAll()
- val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
- jsonRDD(rdd).registerTempTable("jt")
-
sql(
"""
|create table test_parquet
@@ -143,7 +147,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
override def afterAll(): Unit = {
super.afterAll()
- sql("DROP TABLE IF EXISTS jt")
sql("DROP TABLE IF EXISTS test_parquet")
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
@@ -238,6 +241,70 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
sql("DROP TABLE IF EXISTS test_parquet_ctas")
}
+
+ test("MetastoreRelation in InsertIntoTable will be converted") {
+ sql(
+ """
+ |create table test_insert_parquet
+ |(
+ | intField INT
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
+ df.queryExecution.executedPlan match {
+ case ExecutedCommand(
+ InsertIntoDataSource(
+ LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+ case o => fail("test_insert_parquet should be converted to a " +
+ s"${classOf[ParquetRelation2].getCanonicalName} and " +
+ s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
+ s"However, found a ${o.toString} ")
+ }
+
+ checkAnswer(
+ sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
+ sql("SELECT a FROM jt WHERE jt.a > 5").collect()
+ )
+
+ sql("DROP TABLE IF EXISTS test_insert_parquet")
+ }
+
+ test("MetastoreRelation in InsertIntoHiveTable will be converted") {
+ sql(
+ """
+ |create table test_insert_parquet
+ |(
+ | int_array array<int>
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
+ df.queryExecution.executedPlan match {
+ case ExecutedCommand(
+ InsertIntoDataSource(
+ LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+ case o => fail("test_insert_parquet should be converted to a " +
+ s"${classOf[ParquetRelation2].getCanonicalName} and " +
+ s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
+ s"However, found a ${o.toString} ")
+ }
+
+ checkAnswer(
+ sql("SELECT int_array FROM test_insert_parquet"),
+ sql("SELECT a FROM jt_array").collect()
+ )
+
+ sql("DROP TABLE IF EXISTS test_insert_parquet")
+ }
}
class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
@@ -252,6 +319,63 @@ class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
super.afterAll()
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}
+
+ test("MetastoreRelation in InsertIntoTable will not be converted") {
+ sql(
+ """
+ |create table test_insert_parquet
+ |(
+ | intField INT
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
+ df.queryExecution.executedPlan match {
+ case insert: InsertIntoHiveTable => // OK
+ case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
+ s"However, found ${o.toString}.")
+ }
+
+ checkAnswer(
+ sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
+ sql("SELECT a FROM jt WHERE jt.a > 5").collect()
+ )
+
+ sql("DROP TABLE IF EXISTS test_insert_parquet")
+ }
+
+ // TODO: enable it after the fix of SPARK-5950.
+ ignore("MetastoreRelation in InsertIntoHiveTable will not be converted") {
+ sql(
+ """
+ |create table test_insert_parquet
+ |(
+ | int_array array<int>
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
+ df.queryExecution.executedPlan match {
+ case insert: InsertIntoHiveTable => // OK
+ case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
+ s"However, found ${o.toString}.")
+ }
+
+ checkAnswer(
+ sql("SELECT int_array FROM test_insert_parquet"),
+ sql("SELECT a FROM jt_array").collect()
+ )
+
+ sql("DROP TABLE IF EXISTS test_insert_parquet")
+ }
}
/**