author     Michael Armbrust <michael@databricks.com>   2014-04-15 20:40:40 -0700
committer  Reynold Xin <rxin@apache.org>               2014-04-15 20:40:40 -0700
commit     273c2fd08deb49e970ec471c857dcf0b2953f922 (patch)
tree       770a343087f84426800fa5278e4853d216859f47 /sql
parent     63ca581d9c84176549b1ea0a1d8d7c0cca982acc (diff)
[SQL] SPARK-1424 Generalize insertIntoTable functions on SchemaRDDs
This makes it possible to create tables and insert into them using the DSL and SQL for the scala and java apis.

Author: Michael Armbrust <michael@databricks.com>

Closes #354 from marmbrus/insertIntoTable and squashes the following commits:

6c6f227 [Michael Armbrust] Create random temporary files in python parquet unit tests.
f5e6d5c [Michael Armbrust] Merge remote-tracking branch 'origin/master' into insertIntoTable
765c506 [Michael Armbrust] Add to JavaAPI.
77b512c [Michael Armbrust] typos.
5c3ef95 [Michael Armbrust] use names for boolean args.
882afdf [Michael Armbrust] Change createTableAs to saveAsTable. Clean up api annotations.
d07d94b [Michael Armbrust] Add tests, support for creating parquet files and hive tables.
fa3fe81 [Michael Armbrust] Make insertInto available on JavaSchemaRDD as well. Add createTableAs function.
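For orientation, a minimal end-to-end sketch of the workflow this change enables; the path, the table name, and the Person case class are illustrative and not part of the patch:

{{{
// Sketch only: assumes an existing SparkContext `sc`.
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext._

case class Person(name: String, age: Int)

// Create an empty parquet-backed table and register it as an insert target.
createParquetFile[Person]("/tmp/people.parquet").registerAsTable("people")

// Insert using SQL ...
sql("INSERT INTO people SELECT 'michael', 29")

// ... or using the Scala DSL: an RDD of Person converts implicitly to a
// SchemaRDD, which now carries insertInto.
sc.parallelize(Person("andy", 30) :: Nil).insertInto("people")

sql("SELECT name, age FROM people").collect()
}}}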
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 57
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 28
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala | 59
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala | 78
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala | 11
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala | 148
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 11
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/TestData.scala | 3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 41
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 13
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala | 77
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala | 77
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala | 52
15 files changed, 525 insertions(+), 156 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 24d60ea074..4d216b5cd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -20,18 +20,26 @@ package org.apache.spark.sql
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag
+import org.apache.hadoop.conf.Configuration
+
import org.apache.spark.SparkContext
-import org.apache.spark.annotation.{AlphaComponent, Experimental}
+import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.rdd.RDD
+
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.dsl
+import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.SparkStrategies
+
+import org.apache.spark.sql.parquet.ParquetRelation
/**
* :: AlphaComponent ::
@@ -65,12 +73,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
new this.QueryExecution { val logical = plan }
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan
- * interface is considered internal, and thus not guranteed to be stable. As a result, using
- * them directly is not reccomended.
+ * interface is considered internal, and thus not guaranteed to be stable. As a result, using
+ * them directly is not recommended.
*/
- @Experimental
+ @DeveloperApi
implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan)
/**
@@ -89,6 +97,39 @@ class SQLContext(@transient val sparkContext: SparkContext)
def parquetFile(path: String): SchemaRDD =
new SchemaRDD(this, parquet.ParquetRelation(path))
+ /**
+ * :: Experimental ::
+ * Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
+ * This registered table can be used as the target of future `insertInto` operations.
+ *
+ * {{{
+ * val sqlContext = new SQLContext(...)
+ * import sqlContext._
+ *
+ * case class Person(name: String, age: Int)
+ * createParquetFile[Person]("path/to/file.parquet").registerAsTable("people")
+ * sql("INSERT INTO people SELECT 'michael', 29")
+ * }}}
+ *
+ * @tparam A A case class type that describes the desired schema of the parquet file to be
+ * created.
+ * @param path The path where the directory containing parquet metadata should be created.
+ * Data inserted into this table will also be stored at this location.
+ * @param allowExisting When false, an exception will be thrown if this directory already exists.
+ * @param conf A Hadoop configuration object that can be used to specify options to the parquet
+ * output format.
+ *
+ * @group userf
+ */
+ @Experimental
+ def createParquetFile[A <: Product : TypeTag](
+ path: String,
+ allowExisting: Boolean = true,
+ conf: Configuration = new Configuration()): SchemaRDD = {
+ new SchemaRDD(
+ this,
+ ParquetRelation.createEmpty(path, ScalaReflection.attributesFor[A], allowExisting, conf))
+ }
/**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
@@ -208,9 +249,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
/**
+ * :: DeveloperApi ::
* The primary workflow for executing relational queries using Spark. Designed to allow easy
* access to the intermediate phases of query execution for developers.
*/
+ @DeveloperApi
protected abstract class QueryExecution {
def logical: LogicalPlan
@@ -231,7 +274,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
override def toString: String =
s"""== Logical Plan ==
|${stringOrError(analyzed)}
- |== Optimized Logical Plan
+ |== Optimized Logical Plan ==
|${stringOrError(optimizedPlan)}
|== Physical Plan ==
|${stringOrError(executedPlan)}
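As a usage note (not part of the diff above), the allowExisting and conf parameters of the new createParquetFile can be exercised roughly as follows; the path, the Event case class, and the codec choice are illustrative:

{{{
// Sketch only: assumes the members of a SQLContext are already imported,
// as in the scaladoc example above.
import org.apache.hadoop.conf.Configuration
import parquet.hadoop.ParquetOutputFormat

case class Event(id: Int, name: String)

// Options for the parquet output format can be passed through a Hadoop conf.
val conf = new Configuration()
conf.set(ParquetOutputFormat.COMPRESSION, "GZIP")

createParquetFile[Event]("/tmp/events.parquet", allowExisting = true, conf = conf)
  .registerAsTable("events")

// With allowExisting = false a second call against the same path fails fast
// instead of silently reusing the existing directory:
// createParquetFile[Event]("/tmp/events.parquet", allowExisting = false)
}}}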
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index a771147f90..f2ae5b0fe6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import net.razorvine.pickle.Pickler
import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
-import org.apache.spark.annotation.{AlphaComponent, Experimental}
+import org.apache.spark.annotation.{AlphaComponent, Experimental, DeveloperApi}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
@@ -83,8 +83,6 @@ import java.util.{Map => JMap}
* rdd.where('key === 1).orderBy('value.asc).select('key).collect()
* }}}
*
- * @todo There is currently no support for creating SchemaRDDs from either Java or Python RDDs.
- *
* @groupname Query Language Integrated Queries
* @groupdesc Query Functions that create new queries from SchemaRDDs. The
* result of all query functions is also a SchemaRDD, allowing multiple operations to be
@@ -276,8 +274,8 @@ class SchemaRDD(
* an `OUTER JOIN` in SQL. When no output rows are produced by the generator for a
* given row, a single row will be output, with `NULL` values for each of the
* generated columns.
- * @param alias an optional alias that can be used as qualif for the attributes that are produced
- * by this generate operation.
+ * @param alias an optional alias that can be used as qualifier for the attributes that are
+ * produced by this generate operation.
*
* @group Query
*/
@@ -290,29 +288,13 @@ class SchemaRDD(
new SchemaRDD(sqlContext, Generate(generator, join, outer, None, logicalPlan))
/**
- * :: Experimental ::
- * Adds the rows from this RDD to the specified table. Note in a standard [[SQLContext]] there is
- * no notion of persistent tables, and thus queries that contain this operator will fail to
- * optimize. When working with an extension of a SQLContext that has a persistent catalog, such
- * as a `HiveContext`, this operation will result in insertions to the table specified.
+ * Returns this RDD as a SchemaRDD. Intended primarily to force the invocation of the implicit
+ * conversion from a standard RDD to a SchemaRDD.
*
* @group schema
*/
- @Experimental
- def insertInto(tableName: String, overwrite: Boolean = false) =
- new SchemaRDD(
- sqlContext,
- InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite))
-
- /**
- * Returns this RDD as a SchemaRDD.
- * @group schema
- */
def toSchemaRDD = this
- /** FOR INTERNAL USE ONLY */
- def analyze = sqlContext.analyzer(logicalPlan)
-
private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
this.mapPartitions { iter =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 3dd9897c0d..a390ab6005 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical._
/**
@@ -29,14 +31,24 @@ trait SchemaRDDLike {
private[sql] def baseSchemaRDD: SchemaRDD
/**
+ * :: DeveloperApi ::
* A lazily computed query execution workflow. All other RDD operations are passed
- * through to the RDD that is produced by this workflow.
+ * through to the RDD that is produced by this workflow. This workflow is produced lazily because
+ * invoking the whole query optimization pipeline can be expensive.
*
- * We want this to be lazy because invoking the whole query optimization pipeline can be
- * expensive.
+ * The query execution is considered a Developer API as phases may be added or removed in future
+ * releases. This execution is only exposed to provide an interface for inspecting the various
+ * phases for debugging purposes. Applications should not depend on particular phases existing
+ * or producing any specific output, even for exactly the same query.
+ *
+ * Additionally, the RDD exposed by this execution is not designed for consumption by end users.
+ * In particular, it does not contain any schema information, and it reuses Row objects
+ * internally. This object reuse improves performance, but can make programming against the RDD
+ * more difficult. Instead end users should perform RDD operations on a SchemaRDD directly.
*/
@transient
- protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan)
+ @DeveloperApi
+ lazy val queryExecution = sqlContext.executePlan(logicalPlan)
override def toString =
s"""${super.toString}
@@ -45,7 +57,8 @@ trait SchemaRDDLike {
/**
* Saves the contents of this `SchemaRDD` as a parquet file, preserving the schema. Files that
- * are written out using this method can be read back in as a SchemaRDD using the ``function
+ * are written out using this method can be read back in as a SchemaRDD using the `parquetFile`
+ * function.
*
* @group schema
*/
@@ -62,4 +75,40 @@ trait SchemaRDDLike {
def registerAsTable(tableName: String): Unit = {
sqlContext.registerRDDAsTable(baseSchemaRDD, tableName)
}
+
+ /**
+ * :: Experimental ::
+ * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
+ *
+ * @group schema
+ */
+ @Experimental
+ def insertInto(tableName: String, overwrite: Boolean): Unit =
+ sqlContext.executePlan(
+ InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)).toRdd
+
+ /**
+ * :: Experimental ::
+ * Appends the rows from this RDD to the specified table.
+ *
+ * @group schema
+ */
+ @Experimental
+ def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
+
+ /**
+ * :: Experimental ::
+ * Creates a table from the contents of this SchemaRDD. This will fail if the table already
+ * exists.
+ *
+ * Note that this currently only works with SchemaRDDs that are created from a HiveContext as
+ * there is no notion of a persisted catalog in a standard SQL context. Instead you can write
+ * an RDD out to a parquet file, and then register that file as a table. This "table" can then
+ * be the target of an `insertInto`.
+ *
+ * @group schema
+ */
+ @Experimental
+ def saveAsTable(tableName: String): Unit =
+ sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
}
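A short sketch of how the relocated insertInto and the new saveAsTable read from user code; it assumes a HiveContext named hiveContext (saveAsTable requires a persistent catalog), and the Record case class and table names are illustrative:

{{{
// Sketch only.
import hiveContext._

case class Record(key: Int, value: String)
val records = sparkContext.parallelize((1 to 10).map(i => Record(i, s"val_$i")))

records.insertInto("existingTable")                    // append rows
records.insertInto("existingTable", overwrite = true)  // replace the contents
records.saveAsTable("brandNewTable")                   // fails if it already exists
}}}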
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 573345e42c..26922f7f33 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -17,8 +17,11 @@
package org.apache.spark.sql.api.java
-import java.beans.{Introspector, PropertyDescriptor}
+import java.beans.Introspector
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
@@ -46,28 +49,41 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
}
/**
+ * :: Experimental ::
+ * Creates an empty parquet file with the schema of class `beanClass`, which can be registered as
+ * a table. This registered table can be used as the target of future `insertInto` operations.
+ *
+ * {{{
+ * JavaSQLContext sqlCtx = new JavaSQLContext(...)
+ *
+ * sqlCtx.createParquetFile(Person.class, "path/to/file.parquet").registerAsTable("people")
+ * sqlCtx.sql("INSERT INTO people SELECT 'michael', 29")
+ * }}}
+ *
+ * @param beanClass A java bean class object that will be used to determine the schema of the
+ * parquet file.
+ * @param path The path where the directory containing parquet metadata should be created.
+ * Data inserted into this table will also be stored at this location.
+ * @param allowExisting When false, an exception will be thrown if this directory already exists.
+ * @param conf A Hadoop configuration object that can be used to specify options to the parquet
+ * output format.
+ */
+ @Experimental
+ def createParquetFile(
+ beanClass: Class[_],
+ path: String,
+ allowExisting: Boolean = true,
+ conf: Configuration = new Configuration()): JavaSchemaRDD = {
+ new JavaSchemaRDD(
+ sqlContext,
+ ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf))
+ }
+
+ /**
* Applies a schema to an RDD of Java Beans.
*/
def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): JavaSchemaRDD = {
- // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
- val beanInfo = Introspector.getBeanInfo(beanClass)
-
- val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
- val schema = fields.map { property =>
- val dataType = property.getPropertyType match {
- case c: Class[_] if c == classOf[java.lang.String] => StringType
- case c: Class[_] if c == java.lang.Short.TYPE => ShortType
- case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
- case c: Class[_] if c == java.lang.Long.TYPE => LongType
- case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
- case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
- case c: Class[_] if c == java.lang.Float.TYPE => FloatType
- case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
- }
-
- AttributeReference(property.getName, dataType, true)()
- }
-
+ val schema = getSchema(beanClass)
val className = beanClass.getCanonicalName
val rowRdd = rdd.rdd.mapPartitions { iter =>
// BeanInfo is not serializable so we must rediscover it remotely for each partition.
@@ -97,4 +113,26 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
def registerRDDAsTable(rdd: JavaSchemaRDD, tableName: String): Unit = {
sqlContext.registerRDDAsTable(rdd.baseSchemaRDD, tableName)
}
+
+ /** Returns a Catalyst Schema for the given java bean class. */
+ protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
+ // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
+ val beanInfo = Introspector.getBeanInfo(beanClass)
+
+ val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
+ fields.map { property =>
+ val dataType = property.getPropertyType match {
+ case c: Class[_] if c == classOf[java.lang.String] => StringType
+ case c: Class[_] if c == java.lang.Short.TYPE => ShortType
+ case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
+ case c: Class[_] if c == java.lang.Long.TYPE => LongType
+ case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
+ case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
+ case c: Class[_] if c == java.lang.Float.TYPE => FloatType
+ case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
+ }
+ // TODO: Nullability could be stricter.
+ AttributeReference(property.getName, dataType, nullable = true)()
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 4d7c86a3a4..32813a66de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -119,7 +119,7 @@ private[sql] object ParquetRelation {
child,
"Attempt to create Parquet table from unresolved child (when schema is not available)")
}
- createEmpty(pathString, child.output, conf)
+ createEmpty(pathString, child.output, false, conf)
}
/**
@@ -133,8 +133,9 @@ private[sql] object ParquetRelation {
*/
def createEmpty(pathString: String,
attributes: Seq[Attribute],
+ allowExisting: Boolean,
conf: Configuration): ParquetRelation = {
- val path = checkPath(pathString, conf)
+ val path = checkPath(pathString, allowExisting, conf)
if (conf.get(ParquetOutputFormat.COMPRESSION) == null) {
conf.set(ParquetOutputFormat.COMPRESSION, ParquetRelation.defaultCompression.name())
}
@@ -143,7 +144,7 @@ private[sql] object ParquetRelation {
new ParquetRelation(path.toString)
}
- private def checkPath(pathStr: String, conf: Configuration): Path = {
+ private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = {
if (pathStr == null) {
throw new IllegalArgumentException("Unable to create ParquetRelation: path is null")
}
@@ -154,6 +155,10 @@ private[sql] object ParquetRelation {
s"Unable to create ParquetRelation: incorrectly formatted path $pathStr")
}
val path = origPath.makeQualified(fs)
+ if (!allowExisting && fs.exists(path)) {
+ sys.error(s"File $pathStr already exists.")
+ }
+
if (fs.exists(path) &&
!fs.getFileStatus(path)
.getPermission
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
new file mode 100644
index 0000000000..73d87963b3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+/* Implicits */
+import org.apache.spark.sql.test.TestSQLContext._
+
+class InsertIntoSuite extends QueryTest {
+ TestData // Initialize TestData
+ import TestData._
+
+ test("insertInto() created parquet file") {
+ val testFilePath = File.createTempFile("sparkSql", "pqt")
+ testFilePath.delete()
+ val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
+ testFile.registerAsTable("createAndInsertTest")
+
+ // Add some data.
+ testData.insertInto("createAndInsertTest")
+
+ // Make sure it's there for a new instance of the parquet file.
+ checkAnswer(
+ parquetFile(testFilePath.getCanonicalPath),
+ testData.collect().toSeq
+ )
+
+ // Make sure the registered table has also been updated.
+ checkAnswer(
+ sql("SELECT * FROM createAndInsertTest"),
+ testData.collect().toSeq
+ )
+
+ // Add more data.
+ testData.insertInto("createAndInsertTest")
+
+ // Make sure all data is there for a new instance of parquet file.
+ checkAnswer(
+ parquetFile(testFilePath.getCanonicalPath),
+ testData.collect().toSeq ++ testData.collect().toSeq
+ )
+
+ // Make sure the registered table has also been updated.
+ checkAnswer(
+ sql("SELECT * FROM createAndInsertTest"),
+ testData.collect().toSeq ++ testData.collect().toSeq
+ )
+
+ // Now overwrite.
+ testData.insertInto("createAndInsertTest", overwrite = true)
+
+ // Make sure it's there for a new instance of the parquet file.
+ checkAnswer(
+ parquetFile(testFilePath.getCanonicalPath),
+ testData.collect().toSeq
+ )
+
+ // Make sure the registered table has also been updated.
+ checkAnswer(
+ sql("SELECT * FROM createAndInsertTest"),
+ testData.collect().toSeq
+ )
+ }
+
+ test("INSERT INTO parquet table") {
+ val testFilePath = File.createTempFile("sparkSql", "pqt")
+ testFilePath.delete()
+ val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
+ testFile.registerAsTable("createAndInsertSQLTest")
+
+ sql("INSERT INTO createAndInsertSQLTest SELECT * FROM testData")
+
+ // Make sure it's there for a new instance of the parquet file.
+ checkAnswer(
+ parquetFile(testFilePath.getCanonicalPath),
+ testData.collect().toSeq
+ )
+
+ // Make sure the registered table has also been updated.
+ checkAnswer(
+ sql("SELECT * FROM createAndInsertSQLTest"),
+ testData.collect().toSeq
+ )
+
+ // Append more data.
+ sql("INSERT INTO createAndInsertSQLTest SELECT * FROM testData")
+
+ // Make sure all data is there for a new instance of parquet file.
+ checkAnswer(
+ parquetFile(testFilePath.getCanonicalPath),
+ testData.collect().toSeq ++ testData.collect().toSeq
+ )
+
+ // Make sure the registered table has also been updated.
+ checkAnswer(
+ sql("SELECT * FROM createAndInsertSQLTest"),
+ testData.collect().toSeq ++ testData.collect().toSeq
+ )
+
+ sql("INSERT OVERWRITE INTO createAndInsertSQLTest SELECT * FROM testData")
+
+ // Make sure it's there for a new instance of the parquet file.
+ checkAnswer(
+ parquetFile(testFilePath.getCanonicalPath),
+ testData.collect().toSeq
+ )
+
+ // Make sure the registered table has also been updated.
+ checkAnswer(
+ sql("SELECT * FROM createAndInsertSQLTest"),
+ testData.collect().toSeq
+ )
+ }
+
+ test("Double create fails when allowExisting = false") {
+ val testFilePath = File.createTempFile("sparkSql", "pqt")
+ testFilePath.delete()
+ val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
+
+ intercept[RuntimeException] {
+ createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = false)
+ }
+ }
+
+ test("Double create does not fail when allowExisting = true") {
+ val testFilePath = File.createTempFile("sparkSql", "pqt")
+ testFilePath.delete()
+ val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
+
+ createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = true)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index d719ceb827..d6072b402a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -49,18 +49,21 @@ class QueryTest extends FunSuite {
|$e
""".stripMargin)
}
+
if(prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) {
fail(s"""
|Results do not match for query:
|${rdd.logicalPlan}
|== Analyzed Plan ==
|${rdd.queryExecution.analyzed}
- |== RDD ==
- |$rdd
+ |== Physical Plan ==
+ |${rdd.queryExecution.executedPlan}
|== Results ==
|${sideBySide(
- prepareAnswer(convertedAnswer).map(_.toString),
- prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
+ s"== Correct Answer - ${convertedAnswer.size} ==" +:
+ prepareAnswer(convertedAnswer).map(_.toString),
+ s"== Spark Answer - ${sparkAnswer.size} ==" +:
+ prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
""".stripMargin)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index 0bb13cf442..271b1d9fca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -23,8 +23,9 @@ import org.apache.spark.sql.test._
/* Implicits */
import TestSQLContext._
+case class TestData(key: Int, value: String)
+
object TestData {
- case class TestData(key: Int, value: String)
val testData: SchemaRDD = TestSQLContext.sparkContext.parallelize(
(1 to 100).map(i => TestData(i, i.toString)))
testData.registerAsTable("testData")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 658ff0927a..e24c74a7a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -38,7 +38,7 @@ class PlannerSuite extends FunSuite {
}
test("count is partially aggregated") {
- val query = testData.groupBy('value)(Count('key)).analyze.logicalPlan
+ val query = testData.groupBy('value)(Count('key)).queryExecution.analyzed
val planned = PartialAggregation(query).head
val aggregations = planned.collect { case a: Aggregate => a }
@@ -46,14 +46,14 @@ class PlannerSuite extends FunSuite {
}
test("count distinct is not partially aggregated") {
- val query = testData.groupBy('value)(CountDistinct('key :: Nil)).analyze.logicalPlan
- val planned = PartialAggregation(query.logicalPlan)
+ val query = testData.groupBy('value)(CountDistinct('key :: Nil)).queryExecution.analyzed
+ val planned = PartialAggregation(query)
assert(planned.isEmpty)
}
test("mixed aggregates are not partially aggregated") {
val query =
- testData.groupBy('value)(Count('value), CountDistinct('key :: Nil)).analyze.logicalPlan
+ testData.groupBy('value)(Count('value), CountDistinct('key :: Nil)).queryExecution.analyzed
val planned = PartialAggregation(query)
assert(planned.isEmpty)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index fc68d6c562..d9c9b9a076 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.parquet
+import java.io.File
+
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.hadoop.fs.{Path, FileSystem}
@@ -26,21 +28,23 @@ import parquet.hadoop.ParquetFileWriter
import parquet.schema.MessageTypeParser
import parquet.hadoop.util.ContextUtil
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util.getTempFilePath
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.TestData
import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, DataType}
import org.apache.spark.sql.{parquet, SchemaRDD}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import scala.Tuple2
// Implicits
import org.apache.spark.sql.test.TestSQLContext._
case class TestRDDEntry(key: Int, value: String)
-class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
+class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
+ import TestData._
+ TestData // Load test data tables.
var testRDD: SchemaRDD = null
@@ -178,23 +182,6 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
assert(true)
}
- test("insert (overwrite) via Scala API (new SchemaRDD)") {
- val dirname = Utils.createTempDir()
- val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
- .map(i => TestRDDEntry(i, s"val_$i"))
- source_rdd.registerAsTable("source")
- val dest_rdd = createParquetFile(dirname.toString, ("key", IntegerType), ("value", StringType))
- dest_rdd.registerAsTable("dest")
- sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
- val rdd_copy1 = sql("SELECT * FROM dest").collect()
- assert(rdd_copy1.size === 100)
- assert(rdd_copy1(0).apply(0) === 1)
- assert(rdd_copy1(0).apply(1) === "val_1")
- sql("INSERT INTO dest SELECT * FROM source").collect()
- val rdd_copy2 = sql("SELECT * FROM dest").collect()
- assert(rdd_copy2.size === 200)
- Utils.deleteRecursively(dirname)
- }
test("insert (appending) to same table via Scala API") {
sql("INSERT INTO testsource SELECT * FROM testsource").collect()
@@ -208,19 +195,5 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
Utils.deleteRecursively(ParquetTestData.testDir)
ParquetTestData.writeFile()
}
-
- /**
- * Creates an empty SchemaRDD backed by a ParquetRelation.
- *
- * TODO: since this is so experimental it is better to have it here and not
- * in SQLContext. Also note that when creating new AttributeReferences
- * one needs to take care not to create duplicate Attribute ID's.
- */
- private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = {
- val attributes = schema.map(t => new AttributeReference(t._1, t._2)())
- new SchemaRDD(
- TestSQLContext,
- parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration))
- }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 353458432b..c0d8adf43d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -23,17 +23,21 @@ import scala.language.implicitConversions
import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
import java.util.{ArrayList => JArrayList}
+import scala.reflect.runtime.universe.TypeTag
+
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand, ExplainCommand}
+import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution._
@@ -77,7 +81,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
// We force query optimization to happen right away instead of letting it happen lazily like
// when using the query DSL. This is so DDL commands behave as expected. This is only
- // generates the RDD lineage for DML queries, but do not perform any execution.
+ // generates the RDD lineage for DML queries, but does not perform any execution.
result.queryExecution.toRdd
result
}
@@ -85,6 +89,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/** An alias for `hiveql`. */
def hql(hqlQuery: String): SchemaRDD = hiveql(hqlQuery)
+ /**
+ * Creates a table using the schema of the given class.
+ *
+ * @param tableName The name of the table to create.
+ * @param allowExisting When false, an exception will be thrown if the table already exists.
+ * @tparam A A case class that is used to describe the schema of the table to be created.
+ */
+ def createTable[A <: Product : TypeTag](tableName: String, allowExisting: Boolean = true) {
+ catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
+ }
+
// Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
@transient
protected val outputBuffer = new java.io.OutputStream {
@@ -224,6 +239,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
/** Extends QueryExecution with hive specific features. */
+ @DeveloperApi
protected[sql] abstract class QueryExecution extends super.QueryExecution {
// TODO: Create mixin for the analyzer instead of overriding things here.
override lazy val optimizedPlan =
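A sketch of the new createTable helper; it assumes a HiveContext named hiveContext, and the Page case class and table name are illustrative:

{{{
// Sketch only.
import hiveContext._

case class Page(url: String, hits: Int)

// Creates the Hive table "pages" in the default database using Page's schema.
createTable[Page]("pages")

// A repeated call is a no-op with the default allowExisting = true; with
// allowExisting = false the failure from the metastore propagates instead:
// createTable[Page]("pages", allowExisting = false)
}}}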
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c36b5878cb..ca75cecf7d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -64,7 +64,11 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
alias)(table.getTTable, partitions.map(part => part.getTPartition))
}
- def createTable(databaseName: String, tableName: String, schema: Seq[Attribute]) {
+ def createTable(
+ databaseName: String,
+ tableName: String,
+ schema: Seq[Attribute],
+ allowExisting: Boolean = false): Unit = {
val table = new Table(databaseName, tableName)
val hiveSchema =
schema.map(attr => new FieldSchema(attr.name, toMetastoreType(attr.dataType), ""))
@@ -84,7 +88,12 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
serDeInfo.setParameters(Map[String, String]())
sd.setSerdeInfo(serDeInfo)
- client.createTable(table)
+
+ try client.createTable(table) catch {
+ case e: org.apache.hadoop.hive.ql.metadata.HiveException
+ if e.getCause.isInstanceOf[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] &&
+ allowExisting => // Do nothing.
+ }
}
/**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
new file mode 100644
index 0000000000..11d8b1f0a3
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.util._
+
+
+/**
+ * *** DUPLICATED FROM sql/core. ***
+ *
+ * It is hard to have Maven allow one subproject to depend on another subproject's test code.
+ * So, we duplicate this code here.
+ */
+class QueryTest extends FunSuite {
+ /**
+ * Runs the plan and makes sure the answer matches the expected result.
+ * @param rdd the [[SchemaRDD]] to be executed
+ * @param expectedAnswer the expected result; can be an Any, a Seq[Product], or a Seq[Seq[Any]].
+ */
+ protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Any): Unit = {
+ val convertedAnswer = expectedAnswer match {
+ case s: Seq[_] if s.isEmpty => s
+ case s: Seq[_] if s.head.isInstanceOf[Product] &&
+ !s.head.isInstanceOf[Seq[_]] => s.map(_.asInstanceOf[Product].productIterator.toIndexedSeq)
+ case s: Seq[_] => s
+ case singleItem => Seq(Seq(singleItem))
+ }
+
+ val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s}.nonEmpty
+ def prepareAnswer(answer: Seq[Any]) = if (!isSorted) answer.sortBy(_.toString) else answer
+ val sparkAnswer = try rdd.collect().toSeq catch {
+ case e: Exception =>
+ fail(
+ s"""
+ |Exception thrown while executing query:
+ |${rdd.logicalPlan}
+ |== Exception ==
+ |$e
+ """.stripMargin)
+ }
+
+ if(prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) {
+ fail(s"""
+ |Results do not match for query:
+ |${rdd.logicalPlan}
+ |== Analyzed Plan ==
+ |${rdd.queryExecution.analyzed}
+ |== Physical Plan ==
+ |${rdd.queryExecution.executedPlan}
+ |== Results ==
+ |${sideBySide(
+ s"== Correct Answer - ${convertedAnswer.size} ==" +:
+ prepareAnswer(convertedAnswer).map(_.toString),
+ s"== Spark Answer - ${sparkAnswer.size} ==" +:
+ prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
+ """.stripMargin)
+ }
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
new file mode 100644
index 0000000000..ad29e06905
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.apache.spark.sql.QueryTest
+
+/* Implicits */
+import org.apache.spark.sql.hive.TestHive._
+
+case class TestData(key: Int, value: String)
+
+class InsertIntoHiveTableSuite extends QueryTest {
+ val testData = TestHive.sparkContext.parallelize(
+ (1 to 100).map(i => TestData(i, i.toString)))
+ testData.registerAsTable("testData")
+
+ test("insertInto() HiveTable") {
+ createTable[TestData]("createAndInsertTest")
+
+ // Add some data.
+ testData.insertInto("createAndInsertTest")
+
+ // Make sure the table has also been updated.
+ checkAnswer(
+ sql("SELECT * FROM createAndInsertTest"),
+ testData.collect().toSeq
+ )
+
+ // Add more data.
+ testData.insertInto("createAndInsertTest")
+
+ // Make sure the table has been updated.
+ checkAnswer(
+ sql("SELECT * FROM createAndInsertTest"),
+ testData.collect().toSeq ++ testData.collect().toSeq
+ )
+
+ // Now overwrite.
+ testData.insertInto("createAndInsertTest", overwrite = true)
+
+ // Make sure the registered table has also been updated.
+ checkAnswer(
+ sql("SELECT * FROM createAndInsertTest"),
+ testData.collect().toSeq
+ )
+ }
+
+ test("Double create fails when allowExisting = false") {
+ createTable[TestData]("doubleCreateAndInsertTest")
+
+ intercept[org.apache.hadoop.hive.ql.metadata.HiveException] {
+ createTable[TestData]("doubleCreateAndInsertTest", allowExisting = false)
+ }
+ }
+
+ test("Double create does not fail when allowExisting = true") {
+ createTable[TestData]("createAndInsertTest")
+ createTable[TestData]("createAndInsertTest")
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index aade62eb8f..843c681e0d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -89,44 +89,6 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft
compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames)
}
- test("CREATE TABLE of Parquet table") {
- createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType))
- .registerAsTable("tmp")
- val rddCopy =
- hql("INSERT INTO TABLE tmp SELECT * FROM src")
- .collect()
- .sortBy[Int](_.apply(0) match {
- case x: Int => x
- case _ => 0
- })
- val rddOrig = hql("SELECT * FROM src")
- .collect()
- .sortBy(_.getInt(0))
- compareRDDs(rddOrig, rddCopy, "src (Hive)", Seq("key:Int", "value:String"))
- }
-
- test("Appending to Parquet table") {
- createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType))
- .registerAsTable("tmpnew")
- hql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect()
- hql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect()
- hql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect()
- val rddCopies = hql("SELECT * FROM tmpnew").collect()
- val rddOrig = hql("SELECT * FROM src").collect()
- assert(rddCopies.size === 3 * rddOrig.size, "number of copied rows via INSERT INTO did not match correct number")
- }
-
- test("Appending to and then overwriting Parquet table") {
- createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType))
- .registerAsTable("tmp")
- hql("INSERT INTO TABLE tmp SELECT * FROM src").collect()
- hql("INSERT INTO TABLE tmp SELECT * FROM src").collect()
- hql("INSERT OVERWRITE TABLE tmp SELECT * FROM src").collect()
- val rddCopies = hql("SELECT * FROM tmp").collect()
- val rddOrig = hql("SELECT * FROM src").collect()
- assert(rddCopies.size === rddOrig.size, "INSERT OVERWRITE did not actually overwrite")
- }
-
private def compareRDDs(rddOne: Array[Row], rddTwo: Array[Row], tableName: String, fieldNames: Seq[String]) {
var counter = 0
(rddOne, rddTwo).zipped.foreach {
@@ -137,18 +99,4 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft
counter = counter + 1
}
}
-
- /**
- * Creates an empty SchemaRDD backed by a ParquetRelation.
- *
- * TODO: since this is so experimental it is better to have it here and not
- * in SQLContext. Also note that when creating new AttributeReferences
- * one needs to take care not to create duplicate Attribute ID's.
- */
- private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = {
- val attributes = schema.map(t => new AttributeReference(t._1, t._2)())
- new SchemaRDD(
- TestHive,
- parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration))
- }
}