author     Yin Huai <yhuai@databricks.com>            2015-02-10 17:29:52 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-02-10 17:29:52 -0800
commit     aaf50d05c7616e4f8f16654b642500ae06cdd774 (patch)
tree       7f30e0d08e4f2b531ac62c82a4361a2db577932d /sql/hive
parent     ed167e70c6d355f39b366ea0d3b92dd26d826a0b (diff)
[SPARK-5658][SQL] Finalize DDL and write support APIs
https://issues.apache.org/jira/browse/SPARK-5658

Author: Yin Huai <yhuai@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>

Closes #4446 from yhuai/writeSupportFollowup and squashes the following commits:

f3a96f7 [Yin Huai] davies's comments.
225ff71 [Yin Huai] Use Scala TestHiveContext to initialize the Python HiveContext in Python tests.
2306f93 [Yin Huai] Style.
2091fcd [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
537e28f [Yin Huai] Correctly clean up temp data.
ae4649e [Yin Huai] Fix Python test.
609129c [Yin Huai] Doc format.
92b6659 [Yin Huai] Python doc and other minor updates.
cbc717f [Yin Huai] Rename dataSourceName to source.
d1c12d3 [Yin Huai] No need to delete the duplicate rule since it has been removed in master.
22cfa70 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
d91ecb8 [Yin Huai] Fix test.
4c76d78 [Yin Huai] Simplify APIs.
3abc215 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
0832ce4 [Yin Huai] Fix test.
98e7cdb [Yin Huai] Python style.
2bf44ef [Yin Huai] Python APIs.
c204967 [Yin Huai] Format
a10223d [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
9ff97d8 [Yin Huai] Add SaveMode to saveAsTable.
9b6e570 [Yin Huai] Update doc.
c2be775 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
99950a2 [Yin Huai] Use Java enum for SaveMode.
4679665 [Yin Huai] Remove duplicate rule.
77d89dc [Yin Huai] Update doc.
e04d908 [Yin Huai] Move import and add (Scala-specific) to scala APIs.
cf5703d [Yin Huai] Add checkAnswer to Java tests.
7db95ff [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
6dfd386 [Yin Huai] Add java test.
f2f33ef [Yin Huai] Fix test.
e702386 [Yin Huai] Apache header.
b1e9b1b [Yin Huai] Format.
ed4e1b4 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
af9e9b3 [Yin Huai] DDL and write support API followup.
2a6213a [Yin Huai] Update API names.
e6a0b77 [Yin Huai] Update test.
43bae01 [Yin Huai] Remove createTable from HiveContext.
5ffc372 [Yin Huai] Add more load APIs to SQLContext.
5390743 [Yin Huai] Add more save APIs to DataFrame.
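For orientation, a minimal Scala sketch of the write path this patch finalizes, pieced together from the tests in this diff (DataFrame.saveAsTable with an explicit data source, a SaveMode, and an options map); the table name and path below are illustrative, not part of the patch:

    import org.apache.spark.sql.hive.test.TestHive
    import org.apache.spark.sql.sources.SaveMode

    // Build a small DataFrame from JSON strings, mirroring the fixtures used in the tests below.
    val rdd = TestHive.sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
    val df = TestHive.jsonRDD(rdd)

    // Persist it as a metastore table backed by an explicit data source. SaveMode decides what
    // happens when the table already exists (ErrorIfExists, Append, Overwrite, Ignore).
    df.saveAsTable(
      "savedJsonTable",                      // illustrative table name
      "org.apache.spark.sql.json",           // data source provider
      SaveMode.Append,
      Map("path" -> "/tmp/savedJsonTable"))  // illustrative path; omit "path" for a managed table

    TestHive.sql("SELECT * FROM savedJsonTable").collect()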
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                                                                76
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                                                             13
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala                                                        105
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala (renamed from sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala)  20
-rw-r--r--  sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java                                                147
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala                                                                       64
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala                                                    33
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala                                                  118
8 files changed, 416 insertions, 160 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 2c00659496..7ae6ed6f84 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -80,18 +80,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}
/**
- * Creates a table using the schema of the given class.
- *
- * @param tableName The name of the table to create.
- * @param allowExisting When false, an exception will be thrown if the table already exists.
- * @tparam A A case class that is used to describe the schema of the table to be created.
- */
- @Deprecated
- def createTable[A <: Product : TypeTag](tableName: String, allowExisting: Boolean = true) {
- catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
- }
-
- /**
* Invalidate and refresh all the cached metadata of the given table. For performance reasons,
* Spark SQL or the external data source library it uses might cache certain metadata about a
* table, such as the location of blocks. When those change outside of Spark SQL, users should
@@ -107,70 +95,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.invalidateTable("default", tableName)
}
- @Experimental
- def createTable(tableName: String, path: String, allowExisting: Boolean): Unit = {
- val dataSourceName = conf.defaultDataSourceName
- createTable(tableName, dataSourceName, allowExisting, ("path", path))
- }
-
- @Experimental
- def createTable(
- tableName: String,
- dataSourceName: String,
- allowExisting: Boolean,
- option: (String, String),
- options: (String, String)*): Unit = {
- val cmd =
- CreateTableUsing(
- tableName,
- userSpecifiedSchema = None,
- dataSourceName,
- temporary = false,
- (option +: options).toMap,
- allowExisting)
- executePlan(cmd).toRdd
- }
-
- @Experimental
- def createTable(
- tableName: String,
- dataSourceName: String,
- schema: StructType,
- allowExisting: Boolean,
- option: (String, String),
- options: (String, String)*): Unit = {
- val cmd =
- CreateTableUsing(
- tableName,
- userSpecifiedSchema = Some(schema),
- dataSourceName,
- temporary = false,
- (option +: options).toMap,
- allowExisting)
- executePlan(cmd).toRdd
- }
-
- @Experimental
- def createTable(
- tableName: String,
- dataSourceName: String,
- allowExisting: Boolean,
- options: java.util.Map[String, String]): Unit = {
- val opts = options.toSeq
- createTable(tableName, dataSourceName, allowExisting, opts.head, opts.tail:_*)
- }
-
- @Experimental
- def createTable(
- tableName: String,
- dataSourceName: String,
- schema: StructType,
- allowExisting: Boolean,
- options: java.util.Map[String, String]): Unit = {
- val opts = options.toSeq
- createTable(tableName, dataSourceName, schema, allowExisting, opts.head, opts.tail:_*)
- }
-
/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
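The createTable overloads removed above are superseded by the data source DDL path (SQLContext.createExternalTable and DataFrame.saveAsTable). A rough sketch of the replacement call, assuming a Scala variant that takes an options Map alongside the Java signature exercised in JavaMetastoreDataSourcesSuite below; the table name and path are illustrative:

    import org.apache.spark.sql.hive.test.TestHive

    // Previously: hiveContext.createTable("createdJsonTable", "/data/jsonTable", allowExisting = false)
    // After this patch, register the existing data as an external table through the data source API:
    val created = TestHive.createExternalTable(
      "createdJsonTable",
      "org.apache.spark.sql.json",        // data source provider
      Map("path" -> "/data/jsonTable"))   // illustrative location of the existing data
    created.schema                        // the schema is inferred by the data source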
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 95abc363ae..cb138be90e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -216,20 +216,21 @@ private[hive] trait HiveStrategies {
object HiveDDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, opts, allowExisting) =>
+ case CreateTableUsing(
+ tableName, userSpecifiedSchema, provider, false, opts, allowExisting, managedIfNoPath) =>
ExecutedCommand(
CreateMetastoreDataSource(
- tableName, userSpecifiedSchema, provider, opts, allowExisting)) :: Nil
+ tableName, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)) :: Nil
- case CreateTableUsingAsSelect(tableName, provider, false, opts, allowExisting, query) =>
+ case CreateTableUsingAsSelect(tableName, provider, false, mode, opts, query) =>
val logicalPlan = hiveContext.parseSql(query)
val cmd =
- CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, logicalPlan)
+ CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, logicalPlan)
ExecutedCommand(cmd) :: Nil
- case CreateTableUsingAsLogicalPlan(tableName, provider, false, opts, allowExisting, query) =>
+ case CreateTableUsingAsLogicalPlan(tableName, provider, false, mode, opts, query) =>
val cmd =
- CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, query)
+ CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, query)
ExecutedCommand(cmd) :: Nil
case _ => Nil
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 95dcaccefd..f6bea1c6a6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -18,7 +18,9 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.sources.ResolvedDataSource
+import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.sources._
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -105,7 +107,8 @@ case class CreateMetastoreDataSource(
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String],
- allowExisting: Boolean) extends RunnableCommand {
+ allowExisting: Boolean,
+ managedIfNoPath: Boolean) extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
@@ -120,7 +123,7 @@ case class CreateMetastoreDataSource(
var isExternal = true
val optionsWithPath =
- if (!options.contains("path")) {
+ if (!options.contains("path") && managedIfNoPath) {
isExternal = false
options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName))
} else {
@@ -141,22 +144,13 @@ case class CreateMetastoreDataSource(
case class CreateMetastoreDataSourceAsSelect(
tableName: String,
provider: String,
+ mode: SaveMode,
options: Map[String, String],
- allowExisting: Boolean,
query: LogicalPlan) extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
-
- if (hiveContext.catalog.tableExists(tableName :: Nil)) {
- if (allowExisting) {
- return Seq.empty[Row]
- } else {
- sys.error(s"Table $tableName already exists.")
- }
- }
-
- val df = DataFrame(hiveContext, query)
+ var createMetastoreTable = false
var isExternal = true
val optionsWithPath =
if (!options.contains("path")) {
@@ -166,15 +160,82 @@ case class CreateMetastoreDataSourceAsSelect(
options
}
- // Create the relation based on the data of df.
- ResolvedDataSource(sqlContext, provider, optionsWithPath, df)
+ if (sqlContext.catalog.tableExists(Seq(tableName))) {
+ // Check if we need to throw an exception or just return.
+ mode match {
+ case SaveMode.ErrorIfExists =>
+ sys.error(s"Table $tableName already exists. " +
+ s"If you want to append into it, please set mode to SaveMode.Append. " +
+ s"Or, if you want to overwrite it, please set mode to SaveMode.Overwrite.")
+ case SaveMode.Ignore =>
+ // Since the table already exists and the save mode is Ignore, we will just return.
+ return Seq.empty[Row]
+ case SaveMode.Append =>
+ // Check if the specified data source matches the data source of the existing table.
+ val resolved =
+ ResolvedDataSource(sqlContext, Some(query.schema), provider, optionsWithPath)
+ val createdRelation = LogicalRelation(resolved.relation)
+ EliminateAnalysisOperators(sqlContext.table(tableName).logicalPlan) match {
+ case l @ LogicalRelation(i: InsertableRelation) =>
+ if (l.schema != createdRelation.schema) {
+ val errorDescription =
+ s"Cannot append to table $tableName because the schema of this " +
+ s"DataFrame does not match the schema of table $tableName."
+ val errorMessage =
+ s"""
+ |$errorDescription
+ |== Schemas ==
+ |${sideBySide(
+ s"== Expected Schema ==" +:
+ l.schema.treeString.split("\\\n"),
+ s"== Actual Schema ==" +:
+ createdRelation.schema.treeString.split("\\\n")).mkString("\n")}
+ """.stripMargin
+ sys.error(errorMessage)
+ } else if (i != createdRelation.relation) {
+ val errorDescription =
+ s"Cannot append to table $tableName because the resolved relation does not " +
+ s"match the existing relation of $tableName. " +
+ s"You can use insertInto($tableName, false) to append this DataFrame to the " +
+ s"table $tableName and using its data source and options."
+ val errorMessage =
+ s"""
+ |$errorDescription
+ |== Relations ==
+ |${sideBySide(
+ s"== Expected Relation ==" ::
+ l.toString :: Nil,
+ s"== Actual Relation ==" ::
+ createdRelation.toString :: Nil).mkString("\n")}
+ """.stripMargin
+ sys.error(errorMessage)
+ }
+ case o =>
+ sys.error(s"Saving data in ${o.toString} is not supported.")
+ }
+ case SaveMode.Overwrite =>
+ hiveContext.sql(s"DROP TABLE IF EXISTS $tableName")
+ // Need to create the table again.
+ createMetastoreTable = true
+ }
+ } else {
+ // The table does not exist. We need to create it in metastore.
+ createMetastoreTable = true
+ }
- hiveContext.catalog.createDataSourceTable(
- tableName,
- None,
- provider,
- optionsWithPath,
- isExternal)
+ val df = DataFrame(hiveContext, query)
+
+ // Create the relation based on the data of df.
+ ResolvedDataSource(sqlContext, provider, mode, optionsWithPath, df)
+
+ if (createMetastoreTable) {
+ hiveContext.catalog.createDataSourceTable(
+ tableName,
+ Some(df.schema),
+ provider,
+ optionsWithPath,
+ isExternal)
+ }
Seq.empty[Row]
}
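The SaveMode handling added above determines what CreateMetastoreDataSourceAsSelect does when the target table already exists. A short sketch, written in the style of MetastoreDataSourcesSuite further down and meant to run in that suite's context (import TestHive._); the table name and data are illustrative:

    import org.apache.spark.sql.SQLConf
    import org.apache.spark.sql.sources.SaveMode

    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
    val df = jsonRDD(sparkContext.parallelize(Seq("""{"a":1, "b":"str1"}""")))

    df.saveAsTable("modesTable")                      // table does not exist yet: it is created
    // df.saveAsTable("modesTable")                   // default ErrorIfExists: fails, table already exists
    df.saveAsTable("modesTable", SaveMode.Ignore)     // table exists: no-op, existing data is kept
    df.saveAsTable("modesTable", SaveMode.Overwrite)  // DROP TABLE IF EXISTS, then recreate from df
    // SaveMode.Append first checks that the resolved relation and schema match the existing
    // table; with the JSON source in this version it still raises an error, as the suite notes.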
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 7c1d1133c3..840fbc1972 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -20,9 +20,6 @@ package org.apache.spark.sql.hive.test
import java.io.File
import java.util.{Set => JavaSet}
-import scala.collection.mutable
-import scala.language.implicitConversions
-
import org.apache.hadoop.hive.ql.exec.FunctionRegistry
import org.apache.hadoop.hive.ql.io.avro.{AvroContainerInputFormat, AvroContainerOutputFormat}
import org.apache.hadoop.hive.ql.metadata.Table
@@ -30,16 +27,18 @@ import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.serde2.RegexSerDe
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.hive.serde2.avro.AvroSerDe
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.CacheTableCommand
import org.apache.spark.sql.hive._
-import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.hive.execution.HiveNativeCommand
+import org.apache.spark.util.Utils
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.collection.mutable
+import scala.language.implicitConversions
/* Implicit conversions */
import scala.collection.JavaConversions._
@@ -224,11 +223,10 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
}
}),
TestTable("src_thrift", () => {
- import org.apache.thrift.protocol.TBinaryProtocol
- import org.apache.hadoop.hive.serde2.thrift.test.Complex
import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer
- import org.apache.hadoop.mapred.SequenceFileInputFormat
- import org.apache.hadoop.mapred.SequenceFileOutputFormat
+ import org.apache.hadoop.hive.serde2.thrift.test.Complex
+ import org.apache.hadoop.mapred.{SequenceFileInputFormat, SequenceFileOutputFormat}
+ import org.apache.thrift.protocol.TBinaryProtocol
val srcThrift = new Table("default", "src_thrift")
srcThrift.setFields(Nil)
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
new file mode 100644
index 0000000000..9744a2aa3f
--- /dev/null
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.sources.SaveMode;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.QueryTest$;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.hive.test.TestHive$;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.Utils;
+
+public class JavaMetastoreDataSourcesSuite {
+ private transient JavaSparkContext sc;
+ private transient HiveContext sqlContext;
+
+ String originalDefaultSource;
+ File path;
+ Path hiveManagedPath;
+ FileSystem fs;
+ DataFrame df;
+
+ private void checkAnswer(DataFrame actual, List<Row> expected) {
+ String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected);
+ if (errorMessage != null) {
+ Assert.fail(errorMessage);
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ sqlContext = TestHive$.MODULE$;
+ sc = new JavaSparkContext(sqlContext.sparkContext());
+
+ originalDefaultSource = sqlContext.conf().defaultDataSourceName();
+ path =
+ Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile();
+ if (path.exists()) {
+ path.delete();
+ }
+ hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath("javaSavedTable"));
+ fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
+ if (fs.exists(hiveManagedPath)){
+ fs.delete(hiveManagedPath, true);
+ }
+
+ List<String> jsonObjects = new ArrayList<String>(10);
+ for (int i = 0; i < 10; i++) {
+ jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
+ }
+ JavaRDD<String> rdd = sc.parallelize(jsonObjects);
+ df = sqlContext.jsonRDD(rdd);
+ df.registerTempTable("jsonTable");
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ // Clean up tables.
+ sqlContext.sql("DROP TABLE IF EXISTS javaSavedTable");
+ sqlContext.sql("DROP TABLE IF EXISTS externalTable");
+ }
+
+ @Test
+ public void saveExternalTableAndQueryIt() {
+ Map<String, String> options = new HashMap<String, String>();
+ options.put("path", path.toString());
+ df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options);
+
+ checkAnswer(
+ sqlContext.sql("SELECT * FROM javaSavedTable"),
+ df.collectAsList());
+
+ DataFrame loadedDF =
+ sqlContext.createExternalTable("externalTable", "org.apache.spark.sql.json", options);
+
+ checkAnswer(loadedDF, df.collectAsList());
+ checkAnswer(
+ sqlContext.sql("SELECT * FROM externalTable"),
+ df.collectAsList());
+ }
+
+ @Test
+ public void saveExternalTableWithSchemaAndQueryIt() {
+ Map<String, String> options = new HashMap<String, String>();
+ options.put("path", path.toString());
+ df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options);
+
+ checkAnswer(
+ sqlContext.sql("SELECT * FROM javaSavedTable"),
+ df.collectAsList());
+
+ List<StructField> fields = new ArrayList<>();
+ fields.add(DataTypes.createStructField("b", DataTypes.StringType, true));
+ StructType schema = DataTypes.createStructType(fields);
+ DataFrame loadedDF =
+ sqlContext.createExternalTable("externalTable", "org.apache.spark.sql.json", schema, options);
+
+ checkAnswer(
+ loadedDF,
+ sqlContext.sql("SELECT b FROM javaSavedTable").collectAsList());
+ checkAnswer(
+ sqlContext.sql("SELECT * FROM externalTable"),
+ sqlContext.sql("SELECT b FROM javaSavedTable").collectAsList());
+ }
+
+ @Test
+ public void saveTableAndQueryIt() {
+ Map<String, String> options = new HashMap<String, String>();
+ df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options);
+
+ checkAnswer(
+ sqlContext.sql("SELECT * FROM javaSavedTable"),
+ df.collectAsList());
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
index ba39129388..0270e63557 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -17,10 +17,8 @@
package org.apache.spark.sql
-import org.scalatest.FunSuite
+import scala.collection.JavaConversions._
-import org.apache.spark.sql.catalyst.expressions.{ExprId, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
@@ -55,9 +53,36 @@ class QueryTest extends PlanTest {
/**
* Runs the plan and makes sure the answer matches the expected result.
* @param rdd the [[DataFrame]] to be executed
- * @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ].
+ * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
*/
protected def checkAnswer(rdd: DataFrame, expectedAnswer: Seq[Row]): Unit = {
+ QueryTest.checkAnswer(rdd, expectedAnswer) match {
+ case Some(errorMessage) => fail(errorMessage)
+ case None =>
+ }
+ }
+
+ protected def checkAnswer(rdd: DataFrame, expectedAnswer: Row): Unit = {
+ checkAnswer(rdd, Seq(expectedAnswer))
+ }
+
+ def sqlTest(sqlString: String, expectedAnswer: Seq[Row])(implicit sqlContext: SQLContext): Unit = {
+ test(sqlString) {
+ checkAnswer(sqlContext.sql(sqlString), expectedAnswer)
+ }
+ }
+}
+
+object QueryTest {
+ /**
+ * Runs the plan and makes sure the answer matches the expected result.
+ * If there was an exception during the execution, or the contents of the DataFrame do not
+ * match the expected result, an error message will be returned. Otherwise, [[None]] will
+ * be returned.
+ * @param rdd the [[DataFrame]] to be executed
+ * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
+ */
+ def checkAnswer(rdd: DataFrame, expectedAnswer: Seq[Row]): Option[String] = {
val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
// Converts data to types that we can do equality comparison using Scala collections.
@@ -73,18 +98,20 @@ class QueryTest extends PlanTest {
}
val sparkAnswer = try rdd.collect().toSeq catch {
case e: Exception =>
- fail(
+ val errorMessage =
s"""
|Exception thrown while executing query:
|${rdd.queryExecution}
|== Exception ==
|$e
|${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
- """.stripMargin)
+ """.stripMargin
+ return Some(errorMessage)
}
if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
- fail(s"""
+ val errorMessage =
+ s"""
|Results do not match for query:
|${rdd.logicalPlan}
|== Analyzed Plan ==
@@ -93,22 +120,21 @@ class QueryTest extends PlanTest {
|${rdd.queryExecution.executedPlan}
|== Results ==
|${sideBySide(
- s"== Correct Answer - ${expectedAnswer.size} ==" +:
- prepareAnswer(expectedAnswer).map(_.toString),
- s"== Spark Answer - ${sparkAnswer.size} ==" +:
- prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
- """.stripMargin)
+ s"== Correct Answer - ${expectedAnswer.size} ==" +:
+ prepareAnswer(expectedAnswer).map(_.toString),
+ s"== Spark Answer - ${sparkAnswer.size} ==" +:
+ prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
+ """.stripMargin
+ return Some(errorMessage)
}
- }
- protected def checkAnswer(rdd: DataFrame, expectedAnswer: Row): Unit = {
- checkAnswer(rdd, Seq(expectedAnswer))
+ return None
}
- def sqlTest(sqlString: String, expectedAnswer: Seq[Row])(implicit sqlContext: SQLContext): Unit = {
- test(sqlString) {
- checkAnswer(sqlContext.sql(sqlString), expectedAnswer)
+ def checkAnswer(rdd: DataFrame, expectedAnswer: java.util.List[Row]): String = {
+ checkAnswer(rdd, expectedAnswer.toSeq) match {
+ case Some(errorMessage) => errorMessage
+ case None => null
}
}
-
}
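The QueryTest refactor above turns checkAnswer into a side-effect-free helper that returns Option[String] (plus a String/null variant for Java callers, used by JavaMetastoreDataSourcesSuite above). A minimal sketch of consuming it from Scala; the wrapper name is purely illustrative:

    import org.apache.spark.sql.{DataFrame, QueryTest, Row}

    // Hypothetical helper: turn a returned error message into a thrown assertion error.
    def assertSameAnswer(df: DataFrame, expected: Seq[Row]): Unit =
      QueryTest.checkAnswer(df, expected) match {
        case Some(errorMessage) => throw new AssertionError(errorMessage) // formatted plan and row diff
        case None               => ()                                     // results match
      }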
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 869d01eb39..43da7519ac 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -19,7 +19,11 @@ package org.apache.spark.sql.hive
import java.io.File
+import org.scalatest.BeforeAndAfter
+
import com.google.common.io.Files
+
+import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.{QueryTest, _}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.types._
@@ -29,15 +33,22 @@ import org.apache.spark.sql.hive.test.TestHive._
case class TestData(key: Int, value: String)
-class InsertIntoHiveTableSuite extends QueryTest {
+class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
import org.apache.spark.sql.hive.test.TestHive.implicits._
val testData = TestHive.sparkContext.parallelize(
(1 to 100).map(i => TestData(i, i.toString)))
- testData.registerTempTable("testData")
+
+ before {
+ // Since every test here runs DDL statements,
+ // it is better to reset before every test.
+ TestHive.reset()
+ // Register the testData, which will be used in every test.
+ testData.registerTempTable("testData")
+ }
test("insertInto() HiveTable") {
- createTable[TestData]("createAndInsertTest")
+ sql("CREATE TABLE createAndInsertTest (key int, value string)")
// Add some data.
testData.insertInto("createAndInsertTest")
@@ -68,16 +79,18 @@ class InsertIntoHiveTableSuite extends QueryTest {
}
test("Double create fails when allowExisting = false") {
- createTable[TestData]("doubleCreateAndInsertTest")
+ sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
- intercept[org.apache.hadoop.hive.ql.metadata.HiveException] {
- createTable[TestData]("doubleCreateAndInsertTest", allowExisting = false)
- }
+ val message = intercept[QueryExecutionException] {
+ sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
+ }.getMessage
+
+ println("message!!!!" + message)
}
test("Double create does not fail when allowExisting = true") {
- createTable[TestData]("createAndInsertTest")
- createTable[TestData]("createAndInsertTest")
+ sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
+ sql("CREATE TABLE IF NOT EXISTS doubleCreateAndInsertTest (key int, value string)")
}
test("SPARK-4052: scala.collection.Map as value type of MapType") {
@@ -98,7 +111,7 @@ class InsertIntoHiveTableSuite extends QueryTest {
}
test("SPARK-4203:random partition directory order") {
- createTable[TestData]("tmp_table")
+ sql("CREATE TABLE tmp_table (key int, value string)")
val tmpDir = Files.createTempDir()
sql(s"CREATE TABLE table_with_partition(c1 string) PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string) location '${tmpDir.toURI.toString}' ")
sql("INSERT OVERWRITE TABLE table_with_partition partition (p1='a',p2='b',p3='c',p4='c',p5='1') SELECT 'blarr' FROM tmp_table")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 9ce058909f..f94aabd29a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -19,10 +19,12 @@ package org.apache.spark.sql.hive
import java.io.File
+import org.apache.spark.sql.sources.SaveMode
import org.scalatest.BeforeAndAfterEach
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.sql.catalyst.util
import org.apache.spark.sql._
@@ -41,11 +43,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
override def afterEach(): Unit = {
reset()
- if (ctasPath.exists()) Utils.deleteRecursively(ctasPath)
+ if (tempPath.exists()) Utils.deleteRecursively(tempPath)
}
val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
- var ctasPath: File = util.getTempFilePath("jsonCTAS").getCanonicalFile
+ var tempPath: File = util.getTempFilePath("jsonCTAS").getCanonicalFile
test ("persistent JSON table") {
sql(
@@ -270,7 +272,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
|CREATE TABLE ctasJsonTable
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
- | path '${ctasPath}'
+ | path '${tempPath}'
|) AS
|SELECT * FROM jsonTable
""".stripMargin)
@@ -297,7 +299,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
|CREATE TABLE ctasJsonTable
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
- | path '${ctasPath}'
+ | path '${tempPath}'
|) AS
|SELECT * FROM jsonTable
""".stripMargin)
@@ -309,7 +311,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
|CREATE TABLE ctasJsonTable
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
- | path '${ctasPath}'
+ | path '${tempPath}'
|) AS
|SELECT * FROM jsonTable
""".stripMargin)
@@ -325,7 +327,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
|CREATE TABLE IF NOT EXISTS ctasJsonTable
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
- | path '${ctasPath}'
+ | path '${tempPath}'
|) AS
|SELECT a FROM jsonTable
""".stripMargin)
@@ -400,38 +402,122 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("DROP TABLE jsonTable").collect().foreach(println)
}
- test("save and load table") {
+ test("save table") {
val originalDefaultSource = conf.defaultDataSourceName
- conf.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json")
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
val df = jsonRDD(rdd)
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
+ // Save the df as a managed table (by not specifying the path).
df.saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
df.collect())
- createTable("createdJsonTable", catalog.hiveDefaultTableFilePath("savedJsonTable"), false)
+ // Right now, we cannot append to an existing JSON table.
+ intercept[RuntimeException] {
+ df.saveAsTable("savedJsonTable", SaveMode.Append)
+ }
+
+ // We can overwrite it.
+ df.saveAsTable("savedJsonTable", SaveMode.Overwrite)
+ checkAnswer(
+ sql("SELECT * FROM savedJsonTable"),
+ df.collect())
+
+ // When the save mode is Ignore, we will do nothing when the table already exists.
+ df.select("b").saveAsTable("savedJsonTable", SaveMode.Ignore)
+ assert(df.schema === table("savedJsonTable").schema)
+ checkAnswer(
+ sql("SELECT * FROM savedJsonTable"),
+ df.collect())
+
+ // Dropping the table will also delete the data.
+ sql("DROP TABLE savedJsonTable")
+ intercept[InvalidInputException] {
+ jsonFile(catalog.hiveDefaultTableFilePath("savedJsonTable"))
+ }
+
+ // Create an external table by specifying the path.
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
+ df.saveAsTable(
+ "savedJsonTable",
+ "org.apache.spark.sql.json",
+ SaveMode.Append,
+ Map("path" -> tempPath.toString))
+ checkAnswer(
+ sql("SELECT * FROM savedJsonTable"),
+ df.collect())
+
+ // Data should not be deleted after we drop the table.
+ sql("DROP TABLE savedJsonTable")
+ checkAnswer(
+ jsonFile(tempPath.toString),
+ df.collect())
+
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
+ }
+
+ test("create external table") {
+ val originalDefaultSource = conf.defaultDataSourceName
+
+ val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+ val df = jsonRDD(rdd)
+
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
+ df.saveAsTable(
+ "savedJsonTable",
+ "org.apache.spark.sql.json",
+ SaveMode.Append,
+ Map("path" -> tempPath.toString))
+
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
+ createExternalTable("createdJsonTable", tempPath.toString)
assert(table("createdJsonTable").schema === df.schema)
checkAnswer(
sql("SELECT * FROM createdJsonTable"),
df.collect())
- val message = intercept[RuntimeException] {
- createTable("createdJsonTable", filePath.toString, false)
+ var message = intercept[RuntimeException] {
+ createExternalTable("createdJsonTable", filePath.toString)
}.getMessage
assert(message.contains("Table createdJsonTable already exists."),
"We should complain that ctasJsonTable already exists")
- createTable("createdJsonTable", filePath.toString, true)
- // createdJsonTable should be not changed.
- assert(table("createdJsonTable").schema === df.schema)
+ // Data should not be deleted.
+ sql("DROP TABLE createdJsonTable")
checkAnswer(
- sql("SELECT * FROM createdJsonTable"),
+ jsonFile(tempPath.toString),
df.collect())
- conf.setConf("spark.sql.default.datasource", originalDefaultSource)
+ // Try to specify the schema.
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
+ val schema = StructType(StructField("b", StringType, true) :: Nil)
+ createExternalTable(
+ "createdJsonTable",
+ "org.apache.spark.sql.json",
+ schema,
+ Map("path" -> tempPath.toString))
+ checkAnswer(
+ sql("SELECT * FROM createdJsonTable"),
+ sql("SELECT b FROM savedJsonTable").collect())
+
+ sql("DROP TABLE createdJsonTable")
+
+ message = intercept[RuntimeException] {
+ createExternalTable(
+ "createdJsonTable",
+ "org.apache.spark.sql.json",
+ schema,
+ Map.empty[String, String])
+ }.getMessage
+ assert(
+ message.contains("Option 'path' not specified"),
+ "We should complain that path is not specified.")
+
+ sql("DROP TABLE savedJsonTable")
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
}
}