author     Andre Schumacher <andre.schumacher@iki.fi>    2014-04-03 15:31:47 -0700
committer  Reynold Xin <rxin@apache.org>                 2014-04-03 15:31:47 -0700
commit     fbebaedf26286ee8a75065822a3af1148351f828 (patch)
tree       d1c02f6c81b325e52b0ac08b7f55f12b670087e8 /sql/hive
parent     92a86b285f8a4af1bdf577dd4c4ea0fd5ca8d682 (diff)
Spark parquet improvements
A few improvements to the Parquet support for SQL queries:

- A ParquetRelation is now backed by a directory instead of a single file, which simplifies importing data from other sources
- The InsertIntoParquetTable operation now supports switching between overwriting and appending (at least in HiveQL)
- Tests now use the new API
- Parquet logging can be set to WARNING level (the default)
- Default compression for Parquet files is GZIP, as in parquet-mr

Author: Andre Schumacher <andre.schumacher@iki.fi>

Closes #195 from AndreSchumacher/spark_parquet_improvements and squashes the following commits:

54df314 [Andre Schumacher] SPARK-1383 [SQL] Improvements to ParquetRelation
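For illustration, a minimal sketch of the directory-backed Parquet flow exercised by the test changes below (a sketch only, assuming a TestHive-style context with its implicits imported; the temporary directory and the table names "ptable"/"src" are taken from the tests, not an API contract):

import org.apache.spark.sql.hive.TestHive
import org.apache.spark.sql.hive.TestHive._   // brings sql(), parquetFile(), saveAsParquetFile into scope
import org.apache.spark.util.Utils

// Write a query result into a directory-backed Parquet relation
// (the directory, not a single file, is now the unit of storage).
val parquetDir = Utils.createTempDir()        // illustrative temporary location
sql("SELECT * FROM src").saveAsParquetFile(parquetDir.getAbsolutePath)

// Re-import the directory as a table and query it.
parquetFile(parquetDir.getAbsolutePath).registerAsTable("ptable")

// Appending vs. overwriting, as exercised by HiveParquetSuite below.
sql("INSERT INTO TABLE ptable SELECT * FROM src").collect()       // appends rows
sql("INSERT OVERWRITE TABLE ptable SELECT * FROM src").collect()  // replaces the table contents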
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala            2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala                        2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala                4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala    6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala           169
5 files changed, 89 insertions, 94 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 29834a11f4..fc053c56c0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -148,6 +148,8 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
*/
override def unregisterTable(
databaseName: Option[String], tableName: String): Unit = ???
+
+ override def unregisterAllTables() = {}
}
object HiveMetastoreTypes extends RegexParsers {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index bc3447b9d8..0a6bea0162 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -313,6 +313,8 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
catalog.client.dropDatabase(db, true, false, true)
}
+ catalog.unregisterAllTables()
+
FunctionRegistry.getFunctionNames.filterNot(originalUdfs.contains(_)).foreach { udfName =>
FunctionRegistry.unregisterTemporaryUDF(udfName)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 68d45e53cd..79ec1f1cde 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -29,7 +29,7 @@ class CachedTableSuite extends HiveComparisonTest {
}
createQueryTest("read from cached table",
- "SELECT * FROM src LIMIT 1")
+ "SELECT * FROM src LIMIT 1", reset = false)
test("check that table is cached and uncache") {
TestHive.table("src").queryExecution.analyzed match {
@@ -40,7 +40,7 @@ class CachedTableSuite extends HiveComparisonTest {
}
createQueryTest("read from uncached table",
- "SELECT * FROM src LIMIT 1")
+ "SELECT * FROM src LIMIT 1", reset = false)
test("make sure table is uncached") {
TestHive.table("src").queryExecution.analyzed match {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index c7a350ef94..18654b308d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -170,7 +170,7 @@ abstract class HiveComparisonTest
}
val installHooksCommand = "(?i)SET.*hooks".r
- def createQueryTest(testCaseName: String, sql: String) {
+ def createQueryTest(testCaseName: String, sql: String, reset: Boolean = true) {
// If test sharding is enable, skip tests that are not in the correct shard.
shardInfo.foreach {
case (shardId, numShards) if testCaseName.hashCode % numShards != shardId => return
@@ -228,7 +228,7 @@ abstract class HiveComparisonTest
try {
// MINOR HACK: You must run a query before calling reset the first time.
TestHive.sql("SHOW TABLES")
- TestHive.reset()
+ if (reset) { TestHive.reset() }
val hiveCacheFiles = queryList.zipWithIndex.map {
case (queryString, i) =>
@@ -295,7 +295,7 @@ abstract class HiveComparisonTest
fail(errorMessage)
}
}.toSeq
- TestHive.reset()
+ if (reset) { TestHive.reset() }
computedResults
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index 05ad85b622..314ca48ad8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -17,147 +17,138 @@
package org.apache.spark.sql.parquet
-import java.io.File
-
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
+import org.apache.spark.sql.catalyst.types.{DataType, StringType, IntegerType}
+import org.apache.spark.sql.{parquet, SchemaRDD}
import org.apache.spark.sql.hive.TestHive
+import org.apache.spark.util.Utils
+
+// Implicits
+import org.apache.spark.sql.hive.TestHive._
class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
- val filename = getTempFilePath("parquettest").getCanonicalFile.toURI.toString
-
- // runs a SQL and optionally resolves one Parquet table
- def runQuery(
- querystr: String,
- tableName: Option[String] = None,
- filename: Option[String] = None): Array[Row] = {
-
- // call to resolve references in order to get CREATE TABLE AS to work
- val query = TestHive
- .parseSql(querystr)
- val finalQuery =
- if (tableName.nonEmpty && filename.nonEmpty)
- resolveParquetTable(tableName.get, filename.get, query)
- else
- query
- TestHive.executePlan(finalQuery)
- .toRdd
- .collect()
- }
- // stores a query output to a Parquet file
- def storeQuery(querystr: String, filename: String): Unit = {
- val query = WriteToFile(
- filename,
- TestHive.parseSql(querystr))
- TestHive
- .executePlan(query)
- .stringResult()
- }
+ val dirname = Utils.createTempDir()
- /**
- * TODO: This function is necessary as long as there is no notion of a Catalog for
- * Parquet tables. Once such a thing exists this functionality should be moved there.
- */
- def resolveParquetTable(tableName: String, filename: String, plan: LogicalPlan): LogicalPlan = {
- TestHive.loadTestTable("src") // may not be loaded now
- plan.transform {
- case relation @ UnresolvedRelation(databaseName, name, alias) =>
- if (name == tableName)
- ParquetRelation(tableName, filename)
- else
- relation
- case op @ InsertIntoCreatedTable(databaseName, name, child) =>
- if (name == tableName) {
- // note: at this stage the plan is not yet analyzed but Parquet needs to know the schema
- // and for that we need the child to be resolved
- val relation = ParquetRelation.create(
- filename,
- TestHive.analyzer(child),
- TestHive.sparkContext.hadoopConfiguration,
- Some(tableName))
- InsertIntoTable(
- relation.asInstanceOf[BaseRelation],
- Map.empty,
- child,
- overwrite = false)
- } else
- op
- }
- }
+ var testRDD: SchemaRDD = null
override def beforeAll() {
// write test data
- ParquetTestData.writeFile()
- // Override initial Parquet test table
- TestHive.catalog.registerTable(Some[String]("parquet"), "testsource", ParquetTestData.testData)
+ ParquetTestData.writeFile
+ testRDD = parquetFile(ParquetTestData.testDir.toString)
+ testRDD.registerAsTable("testsource")
}
override def afterAll() {
- ParquetTestData.testFile.delete()
+ Utils.deleteRecursively(ParquetTestData.testDir)
+ Utils.deleteRecursively(dirname)
+ reset() // drop all tables that were registered as part of the tests
}
+ // in case tests are failing we delete before and after each test
override def beforeEach() {
- new File(filename).getAbsoluteFile.delete()
+ Utils.deleteRecursively(dirname)
}
override def afterEach() {
- new File(filename).getAbsoluteFile.delete()
+ Utils.deleteRecursively(dirname)
}
test("SELECT on Parquet table") {
- val rdd = runQuery("SELECT * FROM parquet.testsource")
+ val rdd = sql("SELECT * FROM testsource").collect()
assert(rdd != null)
assert(rdd.forall(_.size == 6))
}
test("Simple column projection + filter on Parquet table") {
- val rdd = runQuery("SELECT myboolean, mylong FROM parquet.testsource WHERE myboolean=true")
+ val rdd = sql("SELECT myboolean, mylong FROM testsource WHERE myboolean=true").collect()
assert(rdd.size === 5, "Filter returned incorrect number of rows")
assert(rdd.forall(_.getBoolean(0)), "Filter returned incorrect Boolean field value")
}
- test("Converting Hive to Parquet Table via WriteToFile") {
- storeQuery("SELECT * FROM src", filename)
- val rddOne = runQuery("SELECT * FROM src").sortBy(_.getInt(0))
- val rddTwo = runQuery("SELECT * from ptable", Some("ptable"), Some(filename)).sortBy(_.getInt(0))
+ test("Converting Hive to Parquet Table via saveAsParquetFile") {
+ sql("SELECT * FROM src").saveAsParquetFile(dirname.getAbsolutePath)
+ parquetFile(dirname.getAbsolutePath).registerAsTable("ptable")
+ val rddOne = sql("SELECT * FROM src").collect().sortBy(_.getInt(0))
+ val rddTwo = sql("SELECT * from ptable").collect().sortBy(_.getInt(0))
compareRDDs(rddOne, rddTwo, "src (Hive)", Seq("key:Int", "value:String"))
}
test("INSERT OVERWRITE TABLE Parquet table") {
- storeQuery("SELECT * FROM parquet.testsource", filename)
- runQuery("INSERT OVERWRITE TABLE ptable SELECT * FROM parquet.testsource", Some("ptable"), Some(filename))
- runQuery("INSERT OVERWRITE TABLE ptable SELECT * FROM parquet.testsource", Some("ptable"), Some(filename))
- val rddCopy = runQuery("SELECT * FROM ptable", Some("ptable"), Some(filename))
- val rddOrig = runQuery("SELECT * FROM parquet.testsource")
- compareRDDs(rddOrig, rddCopy, "parquet.testsource", ParquetTestData.testSchemaFieldNames)
+ sql("SELECT * FROM testsource").saveAsParquetFile(dirname.getAbsolutePath)
+ parquetFile(dirname.getAbsolutePath).registerAsTable("ptable")
+ // let's do three overwrites for good measure
+ sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
+ sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
+ sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
+ val rddCopy = sql("SELECT * FROM ptable").collect()
+ val rddOrig = sql("SELECT * FROM testsource").collect()
+ assert(rddCopy.size === rddOrig.size, "INSERT OVERWRITE changed size of table??")
+ compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames)
}
- test("CREATE TABLE AS Parquet table") {
- runQuery("CREATE TABLE ptable AS SELECT * FROM src", Some("ptable"), Some(filename))
- val rddCopy = runQuery("SELECT * FROM ptable", Some("ptable"), Some(filename))
+ test("CREATE TABLE of Parquet table") {
+ createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType))
+ .registerAsTable("tmp")
+ val rddCopy =
+ sql("INSERT INTO TABLE tmp SELECT * FROM src")
+ .collect()
.sortBy[Int](_.apply(0) match {
case x: Int => x
case _ => 0
})
- val rddOrig = runQuery("SELECT * FROM src").sortBy(_.getInt(0))
+ val rddOrig = sql("SELECT * FROM src")
+ .collect()
+ .sortBy(_.getInt(0))
compareRDDs(rddOrig, rddCopy, "src (Hive)", Seq("key:Int", "value:String"))
}
+ test("Appending to Parquet table") {
+ createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType))
+ .registerAsTable("tmpnew")
+ sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect()
+ sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect()
+ sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect()
+ val rddCopies = sql("SELECT * FROM tmpnew").collect()
+ val rddOrig = sql("SELECT * FROM src").collect()
+ assert(rddCopies.size === 3 * rddOrig.size, "number of copied rows via INSERT INTO did not match correct number")
+ }
+
+ test("Appending to and then overwriting Parquet table") {
+ createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType))
+ .registerAsTable("tmp")
+ sql("INSERT INTO TABLE tmp SELECT * FROM src").collect()
+ sql("INSERT INTO TABLE tmp SELECT * FROM src").collect()
+ sql("INSERT OVERWRITE TABLE tmp SELECT * FROM src").collect()
+ val rddCopies = sql("SELECT * FROM tmp").collect()
+ val rddOrig = sql("SELECT * FROM src").collect()
+ assert(rddCopies.size === rddOrig.size, "INSERT OVERWRITE did not actually overwrite")
+ }
+
private def compareRDDs(rddOne: Array[Row], rddTwo: Array[Row], tableName: String, fieldNames: Seq[String]) {
var counter = 0
(rddOne, rddTwo).zipped.foreach {
(a,b) => (a,b).zipped.toArray.zipWithIndex.foreach {
- case ((value_1:Array[Byte], value_2:Array[Byte]), index) =>
- assert(new String(value_1) === new String(value_2), s"table $tableName row $counter field ${fieldNames(index)} don't match")
case ((value_1, value_2), index) =>
assert(value_1 === value_2, s"table $tableName row $counter field ${fieldNames(index)} don't match")
}
counter = counter + 1
}
}
+
+ /**
+ * Creates an empty SchemaRDD backed by a ParquetRelation.
+ *
+ * TODO: since this is so experimental it is better to have it here and not
+ * in SQLContext. Also note that when creating new AttributeReferences
+ * one needs to take care not to create duplicate Attribute ID's.
+ */
+ private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = {
+ val attributes = schema.map(t => new AttributeReference(t._1, t._2)())
+ new SchemaRDD(
+ TestHive,
+ parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration))
+ }
}