From 273c2fd08deb49e970ec471c857dcf0b2953f922 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Tue, 15 Apr 2014 20:40:40 -0700 Subject: [SQL] SPARK-1424 Generalize insertIntoTable functions on SchemaRDDs This makes it possible to create tables and insert into them using the DSL and SQL for the scala and java apis. Author: Michael Armbrust Closes #354 from marmbrus/insertIntoTable and squashes the following commits: 6c6f227 [Michael Armbrust] Create random temporary files in python parquet unit tests. f5e6d5c [Michael Armbrust] Merge remote-tracking branch 'origin/master' into insertIntoTable 765c506 [Michael Armbrust] Add to JavaAPI. 77b512c [Michael Armbrust] typos. 5c3ef95 [Michael Armbrust] use names for boolean args. 882afdf [Michael Armbrust] Change createTableAs to saveAsTable. Clean up api annotations. d07d94b [Michael Armbrust] Add tests, support for creating parquet files and hive tables. fa3fe81 [Michael Armbrust] Make insertInto available on JavaSchemaRDD as well. Add createTableAs function. --- .../scala/org/apache/spark/sql/SQLContext.scala | 57 +++++++- .../scala/org/apache/spark/sql/SchemaRDD.scala | 28 +--- .../scala/org/apache/spark/sql/SchemaRDDLike.scala | 59 +++++++- .../apache/spark/sql/api/java/JavaSQLContext.scala | 78 ++++++++--- .../apache/spark/sql/parquet/ParquetRelation.scala | 11 +- .../org/apache/spark/sql/InsertIntoSuite.scala | 148 +++++++++++++++++++++ .../scala/org/apache/spark/sql/QueryTest.scala | 11 +- .../test/scala/org/apache/spark/sql/TestData.scala | 3 +- .../apache/spark/sql/execution/PlannerSuite.scala | 8 +- .../spark/sql/parquet/ParquetQuerySuite.scala | 41 +----- .../org/apache/spark/sql/hive/HiveContext.scala | 18 ++- .../spark/sql/hive/HiveMetastoreCatalog.scala | 13 +- .../scala/org/apache/spark/sql/QueryTest.scala | 77 +++++++++++ .../spark/sql/hive/InsertIntoHiveTableSuite.scala | 77 +++++++++++ .../spark/sql/parquet/HiveParquetSuite.scala | 52 -------- 15 files changed, 525 insertions(+), 156 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 24d60ea074..4d216b5cd1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -20,18 +20,26 @@ package org.apache.spark.sql import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag +import org.apache.hadoop.conf.Configuration + import org.apache.spark.SparkContext -import org.apache.spark.annotation.{AlphaComponent, Experimental} +import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental} import org.apache.spark.rdd.RDD + import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.dsl +import org.apache.spark.sql.catalyst.{ScalaReflection, dsl} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor + import org.apache.spark.sql.columnar.InMemoryColumnarTableScan + import org.apache.spark.sql.execution._ +import 
org.apache.spark.sql.execution.SparkStrategies + +import org.apache.spark.sql.parquet.ParquetRelation /** * :: AlphaComponent :: @@ -65,12 +73,12 @@ class SQLContext(@transient val sparkContext: SparkContext) new this.QueryExecution { val logical = plan } /** - * :: Experimental :: + * :: DeveloperApi :: * Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan - * interface is considered internal, and thus not guranteed to be stable. As a result, using - * them directly is not reccomended. + * interface is considered internal, and thus not guaranteed to be stable. As a result, using + * them directly is not recommended. */ - @Experimental + @DeveloperApi implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan) /** @@ -89,6 +97,39 @@ class SQLContext(@transient val sparkContext: SparkContext) def parquetFile(path: String): SchemaRDD = new SchemaRDD(this, parquet.ParquetRelation(path)) + /** + * :: Experimental :: + * Creates an empty parquet file with the schema of class `A`, which can be registered as a table. + * This registered table can be used as the target of future `insertInto` operations. + * + * {{{ + * val sqlContext = new SQLContext(...) + * import sqlContext._ + * + * case class Person(name: String, age: Int) + * createParquetFile[Person]("path/to/file.parquet").registerAsTable("people") + * sql("INSERT INTO people SELECT 'michael', 29") + * }}} + * + * @tparam A A case class type that describes the desired schema of the parquet file to be + * created. + * @param path The path where the directory containing parquet metadata should be created. + * Data inserted into this table will also be stored at this location. + * @param allowExisting When false, an exception will be thrown if this directory already exists. + * @param conf A Hadoop configuration object that can be used to specify options to the parquet + * output format. + * + * @group userf + */ + @Experimental + def createParquetFile[A <: Product : TypeTag]( + path: String, + allowExisting: Boolean = true, + conf: Configuration = new Configuration()): SchemaRDD = { + new SchemaRDD( + this, + ParquetRelation.createEmpty(path, ScalaReflection.attributesFor[A], allowExisting, conf)) + } /** * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only @@ -208,9 +249,11 @@ class SQLContext(@transient val sparkContext: SparkContext) } /** + * :: DeveloperApi :: * The primary workflow for executing relational queries using Spark. Designed to allow easy * access to the intermediate phases of query execution for developers. 
*/ + @DeveloperApi protected abstract class QueryExecution { def logical: LogicalPlan @@ -231,7 +274,7 @@ class SQLContext(@transient val sparkContext: SparkContext) override def toString: String = s"""== Logical Plan == |${stringOrError(analyzed)} - |== Optimized Logical Plan + |== Optimized Logical Plan == |${stringOrError(optimizedPlan)} |== Physical Plan == |${stringOrError(executedPlan)} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index a771147f90..f2ae5b0fe6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import net.razorvine.pickle.Pickler import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext} -import org.apache.spark.annotation.{AlphaComponent, Experimental} +import org.apache.spark.annotation.{AlphaComponent, Experimental, DeveloperApi} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ @@ -83,8 +83,6 @@ import java.util.{Map => JMap} * rdd.where('key === 1).orderBy('value.asc).select('key).collect() * }}} * - * @todo There is currently no support for creating SchemaRDDs from either Java or Python RDDs. - * * @groupname Query Language Integrated Queries * @groupdesc Query Functions that create new queries from SchemaRDDs. The * result of all query functions is also a SchemaRDD, allowing multiple operations to be @@ -276,8 +274,8 @@ class SchemaRDD( * an `OUTER JOIN` in SQL. When no output rows are produced by the generator for a * given row, a single row will be output, with `NULL` values for each of the * generated columns. - * @param alias an optional alias that can be used as qualif for the attributes that are produced - * by this generate operation. + * @param alias an optional alias that can be used as qualifier for the attributes that are + * produced by this generate operation. * * @group Query */ @@ -290,29 +288,13 @@ class SchemaRDD( new SchemaRDD(sqlContext, Generate(generator, join, outer, None, logicalPlan)) /** - * :: Experimental :: - * Adds the rows from this RDD to the specified table. Note in a standard [[SQLContext]] there is - * no notion of persistent tables, and thus queries that contain this operator will fail to - * optimize. When working with an extension of a SQLContext that has a persistent catalog, such - * as a `HiveContext`, this operation will result in insertions to the table specified. + * Returns this RDD as a SchemaRDD. Intended primarily to force the invocation of the implicit + * conversion from a standard RDD to a SchemaRDD. * * @group schema */ - @Experimental - def insertInto(tableName: String, overwrite: Boolean = false) = - new SchemaRDD( - sqlContext, - InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)) - - /** - * Returns this RDD as a SchemaRDD. 
- * @group schema - */ def toSchemaRDD = this - /** FOR INTERNAL USE ONLY */ - def analyze = sqlContext.analyzer(logicalPlan) - private[sql] def javaToPython: JavaRDD[Array[Byte]] = { val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name) this.mapPartitions { iter => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala index 3dd9897c0d..a390ab6005 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.spark.annotation.{DeveloperApi, Experimental} +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical._ /** @@ -29,14 +31,24 @@ trait SchemaRDDLike { private[sql] def baseSchemaRDD: SchemaRDD /** + * :: DeveloperApi :: * A lazily computed query execution workflow. All other RDD operations are passed - * through to the RDD that is produced by this workflow. + * through to the RDD that is produced by this workflow. This workflow is produced lazily because + * invoking the whole query optimization pipeline can be expensive. * - * We want this to be lazy because invoking the whole query optimization pipeline can be - * expensive. + * The query execution is considered a Developer API as phases may be added or removed in future + * releases. This execution is only exposed to provide an interface for inspecting the various + * phases for debugging purposes. Applications should not depend on particular phases existing + * or producing any specific output, even for exactly the same query. + * + * Additionally, the RDD exposed by this execution is not designed for consumption by end users. + * In particular, it does not contain any schema information, and it reuses Row objects + * internally. This object reuse improves performance, but can make programming against the RDD + * more difficult. Instead end users should perform RDD operations on a SchemaRDD directly. */ @transient - protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan) + @DeveloperApi + lazy val queryExecution = sqlContext.executePlan(logicalPlan) override def toString = s"""${super.toString} @@ -45,7 +57,8 @@ trait SchemaRDDLike { /** * Saves the contents of this `SchemaRDD` as a parquet file, preserving the schema. Files that - * are written out using this method can be read back in as a SchemaRDD using the ``function + * are written out using this method can be read back in as a SchemaRDD using the `parquetFile` + * function. * * @group schema */ @@ -62,4 +75,40 @@ trait SchemaRDDLike { def registerAsTable(tableName: String): Unit = { sqlContext.registerRDDAsTable(baseSchemaRDD, tableName) } + + /** + * :: Experimental :: + * Adds the rows from this RDD to the specified table, optionally overwriting the existing data. + * + * @group schema + */ + @Experimental + def insertInto(tableName: String, overwrite: Boolean): Unit = + sqlContext.executePlan( + InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)).toRdd + + /** + * :: Experimental :: + * Appends the rows from this RDD to the specified table. + * + * @group schema + */ + @Experimental + def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false) + + /** + * :: Experimental :: + * Creates a table from the the contents of this SchemaRDD. 
This will fail if the table already + * exists. + * + * Note that this currently only works with SchemaRDDs that are created from a HiveContext as + * there is no notion of a persisted catalog in a standard SQL context. Instead you can write + * an RDD out to a parquet file, and then register that file as a table. This "table" can then + * be the target of an `insertInto`. + * + * @group schema + */ + @Experimental + def saveAsTable(tableName: String): Unit = + sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 573345e42c..26922f7f33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -17,8 +17,11 @@ package org.apache.spark.sql.api.java -import java.beans.{Introspector, PropertyDescriptor} +import java.beans.Introspector +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow} @@ -45,29 +48,42 @@ class JavaSQLContext(sparkContext: JavaSparkContext) { result } + /** + * :: Experimental :: + * Creates an empty parquet file with the schema of class `beanClass`, which can be registered as + * a table. This registered table can be used as the target of future insertInto` operations. + * + * {{{ + * JavaSQLContext sqlCtx = new JavaSQLContext(...) + * + * sqlCtx.createParquetFile(Person.class, "path/to/file.parquet").registerAsTable("people") + * sqlCtx.sql("INSERT INTO people SELECT 'michael', 29") + * }}} + * + * @param beanClass A java bean class object that will be used to determine the schema of the + * parquet file. s + * @param path The path where the directory containing parquet metadata should be created. + * Data inserted into this table will also be stored at this location. + * @param allowExisting When false, an exception will be thrown if this directory already exists. + * @param conf A Hadoop configuration object that can be used to specific options to the parquet + * output format. + */ + @Experimental + def createParquetFile( + beanClass: Class[_], + path: String, + allowExisting: Boolean = true, + conf: Configuration = new Configuration()): JavaSchemaRDD = { + new JavaSchemaRDD( + sqlContext, + ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf)) + } + /** * Applies a schema to an RDD of Java Beans. */ def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): JavaSchemaRDD = { - // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific. 
- val beanInfo = Introspector.getBeanInfo(beanClass) - - val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class") - val schema = fields.map { property => - val dataType = property.getPropertyType match { - case c: Class[_] if c == classOf[java.lang.String] => StringType - case c: Class[_] if c == java.lang.Short.TYPE => ShortType - case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType - case c: Class[_] if c == java.lang.Long.TYPE => LongType - case c: Class[_] if c == java.lang.Double.TYPE => DoubleType - case c: Class[_] if c == java.lang.Byte.TYPE => ByteType - case c: Class[_] if c == java.lang.Float.TYPE => FloatType - case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType - } - - AttributeReference(property.getName, dataType, true)() - } - + val schema = getSchema(beanClass) val className = beanClass.getCanonicalName val rowRdd = rdd.rdd.mapPartitions { iter => // BeanInfo is not serializable so we must rediscover it remotely for each partition. @@ -97,4 +113,26 @@ class JavaSQLContext(sparkContext: JavaSparkContext) { def registerRDDAsTable(rdd: JavaSchemaRDD, tableName: String): Unit = { sqlContext.registerRDDAsTable(rdd.baseSchemaRDD, tableName) } + + /** Returns a Catalyst Schema for the given java bean class. */ + protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = { + // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific. + val beanInfo = Introspector.getBeanInfo(beanClass) + + val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class") + fields.map { property => + val dataType = property.getPropertyType match { + case c: Class[_] if c == classOf[java.lang.String] => StringType + case c: Class[_] if c == java.lang.Short.TYPE => ShortType + case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType + case c: Class[_] if c == java.lang.Long.TYPE => LongType + case c: Class[_] if c == java.lang.Double.TYPE => DoubleType + case c: Class[_] if c == java.lang.Byte.TYPE => ByteType + case c: Class[_] if c == java.lang.Float.TYPE => FloatType + case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType + } + // TODO: Nullability could be stricter. 
+ AttributeReference(property.getName, dataType, nullable = true)() + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 4d7c86a3a4..32813a66de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -119,7 +119,7 @@ private[sql] object ParquetRelation { child, "Attempt to create Parquet table from unresolved child (when schema is not available)") } - createEmpty(pathString, child.output, conf) + createEmpty(pathString, child.output, false, conf) } /** @@ -133,8 +133,9 @@ private[sql] object ParquetRelation { */ def createEmpty(pathString: String, attributes: Seq[Attribute], + allowExisting: Boolean, conf: Configuration): ParquetRelation = { - val path = checkPath(pathString, conf) + val path = checkPath(pathString, allowExisting, conf) if (conf.get(ParquetOutputFormat.COMPRESSION) == null) { conf.set(ParquetOutputFormat.COMPRESSION, ParquetRelation.defaultCompression.name()) } @@ -143,7 +144,7 @@ private[sql] object ParquetRelation { new ParquetRelation(path.toString) } - private def checkPath(pathStr: String, conf: Configuration): Path = { + private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = { if (pathStr == null) { throw new IllegalArgumentException("Unable to create ParquetRelation: path is null") } @@ -154,6 +155,10 @@ private[sql] object ParquetRelation { s"Unable to create ParquetRelation: incorrectly formatted path $pathStr") } val path = origPath.makeQualified(fs) + if (!allowExisting && fs.exists(path)) { + sys.error(s"File $pathStr already exists.") + } + if (fs.exists(path) && !fs.getFileStatus(path) .getPermission diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala new file mode 100644 index 0000000000..73d87963b3 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +/* Implicits */ +import org.apache.spark.sql.test.TestSQLContext._ + +class InsertIntoSuite extends QueryTest { + TestData // Initialize TestData + import TestData._ + + test("insertInto() created parquet file") { + val testFilePath = File.createTempFile("sparkSql", "pqt") + testFilePath.delete() + val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath) + testFile.registerAsTable("createAndInsertTest") + + // Add some data. 
+ testData.insertInto("createAndInsertTest") + + // Make sure its there for a new instance of parquet file. + checkAnswer( + parquetFile(testFilePath.getCanonicalPath), + testData.collect().toSeq + ) + + // Make sure the registered table has also been updated. + checkAnswer( + sql("SELECT * FROM createAndInsertTest"), + testData.collect().toSeq + ) + + // Add more data. + testData.insertInto("createAndInsertTest") + + // Make sure all data is there for a new instance of parquet file. + checkAnswer( + parquetFile(testFilePath.getCanonicalPath), + testData.collect().toSeq ++ testData.collect().toSeq + ) + + // Make sure the registered table has also been updated. + checkAnswer( + sql("SELECT * FROM createAndInsertTest"), + testData.collect().toSeq ++ testData.collect().toSeq + ) + + // Now overwrite. + testData.insertInto("createAndInsertTest", overwrite = true) + + // Make sure its there for a new instance of parquet file. + checkAnswer( + parquetFile(testFilePath.getCanonicalPath), + testData.collect().toSeq + ) + + // Make sure the registered table has also been updated. + checkAnswer( + sql("SELECT * FROM createAndInsertTest"), + testData.collect().toSeq + ) + } + + test("INSERT INTO parquet table") { + val testFilePath = File.createTempFile("sparkSql", "pqt") + testFilePath.delete() + val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath) + testFile.registerAsTable("createAndInsertSQLTest") + + sql("INSERT INTO createAndInsertSQLTest SELECT * FROM testData") + + // Make sure its there for a new instance of parquet file. + checkAnswer( + parquetFile(testFilePath.getCanonicalPath), + testData.collect().toSeq + ) + + // Make sure the registered table has also been updated. + checkAnswer( + sql("SELECT * FROM createAndInsertSQLTest"), + testData.collect().toSeq + ) + + // Append more data. + sql("INSERT INTO createAndInsertSQLTest SELECT * FROM testData") + + // Make sure all data is there for a new instance of parquet file. + checkAnswer( + parquetFile(testFilePath.getCanonicalPath), + testData.collect().toSeq ++ testData.collect().toSeq + ) + + // Make sure the registered table has also been updated. + checkAnswer( + sql("SELECT * FROM createAndInsertSQLTest"), + testData.collect().toSeq ++ testData.collect().toSeq + ) + + sql("INSERT OVERWRITE INTO createAndInsertSQLTest SELECT * FROM testData") + + // Make sure its there for a new instance of parquet file. + checkAnswer( + parquetFile(testFilePath.getCanonicalPath), + testData.collect().toSeq + ) + + // Make sure the registered table has also been updated. 
+ checkAnswer( + sql("SELECT * FROM createAndInsertSQLTest"), + testData.collect().toSeq + ) + } + + test("Double create fails when allowExisting = false") { + val testFilePath = File.createTempFile("sparkSql", "pqt") + testFilePath.delete() + val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath) + + intercept[RuntimeException] { + createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = false) + } + } + + test("Double create does not fail when allowExisting = true") { + val testFilePath = File.createTempFile("sparkSql", "pqt") + testFilePath.delete() + val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath) + + createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = true) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index d719ceb827..d6072b402a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -49,18 +49,21 @@ class QueryTest extends FunSuite { |$e """.stripMargin) } + if(prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) { fail(s""" |Results do not match for query: |${rdd.logicalPlan} |== Analyzed Plan == |${rdd.queryExecution.analyzed} - |== RDD == - |$rdd + |== Physical Plan == + |${rdd.queryExecution.executedPlan} |== Results == |${sideBySide( - prepareAnswer(convertedAnswer).map(_.toString), - prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")} + s"== Correct Answer - ${convertedAnswer.size} ==" +: + prepareAnswer(convertedAnswer).map(_.toString), + s"== Spark Answer - ${sparkAnswer.size} ==" +: + prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")} """.stripMargin) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index 0bb13cf442..271b1d9fca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -23,8 +23,9 @@ import org.apache.spark.sql.test._ /* Implicits */ import TestSQLContext._ +case class TestData(key: Int, value: String) + object TestData { - case class TestData(key: Int, value: String) val testData: SchemaRDD = TestSQLContext.sparkContext.parallelize( (1 to 100).map(i => TestData(i, i.toString))) testData.registerAsTable("testData") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 658ff0927a..e24c74a7a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -38,7 +38,7 @@ class PlannerSuite extends FunSuite { } test("count is partially aggregated") { - val query = testData.groupBy('value)(Count('key)).analyze.logicalPlan + val query = testData.groupBy('value)(Count('key)).queryExecution.analyzed val planned = PartialAggregation(query).head val aggregations = planned.collect { case a: Aggregate => a } @@ -46,14 +46,14 @@ class PlannerSuite extends FunSuite { } test("count distinct is not partially aggregated") { - val query = testData.groupBy('value)(CountDistinct('key :: Nil)).analyze.logicalPlan - val planned = PartialAggregation(query.logicalPlan) + val query = testData.groupBy('value)(CountDistinct('key :: Nil)).queryExecution.analyzed + val planned = PartialAggregation(query) 
assert(planned.isEmpty) } test("mixed aggregates are not partially aggregated") { val query = - testData.groupBy('value)(Count('value), CountDistinct('key :: Nil)).analyze.logicalPlan + testData.groupBy('value)(Count('value), CountDistinct('key :: Nil)).queryExecution.analyzed val planned = PartialAggregation(query) assert(planned.isEmpty) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index fc68d6c562..d9c9b9a076 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.parquet +import java.io.File + import org.scalatest.{BeforeAndAfterAll, FunSuite} import org.apache.hadoop.fs.{Path, FileSystem} @@ -26,21 +28,23 @@ import parquet.hadoop.ParquetFileWriter import parquet.schema.MessageTypeParser import parquet.hadoop.util.ContextUtil +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util.getTempFilePath import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row} import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.TestData import org.apache.spark.util.Utils import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, DataType} import org.apache.spark.sql.{parquet, SchemaRDD} -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import scala.Tuple2 // Implicits import org.apache.spark.sql.test.TestSQLContext._ case class TestRDDEntry(key: Int, value: String) -class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll { +class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { + import TestData._ + TestData // Load test data tables. var testRDD: SchemaRDD = null @@ -178,23 +182,6 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll { assert(true) } - test("insert (overwrite) via Scala API (new SchemaRDD)") { - val dirname = Utils.createTempDir() - val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100)) - .map(i => TestRDDEntry(i, s"val_$i")) - source_rdd.registerAsTable("source") - val dest_rdd = createParquetFile(dirname.toString, ("key", IntegerType), ("value", StringType)) - dest_rdd.registerAsTable("dest") - sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect() - val rdd_copy1 = sql("SELECT * FROM dest").collect() - assert(rdd_copy1.size === 100) - assert(rdd_copy1(0).apply(0) === 1) - assert(rdd_copy1(0).apply(1) === "val_1") - sql("INSERT INTO dest SELECT * FROM source").collect() - val rdd_copy2 = sql("SELECT * FROM dest").collect() - assert(rdd_copy2.size === 200) - Utils.deleteRecursively(dirname) - } test("insert (appending) to same table via Scala API") { sql("INSERT INTO testsource SELECT * FROM testsource").collect() @@ -208,19 +195,5 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll { Utils.deleteRecursively(ParquetTestData.testDir) ParquetTestData.writeFile() } - - /** - * Creates an empty SchemaRDD backed by a ParquetRelation. - * - * TODO: since this is so experimental it is better to have it here and not - * in SQLContext. Also note that when creating new AttributeReferences - * one needs to take care not to create duplicate Attribute ID's. 
- */ - private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = { - val attributes = schema.map(t => new AttributeReference(t._1, t._2)()) - new SchemaRDD( - TestSQLContext, - parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration)) - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 353458432b..c0d8adf43d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -23,17 +23,21 @@ import scala.language.implicitConversions import java.io.{BufferedReader, File, InputStreamReader, PrintStream} import java.util.{ArrayList => JArrayList} +import scala.reflect.runtime.universe.TypeTag + import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema} import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand, ExplainCommand} +import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.execution._ @@ -77,7 +81,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery)) // We force query optimization to happen right away instead of letting it happen lazily like // when using the query DSL. This is so DDL commands behave as expected. This is only - // generates the RDD lineage for DML queries, but do not perform any execution. + // generates the RDD lineage for DML queries, but does not perform any execution. result.queryExecution.toRdd result } @@ -85,6 +89,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { /** An alias for `hiveql`. */ def hql(hqlQuery: String): SchemaRDD = hiveql(hqlQuery) + /** + * Creates a table using the schema of the given class. + * + * @param tableName The name of the table to create. + * @param allowExisting When false, an exception will be thrown if the table already exists. + * @tparam A A case class that is used to describe the schema of the table to be created. + */ + def createTable[A <: Product : TypeTag](tableName: String, allowExisting: Boolean = true) { + catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting) + } + // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. @transient protected val outputBuffer = new java.io.OutputStream { @@ -224,6 +239,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1) /** Extends QueryExecution with hive specific features. */ + @DeveloperApi protected[sql] abstract class QueryExecution extends super.QueryExecution { // TODO: Create mixin for the analyzer instead of overriding things here. 
override lazy val optimizedPlan = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c36b5878cb..ca75cecf7d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -64,7 +64,11 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging { alias)(table.getTTable, partitions.map(part => part.getTPartition)) } - def createTable(databaseName: String, tableName: String, schema: Seq[Attribute]) { + def createTable( + databaseName: String, + tableName: String, + schema: Seq[Attribute], + allowExisting: Boolean = false): Unit = { val table = new Table(databaseName, tableName) val hiveSchema = schema.map(attr => new FieldSchema(attr.name, toMetastoreType(attr.dataType), "")) @@ -84,7 +88,12 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging { serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") serDeInfo.setParameters(Map[String, String]()) sd.setSerdeInfo(serDeInfo) - client.createTable(table) + + try client.createTable(table) catch { + case e: org.apache.hadoop.hive.ql.metadata.HiveException + if e.getCause.isInstanceOf[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] && + allowExisting => // Do nothing. + } } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala new file mode 100644 index 0000000000..11d8b1f0a3 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.scalatest.FunSuite + +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.util._ + + +/** + * *** DUPLICATED FROM sql/core. *** + * + * It is hard to have maven allow one subproject depend on another subprojects test code. + * So, we duplicate this code here. + */ +class QueryTest extends FunSuite { + /** + * Runs the plan and makes sure the answer matches the expected result. + * @param rdd the [[SchemaRDD]] to be executed + * @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ]. 
+ */ + protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Any): Unit = { + val convertedAnswer = expectedAnswer match { + case s: Seq[_] if s.isEmpty => s + case s: Seq[_] if s.head.isInstanceOf[Product] && + !s.head.isInstanceOf[Seq[_]] => s.map(_.asInstanceOf[Product].productIterator.toIndexedSeq) + case s: Seq[_] => s + case singleItem => Seq(Seq(singleItem)) + } + + val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s}.nonEmpty + def prepareAnswer(answer: Seq[Any]) = if (!isSorted) answer.sortBy(_.toString) else answer + val sparkAnswer = try rdd.collect().toSeq catch { + case e: Exception => + fail( + s""" + |Exception thrown while executing query: + |${rdd.logicalPlan} + |== Exception == + |$e + """.stripMargin) + } + + if(prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) { + fail(s""" + |Results do not match for query: + |${rdd.logicalPlan} + |== Analyzed Plan == + |${rdd.queryExecution.analyzed} + |== Physical Plan == + |${rdd.queryExecution.executedPlan} + |== Results == + |${sideBySide( + s"== Correct Answer - ${convertedAnswer.size} ==" +: + prepareAnswer(convertedAnswer).map(_.toString), + s"== Spark Answer - ${sparkAnswer.size} ==" +: + prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")} + """.stripMargin) + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala new file mode 100644 index 0000000000..ad29e06905 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File + +import org.apache.spark.sql.QueryTest + +/* Implicits */ +import org.apache.spark.sql.hive.TestHive._ + +case class TestData(key: Int, value: String) + +class InsertIntoHiveTableSuite extends QueryTest { + val testData = TestHive.sparkContext.parallelize( + (1 to 100).map(i => TestData(i, i.toString))) + testData.registerAsTable("testData") + + test("insertInto() HiveTable") { + createTable[TestData]("createAndInsertTest") + + // Add some data. + testData.insertInto("createAndInsertTest") + + // Make sure the table has also been updated. + checkAnswer( + sql("SELECT * FROM createAndInsertTest"), + testData.collect().toSeq + ) + + // Add more data. + testData.insertInto("createAndInsertTest") + + // Make sure the table has been updated. + checkAnswer( + sql("SELECT * FROM createAndInsertTest"), + testData.collect().toSeq ++ testData.collect().toSeq + ) + + // Now overwrite. + testData.insertInto("createAndInsertTest", overwrite = true) + + // Make sure the registered table has also been updated. 
+ checkAnswer( + sql("SELECT * FROM createAndInsertTest"), + testData.collect().toSeq + ) + } + + test("Double create fails when allowExisting = false") { + createTable[TestData]("doubleCreateAndInsertTest") + + intercept[org.apache.hadoop.hive.ql.metadata.HiveException] { + createTable[TestData]("doubleCreateAndInsertTest", allowExisting = false) + } + } + + test("Double create does not fail when allowExisting = true") { + createTable[TestData]("createAndInsertTest") + createTable[TestData]("createAndInsertTest") + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala index aade62eb8f..843c681e0d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala @@ -89,44 +89,6 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames) } - test("CREATE TABLE of Parquet table") { - createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType)) - .registerAsTable("tmp") - val rddCopy = - hql("INSERT INTO TABLE tmp SELECT * FROM src") - .collect() - .sortBy[Int](_.apply(0) match { - case x: Int => x - case _ => 0 - }) - val rddOrig = hql("SELECT * FROM src") - .collect() - .sortBy(_.getInt(0)) - compareRDDs(rddOrig, rddCopy, "src (Hive)", Seq("key:Int", "value:String")) - } - - test("Appending to Parquet table") { - createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType)) - .registerAsTable("tmpnew") - hql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect() - hql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect() - hql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect() - val rddCopies = hql("SELECT * FROM tmpnew").collect() - val rddOrig = hql("SELECT * FROM src").collect() - assert(rddCopies.size === 3 * rddOrig.size, "number of copied rows via INSERT INTO did not match correct number") - } - - test("Appending to and then overwriting Parquet table") { - createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType)) - .registerAsTable("tmp") - hql("INSERT INTO TABLE tmp SELECT * FROM src").collect() - hql("INSERT INTO TABLE tmp SELECT * FROM src").collect() - hql("INSERT OVERWRITE TABLE tmp SELECT * FROM src").collect() - val rddCopies = hql("SELECT * FROM tmp").collect() - val rddOrig = hql("SELECT * FROM src").collect() - assert(rddCopies.size === rddOrig.size, "INSERT OVERWRITE did not actually overwrite") - } - private def compareRDDs(rddOne: Array[Row], rddTwo: Array[Row], tableName: String, fieldNames: Seq[String]) { var counter = 0 (rddOne, rddTwo).zipped.foreach { @@ -137,18 +99,4 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft counter = counter + 1 } } - - /** - * Creates an empty SchemaRDD backed by a ParquetRelation. - * - * TODO: since this is so experimental it is better to have it here and not - * in SQLContext. Also note that when creating new AttributeReferences - * one needs to take care not to create duplicate Attribute ID's. 
- */ - private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = { - val attributes = schema.map(t => new AttributeReference(t._1, t._2)()) - new SchemaRDD( - TestHive, - parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration)) - } }
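
For reference, a minimal sketch of how the pieces added in this patch fit together from a standard SQLContext. This is an illustrative example rather than code from the commit: the Person case class, the parquet path, and the table name are placeholders borrowed from the scaladoc examples above.

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)

object InsertIntoExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "InsertIntoExample")
    val sqlContext = new SQLContext(sc)
    import sqlContext._  // brings sql(), createParquetFile and the implicit RDD-to-SchemaRDD conversion into scope

    // Create an empty parquet file with Person's schema and register it as a table.
    createParquetFile[Person]("path/to/file.parquet").registerAsTable("people")

    // Append rows through the DSL ...
    sc.parallelize(Person("michael", 29) :: Nil).insertInto("people")

    // ... or through SQL; insertInto("people", overwrite = true) would replace the data instead.
    sql("INSERT INTO people SELECT 'andy', 30")

    sql("SELECT * FROM people").collect().foreach(println)
  }
}

With a HiveContext, the same rows could instead target a metastore table created with createTable[Person]("people"), or be written out in a single step with saveAsTable, which the SchemaRDDLike changes above expose on both SchemaRDD and JavaSchemaRDD.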