author    Yin Huai <yhuai@databricks.com>  2015-05-23 09:48:20 -0700
committer Yin Huai <yhuai@databricks.com>  2015-05-23 09:48:20 -0700
commit    2b7e63585d61be2dab78b70af3867cda3983d5b1 (patch)
tree      845de9eb688867035c27923f690d9151f142dbe3
parent    a4df0f2d84ff24318b139db534521141d9d4d593 (diff)
[SPARK-7654] [SQL] Move insertInto into reader/writer interface.
This one continues the work of https://github.com/apache/spark/pull/6216.

Author: Yin Huai <yhuai@databricks.com>
Author: Reynold Xin <rxin@databricks.com>

Closes #6366 from yhuai/insert and squashes the following commits:

3d717fb [Yin Huai] Use insertInto to handle the case when table exists and Append is used for saveAsTable.
56d2540 [Yin Huai] Add PreWriteCheck to HiveContext's analyzer.
c636e35 [Yin Huai] Remove unnecessary empty lines.
cf83837 [Yin Huai] Move insertInto to write. Also, remove the partition columns from InsertIntoHadoopFsRelation.
0841a54 [Reynold Xin] Removed experimental tag for deprecated methods.
33ed8ef [Reynold Xin] [SPARK-7654][SQL] Move insertInto into reader/writer interface.
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                      | 52
 sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                | 18
 sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                | 66
 sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala    |  2
 sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala     |  5
 sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala               |  2
 sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala                    |  1
 sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala                  | 19
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala               |  4
 sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala  |  6
 sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala |  8
 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala   |  8
 sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala             |  4
 sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 10
 14 files changed, 116 insertions(+), 89 deletions(-)
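
For orientation, a minimal sketch of the user-facing migration this patch implies; the DataFrame `df` and the table name "events" are illustrative, not part of the patch:

    import org.apache.spark.sql.{DataFrame, SaveMode}

    def migrate(df: DataFrame): Unit = {
      // Before this patch (now deprecated on DataFrame):
      //   df.insertInto("events")                    // append
      //   df.insertInto("events", overwrite = true)  // overwrite
      // After this patch, insertInto lives on the writer interface:
      df.write.mode(SaveMode.Append).insertInto("events")
      df.write.mode(SaveMode.Overwrite).insertInto("events")
    }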
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 3ec1c4a2f1..f968577bc5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1395,28 +1395,6 @@ class DataFrame private[sql](
def write: DataFrameWriter = new DataFrameWriter(this)
/**
- * :: Experimental ::
- * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
- * @group output
- * @since 1.3.0
- */
- @Experimental
- def insertInto(tableName: String, overwrite: Boolean): Unit = {
- sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)),
- Map.empty, logicalPlan, overwrite, ifNotExists = false)).toRdd
- }
-
- /**
- * :: Experimental ::
- * Adds the rows from this RDD to the specified table.
- * Throws an exception if the table already exists.
- * @group output
- * @since 1.3.0
- */
- @Experimental
- def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
-
- /**
* Returns the content of the [[DataFrame]] as a RDD of JSON strings.
* @group rdd
* @since 1.3.0
@@ -1551,13 +1529,7 @@ class DataFrame private[sql](
*/
@deprecated("Use write.mode(mode).saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String, mode: SaveMode): Unit = {
- if (sqlContext.catalog.tableExists(Seq(tableName)) && mode == SaveMode.Append) {
- // If table already exists and the save mode is Append,
- // we will just call insertInto to append the contents of this DataFrame.
- insertInto(tableName, overwrite = false)
- } else {
- write.mode(mode).saveAsTable(tableName)
- }
+ write.mode(mode).saveAsTable(tableName)
}
/**
@@ -1713,9 +1685,29 @@ class DataFrame private[sql](
write.format(source).mode(mode).options(options).save()
}
+
+ /**
+ * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
+ * @group output
+ */
+ @deprecated("Use write.mode(SaveMode.Append|SaveMode.Overwrite).saveAsTable(tableName)", "1.4.0")
+ def insertInto(tableName: String, overwrite: Boolean): Unit = {
+ write.mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append).insertInto(tableName)
+ }
+
+ /**
+ * Adds the rows from this RDD to the specified table.
+ * Throws an exception if the table already exists.
+ * @group output
+ */
+ @deprecated("Use write.mode(SaveMode.Append).saveAsTable(tableName)", "1.4.0")
+ def insertInto(tableName: String): Unit = {
+ write.mode(SaveMode.Append).insertInto(tableName)
+ }
+
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
- // End of eeprecated methods
+ // End of deprecated methods
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
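
As the hunk above shows, the deprecated DataFrame methods are kept as thin forwarders, and the Append-to-an-existing-table special case moves out of DataFrame.saveAsTable and into DataFrameWriter.saveAsTable (next file). A rough sketch of the equivalent call paths after this change; `df` and the table name "logs" are illustrative:

    import org.apache.spark.sql.{DataFrame, SaveMode}

    def saveOrAppend(df: DataFrame): Unit = {
      // Deprecated forwarder; still compiles in 1.4:
      df.saveAsTable("logs", SaveMode.Append)
      // Preferred form; if "logs" already exists, the writer falls back
      // to an insert instead of recreating the table:
      df.write.mode(SaveMode.Append).saveAsTable("logs")
    }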
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 381c10f48f..b44d4c86ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -95,20 +95,6 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
}
/**
- * Specifies the input partitioning. If specified, the underlying data source does not need to
- * discover the data partitioning scheme, and thus can speed up very large inputs.
- *
- * This is only applicable for Parquet at the moment.
- *
- * @since 1.4.0
- */
- @scala.annotation.varargs
- def partitionBy(colNames: String*): DataFrameReader = {
- this.partitioningColumns = Option(colNames)
- this
- }
-
- /**
* Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
* a local or distributed file system).
*
@@ -128,7 +114,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
val resolved = ResolvedDataSource(
sqlContext,
userSpecifiedSchema = userSpecifiedSchema,
- partitionColumns = partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
+ partitionColumns = Array.empty[String],
provider = source,
options = extraOptions.toMap)
DataFrame(sqlContext, LogicalRelation(resolved.relation))
@@ -300,6 +286,4 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
private var extraOptions = new scala.collection.mutable.HashMap[String, String]
- private var partitioningColumns: Option[Seq[String]] = None
-
}
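
With partitionBy removed from the read path, partition columns are no longer supplied by the caller; the data source (Parquet in particular) discovers them from the directory layout. A hedged sketch of the remaining reader usage; the path is hypothetical:

    import org.apache.spark.sql.{DataFrame, SQLContext}

    def readPartitioned(sqlContext: SQLContext): DataFrame = {
      // Partition columns such as date=2015-05-23/ are inferred from the
      // on-disk layout; no reader-side hint is needed or accepted any more.
      sqlContext.read.format("parquet").load("/data/events")
    }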
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index f2e721d4db..5548b26cb8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql
import java.util.Properties
import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils}
import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
@@ -149,21 +151,65 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}
/**
+ * Inserts the content of the [[DataFrame]] to the specified table. It requires that
+ * the schema of the [[DataFrame]] is the same as the schema of the table.
+ *
+ * Because it inserts data to an existing table, format or options will be ignored.
+ *
+ * @since 1.4.0
+ */
+ def insertInto(tableName: String): Unit = {
+ val partitions =
+ partitioningColumns.map(_.map(col => col -> (None: Option[String])).toMap)
+ val overwrite = (mode == SaveMode.Overwrite)
+ df.sqlContext.executePlan(InsertIntoTable(
+ UnresolvedRelation(Seq(tableName)),
+ partitions.getOrElse(Map.empty[String, Option[String]]),
+ df.logicalPlan,
+ overwrite,
+ ifNotExists = false)).toRdd
+ }
+
+ /**
* Saves the content of the [[DataFrame]] as the specified table.
*
+ * In the case the table already exists, behavior of this function depends on the
+ * save mode, specified by the `mode` function (default to throwing an exception).
+ * When `mode` is `Overwrite`, the schema of the [[DataFrame]] does not need to be
+ * the same as that of the existing table.
+ * When `mode` is `Append`, the schema of the [[DataFrame]] need to be
+ * the same as that of the existing table, and format or options will be ignored.
+ *
* @since 1.4.0
*/
def saveAsTable(tableName: String): Unit = {
- val cmd =
- CreateTableUsingAsSelect(
- tableName,
- source,
- temporary = false,
- partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
- mode,
- extraOptions.toMap,
- df.logicalPlan)
- df.sqlContext.executePlan(cmd).toRdd
+ if (df.sqlContext.catalog.tableExists(tableName :: Nil) && mode != SaveMode.Overwrite) {
+ mode match {
+ case SaveMode.Ignore =>
+ // Do nothing
+
+ case SaveMode.ErrorIfExists =>
+ throw new AnalysisException(s"Table $tableName already exists.")
+
+ case SaveMode.Append =>
+ // If it is Append, we just ask insertInto to handle it. We will not use insertInto
+ // to handle saveAsTable with Overwrite because saveAsTable can change the schema of
+ // the table. But, insertInto with Overwrite requires the schema of data be the same
+ // the schema of the table.
+ insertInto(tableName)
+ }
+ } else {
+ val cmd =
+ CreateTableUsingAsSelect(
+ tableName,
+ source,
+ temporary = false,
+ partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
+ mode,
+ extraOptions.toMap,
+ df.logicalPlan)
+ df.sqlContext.executePlan(cmd).toRdd
+ }
}
/**
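
Taken together, the new saveAsTable dispatches on the save mode when the target table already exists. A sketch of the observable outcomes, mirroring the match above; the table "t" and `df` are illustrative:

    import org.apache.spark.sql.{DataFrame, SaveMode}

    def modesAgainstExistingTable(df: DataFrame): Unit = {
      df.write.mode(SaveMode.Ignore).saveAsTable("t")     // no-op
      df.write.mode(SaveMode.Append).saveAsTable("t")     // delegates to insertInto("t")
      df.write.mode(SaveMode.Overwrite).saveAsTable("t")  // recreates the table; schema may change
      df.write.mode(SaveMode.ErrorIfExists).saveAsTable("t")  // throws AnalysisException
    }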
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index c45c431438..70a220cc43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -129,7 +129,7 @@ private[parquet] object RowReadSupport {
}
/**
- * A `parquet.hadoop.api.WriteSupport` for Row ojects.
+ * A `parquet.hadoop.api.WriteSupport` for Row objects.
*/
private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index c03649d00b..dacd967cff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -105,10 +105,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
case i @ logical.InsertIntoTable(
- l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) if part.isEmpty =>
+ l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) =>
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
- execution.ExecutedCommand(
- InsertIntoHadoopFsRelation(t, query, Array.empty[String], mode)) :: Nil
+ execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil
case _ => Nil
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 498f7538d4..c3674a8c76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -61,7 +61,6 @@ private[sql] case class InsertIntoDataSource(
private[sql] case class InsertIntoHadoopFsRelation(
@transient relation: HadoopFsRelation,
@transient query: LogicalPlan,
- partitionColumns: Array[String],
mode: SaveMode)
extends RunnableCommand {
@@ -100,6 +99,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
relation.schema,
needsConversion = false)
+ val partitionColumns = relation.partitionColumns.fieldNames
if (partitionColumns.isEmpty) {
insert(new DefaultWriterContainer(relation, job), df)
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 5e723122ee..ca30b8e746 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -335,7 +335,6 @@ private[sql] object ResolvedDataSource {
InsertIntoHadoopFsRelation(
r,
project,
- partitionColumns.toArray,
mode)).toRdd
r
case _ =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index ab33125b74..a3fd7f13b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -35,9 +35,9 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p
- // We are inserting into an InsertableRelation.
+ // We are inserting into an InsertableRelation or HadoopFsRelation.
case i @ InsertIntoTable(
- l @ LogicalRelation(r: InsertableRelation), partition, child, overwrite, ifNotExists) => {
+ l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation), _, child, _, _) => {
// First, make sure the data to be inserted have the same number of fields with the
// schema of the relation.
if (l.output.size != child.output.size) {
@@ -101,7 +101,20 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
}
}
- case logical.InsertIntoTable(LogicalRelation(_: HadoopFsRelation), _, _, _, _) => // OK
+ case logical.InsertIntoTable(LogicalRelation(r: HadoopFsRelation), part, _, _, _) =>
+ // We need to make sure the partition columns specified by users do match partition
+ // columns of the relation.
+ val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
+ val specifiedPartitionColumns = part.keySet
+ if (existingPartitionColumns != specifiedPartitionColumns) {
+ failAnalysis(s"Specified partition columns " +
+ s"(${specifiedPartitionColumns.mkString(", ")}) " +
+ s"do not match the partition columns of the table. Please use " +
+ s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.")
+ } else {
+ // OK
+ }
+
case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
// The relation in l is not an InsertableRelation.
failAnalysis(s"$l does not allow insertion.")
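
The new PreWriteCheck branch means a partition spec that does not line up with the table's partition columns now fails analysis up front. A hedged illustration of how that would surface through the writer API; the table "events", its partition columns, and `df` are hypothetical:

    import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode}

    def mismatchedPartitionSpec(df: DataFrame): Unit = {
      // Suppose "events" is a data source table partitioned by (p1, p2).
      // Declaring only p1 on the writer is now rejected during analysis:
      try {
        df.write.mode(SaveMode.Append).partitionBy("p1").insertInto("events")
      } catch {
        case e: AnalysisException =>
          // e.g. "Specified partition columns (p1) do not match the partition
          // columns of the table. Please use (p1, p2) as the partition columns."
          println(e.getMessage)
      }
    }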
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index a8e8e70db0..0d807f428a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -373,6 +373,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
ResolveHiveWindowFunction ::
sources.PreInsertCastAndRename ::
Nil
+
+ override val extendedCheckRules = Seq(
+ sources.PreWriteCheck(catalog)
+ )
}
override protected[sql] def createSession(): SQLSession = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index ecb990e8aa..acf2f7da30 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -53,7 +53,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
sql("CREATE TABLE createAndInsertTest (key int, value string)")
// Add some data.
- testData.insertInto("createAndInsertTest")
+ testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
// Make sure the table has also been updated.
checkAnswer(
@@ -62,7 +62,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
)
// Add more data.
- testData.insertInto("createAndInsertTest")
+ testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
// Make sure the table has been updated.
checkAnswer(
@@ -71,7 +71,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
)
// Now overwrite.
- testData.insertInto("createAndInsertTest", overwrite = true)
+ testData.write.mode(SaveMode.Overwrite).insertInto("createAndInsertTest")
// Make sure the registered table has also been updated.
checkAnswer(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c4c7b63496..9623ef06aa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -608,7 +608,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructType(
StructField("a", ArrayType(IntegerType, containsNull = false), nullable = true) :: Nil)
assert(df2.schema === expectedSchema2)
- df2.insertInto("arrayInParquet", overwrite = false)
+ df2.write.mode(SaveMode.Append).insertInto("arrayInParquet")
createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
.saveAsTable("arrayInParquet") // This one internally calls df2.insertInto.
createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a").write
@@ -642,7 +642,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructType(
StructField("a", mapType2, nullable = true) :: Nil)
assert(df2.schema === expectedSchema2)
- df2.insertInto("mapInParquet", overwrite = false)
+ df2.write.mode(SaveMode.Append).insertInto("mapInParquet")
createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
.saveAsTable("mapInParquet") // This one internally calls df2.insertInto.
createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a").write
@@ -768,7 +768,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
(6 to 34).map(i => Row(i, s"str$i")))
- createDF(40, 49).insertInto("insertParquet")
+ createDF(40, 49).write.mode(SaveMode.Append).insertInto("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
(6 to 44).map(i => Row(i, s"str$i")))
@@ -782,7 +782,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT p.c1, c2 FROM insertParquet p"),
(50 to 59).map(i => Row(i, s"str$i")))
- createDF(70, 79).insertInto("insertParquet", overwrite = true)
+ createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p"),
(70 to 79).map(i => Row(i, s"str$i")))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index ba53ed99be..b707f5e684 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.catalyst.DefaultParserDialect
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.errors.DialectException
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -425,10 +425,10 @@ class SQLQuerySuite extends QueryTest {
test("SPARK-4825 save join to table") {
val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
sql("CREATE TABLE test1 (key INT, value STRING)")
- testData.insertInto("test1")
+ testData.write.mode(SaveMode.Append).insertInto("test1")
sql("CREATE TABLE test2 (key INT, value STRING)")
- testData.insertInto("test2")
- testData.insertInto("test2")
+ testData.write.mode(SaveMode.Append).insertInto("test2")
+ testData.write.mode(SaveMode.Append).insertInto("test2")
sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key")
checkAnswer(
table("test"),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 223ba65f47..7851f38fd4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -316,7 +316,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
df.queryExecution.executedPlan match {
- case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _, _)) => // OK
+ case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[ParquetRelation2].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
@@ -346,7 +346,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.executedPlan match {
- case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _, _)) => // OK
+ case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[ParquetRelation2].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index c7c8bcd27f..32226905bc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -362,16 +362,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
.partitionBy("p1")
.saveAsTable("t")
}
-
- // Using different order of partition columns
- intercept[Throwable] {
- partitionedTestDF2.write
- .format(dataSourceName)
- .mode(SaveMode.Append)
- .option("dataSchema", dataSchema.json)
- .partitionBy("p2", "p1")
- .saveAsTable("t")
- }
}
test("saveAsTable()/load() - partitioned table - ErrorIfExists") {