diff options
author | Andre Schumacher <andre.schumacher@iki.fi> | 2014-04-03 15:31:47 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-04-03 15:31:47 -0700 |
commit | fbebaedf26286ee8a75065822a3af1148351f828 (patch) | |
tree | d1c02f6c81b325e52b0ac08b7f55f12b670087e8 /sql/hive/src | |
parent | 92a86b285f8a4af1bdf577dd4c4ea0fd5ca8d682 (diff) | |
download | spark-fbebaedf26286ee8a75065822a3af1148351f828.tar.gz spark-fbebaedf26286ee8a75065822a3af1148351f828.tar.bz2 spark-fbebaedf26286ee8a75065822a3af1148351f828.zip |
Spark parquet improvements
A few improvements to the Parquet support for SQL queries:
- Instead of files a ParquetRelation is now backed by a directory, which simplifies importing data from other
sources
- InsertIntoParquetTable operation now supports switching between overwriting or appending (at least in
HiveQL)
- tests now use the new API
- Parquet logging can be set to WARNING level (Default)
- Default compression for Parquet files (GZIP, as in parquet-mr)
Author: Andre Schumacher <andre.schumacher@iki.fi>
Closes #195 from AndreSchumacher/spark_parquet_improvements and squashes the following commits:
54df314 [Andre Schumacher] SPARK-1383 [SQL] Improvements to ParquetRelation
Diffstat (limited to 'sql/hive/src')
5 files changed, 89 insertions, 94 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 29834a11f4..fc053c56c0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -148,6 +148,8 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging { */ override def unregisterTable( databaseName: Option[String], tableName: String): Unit = ??? + + override def unregisterAllTables() = {} } object HiveMetastoreTypes extends RegexParsers { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index bc3447b9d8..0a6bea0162 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -313,6 +313,8 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) { catalog.client.dropDatabase(db, true, false, true) } + catalog.unregisterAllTables() + FunctionRegistry.getFunctionNames.filterNot(originalUdfs.contains(_)).foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 68d45e53cd..79ec1f1cde 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -29,7 +29,7 @@ class CachedTableSuite extends HiveComparisonTest { } createQueryTest("read from cached table", - "SELECT * FROM src LIMIT 1") + "SELECT * FROM src LIMIT 1", reset = false) test("check that table is cached and uncache") { TestHive.table("src").queryExecution.analyzed match { @@ -40,7 +40,7 @@ class CachedTableSuite extends HiveComparisonTest { } createQueryTest("read from uncached table", - "SELECT * FROM src LIMIT 1") + "SELECT * FROM src LIMIT 1", reset = false) test("make sure table is uncached") { TestHive.table("src").queryExecution.analyzed match { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index c7a350ef94..18654b308d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -170,7 +170,7 @@ abstract class HiveComparisonTest } val installHooksCommand = "(?i)SET.*hooks".r - def createQueryTest(testCaseName: String, sql: String) { + def createQueryTest(testCaseName: String, sql: String, reset: Boolean = true) { // If test sharding is enable, skip tests that are not in the correct shard. shardInfo.foreach { case (shardId, numShards) if testCaseName.hashCode % numShards != shardId => return @@ -228,7 +228,7 @@ abstract class HiveComparisonTest try { // MINOR HACK: You must run a query before calling reset the first time. TestHive.sql("SHOW TABLES") - TestHive.reset() + if (reset) { TestHive.reset() } val hiveCacheFiles = queryList.zipWithIndex.map { case (queryString, i) => @@ -295,7 +295,7 @@ abstract class HiveComparisonTest fail(errorMessage) } }.toSeq - TestHive.reset() + if (reset) { TestHive.reset() } computedResults } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala index 05ad85b622..314ca48ad8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala @@ -17,147 +17,138 @@ package org.apache.spark.sql.parquet -import java.io.File - import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.util.getTempFilePath +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row} +import org.apache.spark.sql.catalyst.types.{DataType, StringType, IntegerType} +import org.apache.spark.sql.{parquet, SchemaRDD} import org.apache.spark.sql.hive.TestHive +import org.apache.spark.util.Utils + +// Implicits +import org.apache.spark.sql.hive.TestHive._ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach { - val filename = getTempFilePath("parquettest").getCanonicalFile.toURI.toString - - // runs a SQL and optionally resolves one Parquet table - def runQuery( - querystr: String, - tableName: Option[String] = None, - filename: Option[String] = None): Array[Row] = { - - // call to resolve references in order to get CREATE TABLE AS to work - val query = TestHive - .parseSql(querystr) - val finalQuery = - if (tableName.nonEmpty && filename.nonEmpty) - resolveParquetTable(tableName.get, filename.get, query) - else - query - TestHive.executePlan(finalQuery) - .toRdd - .collect() - } - // stores a query output to a Parquet file - def storeQuery(querystr: String, filename: String): Unit = { - val query = WriteToFile( - filename, - TestHive.parseSql(querystr)) - TestHive - .executePlan(query) - .stringResult() - } + val dirname = Utils.createTempDir() - /** - * TODO: This function is necessary as long as there is no notion of a Catalog for - * Parquet tables. Once such a thing exists this functionality should be moved there. - */ - def resolveParquetTable(tableName: String, filename: String, plan: LogicalPlan): LogicalPlan = { - TestHive.loadTestTable("src") // may not be loaded now - plan.transform { - case relation @ UnresolvedRelation(databaseName, name, alias) => - if (name == tableName) - ParquetRelation(tableName, filename) - else - relation - case op @ InsertIntoCreatedTable(databaseName, name, child) => - if (name == tableName) { - // note: at this stage the plan is not yet analyzed but Parquet needs to know the schema - // and for that we need the child to be resolved - val relation = ParquetRelation.create( - filename, - TestHive.analyzer(child), - TestHive.sparkContext.hadoopConfiguration, - Some(tableName)) - InsertIntoTable( - relation.asInstanceOf[BaseRelation], - Map.empty, - child, - overwrite = false) - } else - op - } - } + var testRDD: SchemaRDD = null override def beforeAll() { // write test data - ParquetTestData.writeFile() - // Override initial Parquet test table - TestHive.catalog.registerTable(Some[String]("parquet"), "testsource", ParquetTestData.testData) + ParquetTestData.writeFile + testRDD = parquetFile(ParquetTestData.testDir.toString) + testRDD.registerAsTable("testsource") } override def afterAll() { - ParquetTestData.testFile.delete() + Utils.deleteRecursively(ParquetTestData.testDir) + Utils.deleteRecursively(dirname) + reset() // drop all tables that were registered as part of the tests } + // in case tests are failing we delete before and after each test override def beforeEach() { - new File(filename).getAbsoluteFile.delete() + Utils.deleteRecursively(dirname) } override def afterEach() { - new File(filename).getAbsoluteFile.delete() + Utils.deleteRecursively(dirname) } test("SELECT on Parquet table") { - val rdd = runQuery("SELECT * FROM parquet.testsource") + val rdd = sql("SELECT * FROM testsource").collect() assert(rdd != null) assert(rdd.forall(_.size == 6)) } test("Simple column projection + filter on Parquet table") { - val rdd = runQuery("SELECT myboolean, mylong FROM parquet.testsource WHERE myboolean=true") + val rdd = sql("SELECT myboolean, mylong FROM testsource WHERE myboolean=true").collect() assert(rdd.size === 5, "Filter returned incorrect number of rows") assert(rdd.forall(_.getBoolean(0)), "Filter returned incorrect Boolean field value") } - test("Converting Hive to Parquet Table via WriteToFile") { - storeQuery("SELECT * FROM src", filename) - val rddOne = runQuery("SELECT * FROM src").sortBy(_.getInt(0)) - val rddTwo = runQuery("SELECT * from ptable", Some("ptable"), Some(filename)).sortBy(_.getInt(0)) + test("Converting Hive to Parquet Table via saveAsParquetFile") { + sql("SELECT * FROM src").saveAsParquetFile(dirname.getAbsolutePath) + parquetFile(dirname.getAbsolutePath).registerAsTable("ptable") + val rddOne = sql("SELECT * FROM src").collect().sortBy(_.getInt(0)) + val rddTwo = sql("SELECT * from ptable").collect().sortBy(_.getInt(0)) compareRDDs(rddOne, rddTwo, "src (Hive)", Seq("key:Int", "value:String")) } test("INSERT OVERWRITE TABLE Parquet table") { - storeQuery("SELECT * FROM parquet.testsource", filename) - runQuery("INSERT OVERWRITE TABLE ptable SELECT * FROM parquet.testsource", Some("ptable"), Some(filename)) - runQuery("INSERT OVERWRITE TABLE ptable SELECT * FROM parquet.testsource", Some("ptable"), Some(filename)) - val rddCopy = runQuery("SELECT * FROM ptable", Some("ptable"), Some(filename)) - val rddOrig = runQuery("SELECT * FROM parquet.testsource") - compareRDDs(rddOrig, rddCopy, "parquet.testsource", ParquetTestData.testSchemaFieldNames) + sql("SELECT * FROM testsource").saveAsParquetFile(dirname.getAbsolutePath) + parquetFile(dirname.getAbsolutePath).registerAsTable("ptable") + // let's do three overwrites for good measure + sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect() + sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect() + sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect() + val rddCopy = sql("SELECT * FROM ptable").collect() + val rddOrig = sql("SELECT * FROM testsource").collect() + assert(rddCopy.size === rddOrig.size, "INSERT OVERWRITE changed size of table??") + compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames) } - test("CREATE TABLE AS Parquet table") { - runQuery("CREATE TABLE ptable AS SELECT * FROM src", Some("ptable"), Some(filename)) - val rddCopy = runQuery("SELECT * FROM ptable", Some("ptable"), Some(filename)) + test("CREATE TABLE of Parquet table") { + createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType)) + .registerAsTable("tmp") + val rddCopy = + sql("INSERT INTO TABLE tmp SELECT * FROM src") + .collect() .sortBy[Int](_.apply(0) match { case x: Int => x case _ => 0 }) - val rddOrig = runQuery("SELECT * FROM src").sortBy(_.getInt(0)) + val rddOrig = sql("SELECT * FROM src") + .collect() + .sortBy(_.getInt(0)) compareRDDs(rddOrig, rddCopy, "src (Hive)", Seq("key:Int", "value:String")) } + test("Appending to Parquet table") { + createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType)) + .registerAsTable("tmpnew") + sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect() + sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect() + sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect() + val rddCopies = sql("SELECT * FROM tmpnew").collect() + val rddOrig = sql("SELECT * FROM src").collect() + assert(rddCopies.size === 3 * rddOrig.size, "number of copied rows via INSERT INTO did not match correct number") + } + + test("Appending to and then overwriting Parquet table") { + createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType)) + .registerAsTable("tmp") + sql("INSERT INTO TABLE tmp SELECT * FROM src").collect() + sql("INSERT INTO TABLE tmp SELECT * FROM src").collect() + sql("INSERT OVERWRITE TABLE tmp SELECT * FROM src").collect() + val rddCopies = sql("SELECT * FROM tmp").collect() + val rddOrig = sql("SELECT * FROM src").collect() + assert(rddCopies.size === rddOrig.size, "INSERT OVERWRITE did not actually overwrite") + } + private def compareRDDs(rddOne: Array[Row], rddTwo: Array[Row], tableName: String, fieldNames: Seq[String]) { var counter = 0 (rddOne, rddTwo).zipped.foreach { (a,b) => (a,b).zipped.toArray.zipWithIndex.foreach { - case ((value_1:Array[Byte], value_2:Array[Byte]), index) => - assert(new String(value_1) === new String(value_2), s"table $tableName row $counter field ${fieldNames(index)} don't match") case ((value_1, value_2), index) => assert(value_1 === value_2, s"table $tableName row $counter field ${fieldNames(index)} don't match") } counter = counter + 1 } } + + /** + * Creates an empty SchemaRDD backed by a ParquetRelation. + * + * TODO: since this is so experimental it is better to have it here and not + * in SQLContext. Also note that when creating new AttributeReferences + * one needs to take care not to create duplicate Attribute ID's. + */ + private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = { + val attributes = schema.map(t => new AttributeReference(t._1, t._2)()) + new SchemaRDD( + TestHive, + parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration)) + } } |