From 2b7e63585d61be2dab78b70af3867cda3983d5b1 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Sat, 23 May 2015 09:48:20 -0700
Subject: [SPARK-7654] [SQL] Move insertInto into reader/writer interface.

This one continues the work of https://github.com/apache/spark/pull/6216.

Author: Yin Huai
Author: Reynold Xin

Closes #6366 from yhuai/insert and squashes the following commits:

3d717fb [Yin Huai] Use insertInto to handle the case when table exists and Append is used for saveAsTable.
56d2540 [Yin Huai] Add PreWriteCheck to HiveContext's analyzer.
c636e35 [Yin Huai] Remove unnecessary empty lines.
cf83837 [Yin Huai] Move insertInto to write. Also, remove the partition columns from InsertIntoHadoopFsRelation.
0841a54 [Reynold Xin] Removed experimental tag for deprecated methods.
33ed8ef [Reynold Xin] [SPARK-7654][SQL] Move insertInto into reader/writer interface.
---
 .../src/main/scala/org/apache/spark/sql/hive/HiveContext.scala |  4 ++++
 .../org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala   |  6 +++---
 .../org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala  |  8 ++++----
 .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala    |  8 ++++----
 .../test/scala/org/apache/spark/sql/hive/parquetSuites.scala   |  4 ++--
 .../org/apache/spark/sql/sources/hadoopFsRelationSuites.scala  | 10 ----------
 6 files changed, 17 insertions(+), 23 deletions(-)

(limited to 'sql/hive')

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index a8e8e70db0..0d807f428a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -373,6 +373,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         ResolveHiveWindowFunction ::
         sources.PreInsertCastAndRename ::
         Nil
+
+      override val extendedCheckRules = Seq(
+        sources.PreWriteCheck(catalog)
+      )
     }
 
   override protected[sql] def createSession(): SQLSession = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index ecb990e8aa..acf2f7da30 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -53,7 +53,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
     sql("CREATE TABLE createAndInsertTest (key int, value string)")
 
     // Add some data.
-    testData.insertInto("createAndInsertTest")
+    testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
 
     // Make sure the table has also been updated.
     checkAnswer(
@@ -62,7 +62,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
     )
 
     // Add more data.
-    testData.insertInto("createAndInsertTest")
+    testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
 
     // Make sure the table has been updated.
     checkAnswer(
@@ -71,7 +71,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
     )
 
     // Now overwrite.
-    testData.insertInto("createAndInsertTest", overwrite = true)
+    testData.write.mode(SaveMode.Overwrite).insertInto("createAndInsertTest")
 
     // Make sure the registered table has also been updated.
     checkAnswer(
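The InsertIntoHiveTableSuite changes above capture the whole migration in miniature: the deprecated DataFrame.insertInto(tableName, overwrite) calls become calls on the DataFrameWriter returned by df.write, with the boolean overwrite flag expressed as a SaveMode. A minimal before/after sketch, assuming a DataFrame testData and an existing table as in the suite:

    import org.apache.spark.sql.SaveMode

    // Before this patch (deprecated DataFrame methods):
    //   testData.insertInto("createAndInsertTest")                    // append
    //   testData.insertInto("createAndInsertTest", overwrite = true)  // overwrite

    // After this patch (writer interface):
    testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
    testData.write.mode(SaveMode.Overwrite).insertInto("createAndInsertTest")
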
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c4c7b63496..9623ef06aa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -608,7 +608,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       StructType(
         StructField("a", ArrayType(IntegerType, containsNull = false), nullable = true) :: Nil)
     assert(df2.schema === expectedSchema2)
-    df2.insertInto("arrayInParquet", overwrite = false)
+    df2.write.mode(SaveMode.Append).insertInto("arrayInParquet")
     createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
       .saveAsTable("arrayInParquet") // This one internally calls df2.insertInto.
     createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a").write
@@ -642,7 +642,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       StructType(
         StructField("a", mapType2, nullable = true) :: Nil)
     assert(df2.schema === expectedSchema2)
-    df2.insertInto("mapInParquet", overwrite = false)
+    df2.write.mode(SaveMode.Append).insertInto("mapInParquet")
     createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
       .saveAsTable("mapInParquet") // This one internally calls df2.insertInto.
     createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a").write
@@ -768,7 +768,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
       (6 to 34).map(i => Row(i, s"str$i")))
 
-    createDF(40, 49).insertInto("insertParquet")
+    createDF(40, 49).write.mode(SaveMode.Append).insertInto("insertParquet")
     checkAnswer(
       sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
       (6 to 44).map(i => Row(i, s"str$i")))
@@ -782,7 +782,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       sql("SELECT p.c1, c2 FROM insertParquet p"),
       (50 to 59).map(i => Row(i, s"str$i")))
 
-    createDF(70, 79).insertInto("insertParquet", overwrite = true)
+    createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("insertParquet")
     checkAnswer(
       sql("SELECT p.c1, c2 FROM insertParquet p"),
       (70 to 79).map(i => Row(i, s"str$i")))
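The MetastoreDataSourcesSuite hunks also exercise the interaction called out in squashed commit 3d717fb: when the target table already exists and Append is used, saveAsTable is handled through insertInto, which is why the suite keeps the comment "This one internally calls df2.insertInto." A sketch of the two spellings, assuming the table arrayInParquet already exists and the DataFrame's schema matches it:

    import org.apache.spark.sql.SaveMode

    // Explicit append through the writer interface:
    df2.write.mode(SaveMode.Append).insertInto("arrayInParquet")

    // Append via saveAsTable on an existing table; per the commit
    // message, this is routed through insertInto internally:
    df2.write.mode(SaveMode.Append).saveAsTable("arrayInParquet")
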
STRING)") - testData.insertInto("test1") + testData.write.mode(SaveMode.Append).insertInto("test1") sql("CREATE TABLE test2 (key INT, value STRING)") - testData.insertInto("test2") - testData.insertInto("test2") + testData.write.mode(SaveMode.Append).insertInto("test2") + testData.write.mode(SaveMode.Append).insertInto("test2") sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key") checkAnswer( table("test"), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 223ba65f47..7851f38fd4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -316,7 +316,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt") df.queryExecution.executedPlan match { - case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _, _)) => // OK + case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _)) => // OK case o => fail("test_insert_parquet should be converted to a " + s"${classOf[ParquetRelation2].getCanonicalName} and " + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " + @@ -346,7 +346,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array") df.queryExecution.executedPlan match { - case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _, _)) => // OK + case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _)) => // OK case o => fail("test_insert_parquet should be converted to a " + s"${classOf[ParquetRelation2].getCanonicalName} and " + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index c7c8bcd27f..32226905bc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -362,16 +362,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { .partitionBy("p1") .saveAsTable("t") } - - // Using different order of partition columns - intercept[Throwable] { - partitionedTestDF2.write - .format(dataSourceName) - .mode(SaveMode.Append) - .option("dataSchema", dataSchema.json) - .partitionBy("p2", "p1") - .saveAsTable("t") - } } test("saveAsTable()/load() - partitioned table - ErrorIfExists") { -- cgit v1.2.3