author    Yin Huai <yhuai@databricks.com>    2015-05-23 09:48:20 -0700
committer Yin Huai <yhuai@databricks.com>    2015-05-23 09:48:20 -0700
commit    2b7e63585d61be2dab78b70af3867cda3983d5b1 (patch)
tree      845de9eb688867035c27923f690d9151f142dbe3 /sql/hive
parent    a4df0f2d84ff24318b139db534521141d9d4d593 (diff)
[SPARK-7654] [SQL] Move insertInto into reader/writer interface.
This one continues the work of https://github.com/apache/spark/pull/6216.

Author: Yin Huai <yhuai@databricks.com>
Author: Reynold Xin <rxin@databricks.com>

Closes #6366 from yhuai/insert and squashes the following commits:

3d717fb [Yin Huai] Use insertInto to handle the case when table exists and Append is used for saveAsTable.
56d2540 [Yin Huai] Add PreWriteCheck to HiveContext's analyzer.
c636e35 [Yin Huai] Remove unnecessary empty lines.
cf83837 [Yin Huai] Move insertInto to write. Also, remove the partition columns from InsertIntoHadoopFsRelation.
0841a54 [Reynold Xin] Removed experimental tag for deprecated methods.
33ed8ef [Reynold Xin] [SPARK-7654][SQL] Move insertInto into reader/writer interface.
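In API terms, the patch moves insertInto from DataFrame onto DataFrameWriter,
with SaveMode replacing the old boolean overwrite flag. A minimal sketch of the
migration (the table names and the df value are illustrative, not from the patch):

    import org.apache.spark.sql.SaveMode

    val df = sqlContext.table("staged_logs")  // hypothetical source table

    // Before this patch, insertInto lived directly on DataFrame:
    //   df.insertInto("logs")                   // append
    //   df.insertInto("logs", overwrite = true) // overwrite

    // After this patch, insertInto is reached through df.write, and
    // SaveMode carries the append-versus-overwrite intent:
    df.write.mode(SaveMode.Append).insertInto("logs")
    df.write.mode(SaveMode.Overwrite).insertInto("logs")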
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala               |  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala  |  6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala   |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala             |  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 10
6 files changed, 17 insertions, 23 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index a8e8e70db0..0d807f428a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -373,6 +373,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
ResolveHiveWindowFunction ::
sources.PreInsertCastAndRename ::
Nil
+
+ override val extendedCheckRules = Seq(
+ sources.PreWriteCheck(catalog)
+ )
}
override protected[sql] def createSession(): SQLSession = {
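The HiveContext hunk above registers sources.PreWriteCheck as an extended
check rule, so that illegal writes fail during analysis rather than partway
through execution. A rough sketch of the kind of misuse such a check can
reject (the exact rule coverage and failure mode are assumptions, not quoted
from the source):

    // Sketch only: with PreWriteCheck installed, overwriting a table that
    // the same query also reads from is expected to raise an
    // AnalysisException at analysis time (assumption).
    val selfRead = sqlContext.table("createAndInsertTest")
    selfRead.write.mode(SaveMode.Overwrite).insertInto("createAndInsertTest")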
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index ecb990e8aa..acf2f7da30 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -53,7 +53,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
sql("CREATE TABLE createAndInsertTest (key int, value string)")
// Add some data.
- testData.insertInto("createAndInsertTest")
+ testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
// Make sure the table has also been updated.
checkAnswer(
@@ -62,7 +62,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
)
// Add more data.
- testData.insertInto("createAndInsertTest")
+ testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
// Make sure the table has been updated.
checkAnswer(
@@ -71,7 +71,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
)
// Now overwrite.
- testData.insertInto("createAndInsertTest", overwrite = true)
+ testData.write.mode(SaveMode.Overwrite).insertInto("createAndInsertTest")
// Make sure the registered table has also been updated.
checkAnswer(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c4c7b63496..9623ef06aa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -608,7 +608,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructType(
StructField("a", ArrayType(IntegerType, containsNull = false), nullable = true) :: Nil)
assert(df2.schema === expectedSchema2)
- df2.insertInto("arrayInParquet", overwrite = false)
+ df2.write.mode(SaveMode.Append).insertInto("arrayInParquet")
createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
.saveAsTable("arrayInParquet") // This one internally calls df2.insertInto.
createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a").write
@@ -642,7 +642,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructType(
StructField("a", mapType2, nullable = true) :: Nil)
assert(df2.schema === expectedSchema2)
- df2.insertInto("mapInParquet", overwrite = false)
+ df2.write.mode(SaveMode.Append).insertInto("mapInParquet")
createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
.saveAsTable("mapInParquet") // This one internally calls df2.insertInto.
createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a").write
@@ -768,7 +768,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
(6 to 34).map(i => Row(i, s"str$i")))
- createDF(40, 49).insertInto("insertParquet")
+ createDF(40, 49).write.mode(SaveMode.Append).insertInto("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
(6 to 44).map(i => Row(i, s"str$i")))
@@ -782,7 +782,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT p.c1, c2 FROM insertParquet p"),
(50 to 59).map(i => Row(i, s"str$i")))
- createDF(70, 79).insertInto("insertParquet", overwrite = true)
+ createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p"),
(70 to 79).map(i => Row(i, s"str$i")))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index ba53ed99be..b707f5e684 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.catalyst.DefaultParserDialect
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.errors.DialectException
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -425,10 +425,10 @@ class SQLQuerySuite extends QueryTest {
test("SPARK-4825 save join to table") {
val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
sql("CREATE TABLE test1 (key INT, value STRING)")
- testData.insertInto("test1")
+ testData.write.mode(SaveMode.Append).insertInto("test1")
sql("CREATE TABLE test2 (key INT, value STRING)")
- testData.insertInto("test2")
- testData.insertInto("test2")
+ testData.write.mode(SaveMode.Append).insertInto("test2")
+ testData.write.mode(SaveMode.Append).insertInto("test2")
sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key")
checkAnswer(
table("test"),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 223ba65f47..7851f38fd4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -316,7 +316,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
df.queryExecution.executedPlan match {
- case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _, _)) => // OK
+ case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[ParquetRelation2].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
@@ -346,7 +346,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.executedPlan match {
- case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _, _)) => // OK
+ case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[ParquetRelation2].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index c7c8bcd27f..32226905bc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -362,16 +362,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
.partitionBy("p1")
.saveAsTable("t")
}
-
- // Using different order of partition columns
- intercept[Throwable] {
- partitionedTestDF2.write
- .format(dataSourceName)
- .mode(SaveMode.Append)
- .option("dataSchema", dataSchema.json)
- .partitionBy("p2", "p1")
- .saveAsTable("t")
- }
}
test("saveAsTable()/load() - partitioned table - ErrorIfExists") {