diff options
author | Andre Schumacher <andre.schumacher@iki.fi> | 2014-04-03 15:31:47 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-04-03 15:31:47 -0700 |
commit | fbebaedf26286ee8a75065822a3af1148351f828 (patch) | |
tree | d1c02f6c81b325e52b0ac08b7f55f12b670087e8 /sql/core/src/test | |
parent | 92a86b285f8a4af1bdf577dd4c4ea0fd5ca8d682 (diff) | |
download | spark-fbebaedf26286ee8a75065822a3af1148351f828.tar.gz spark-fbebaedf26286ee8a75065822a3af1148351f828.tar.bz2 spark-fbebaedf26286ee8a75065822a3af1148351f828.zip |
Spark parquet improvements
A few improvements to the Parquet support for SQL queries:
- Instead of files a ParquetRelation is now backed by a directory, which simplifies importing data from other
sources
- InsertIntoParquetTable operation now supports switching between overwriting or appending (at least in
HiveQL)
- tests now use the new API
- Parquet logging can be set to WARNING level (Default)
- Default compression for Parquet files (GZIP, as in parquet-mr)
Author: Andre Schumacher <andre.schumacher@iki.fi>
Closes #195 from AndreSchumacher/spark_parquet_improvements and squashes the following commits:
54df314 [Andre Schumacher] SPARK-1383 [SQL] Improvements to ParquetRelation
Diffstat (limited to 'sql/core/src/test')
-rw-r--r-- | sql/core/src/test/resources/log4j.properties | 8 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 118 |
2 files changed, 103 insertions, 23 deletions
diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties index 7bb6789bd3..dffd15a618 100644 --- a/sql/core/src/test/resources/log4j.properties +++ b/sql/core/src/test/resources/log4j.properties @@ -45,8 +45,6 @@ log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF log4j.additivity.hive.ql.metadata.Hive=false log4j.logger.hive.ql.metadata.Hive=OFF -# Parquet logging -parquet.hadoop.InternalParquetRecordReader=WARN -log4j.logger.parquet.hadoop.InternalParquetRecordReader=WARN -parquet.hadoop.ParquetInputFormat=WARN -log4j.logger.parquet.hadoop.ParquetInputFormat=WARN +# Parquet related logging +log4j.logger.parquet.hadoop=WARN +log4j.logger.org.apache.spark.sql.parquet=INFO diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index ea1733b361..a62a3c4d02 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -19,27 +19,40 @@ package org.apache.spark.sql.parquet import org.scalatest.{BeforeAndAfterAll, FunSuite} -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.hadoop.mapreduce.Job + import parquet.hadoop.ParquetFileWriter -import parquet.hadoop.util.ContextUtil import parquet.schema.MessageTypeParser +import parquet.hadoop.util.ContextUtil -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.catalyst.util.getTempFilePath +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row} import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.util.Utils +import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, DataType} +import org.apache.spark.sql.{parquet, SchemaRDD} +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import scala.Tuple2 // Implicits import org.apache.spark.sql.test.TestSQLContext._ +case class TestRDDEntry(key: Int, value: String) + class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll { + + var testRDD: SchemaRDD = null + override def beforeAll() { ParquetTestData.writeFile() + testRDD = parquetFile(ParquetTestData.testDir.toString) + testRDD.registerAsTable("testsource") } override def afterAll() { - ParquetTestData.testFile.delete() + Utils.deleteRecursively(ParquetTestData.testDir) + // here we should also unregister the table?? } test("self-join parquet files") { @@ -55,11 +68,18 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll { case Seq(_, _) => // All good } - // TODO: We can't run this query as it NPEs + val result = query.collect() + assert(result.size === 9, "self-join result has incorrect size") + assert(result(0).size === 12, "result row has incorrect size") + result.zipWithIndex.foreach { + case (row, index) => row.zipWithIndex.foreach { + case (field, column) => assert(field != null, s"self-join contains null value in row $index field $column") + } + } } test("Import of simple Parquet file") { - val result = getRDD(ParquetTestData.testData).collect() + val result = parquetFile(ParquetTestData.testDir.toString).collect() assert(result.size === 15) result.zipWithIndex.foreach { case (row, index) => { @@ -125,20 +145,82 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll { fs.delete(path, true) } + test("Creating case class RDD table") { + TestSQLContext.sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + .registerAsTable("tmp") + val rdd = sql("SELECT * FROM tmp").collect().sortBy(_.getInt(0)) + var counter = 1 + rdd.foreach { + // '===' does not like string comparison? + row: Row => { + assert(row.getString(1).equals(s"val_$counter"), s"row $counter value ${row.getString(1)} does not match val_$counter") + counter = counter + 1 + } + } + } + + test("Saving case class RDD table to file and reading it back in") { + val file = getTempFilePath("parquet") + val path = file.toString + val rdd = TestSQLContext.sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + rdd.saveAsParquetFile(path) + val readFile = parquetFile(path) + readFile.registerAsTable("tmpx") + val rdd_copy = sql("SELECT * FROM tmpx").collect() + val rdd_orig = rdd.collect() + for(i <- 0 to 99) { + assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i") + assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i") + } + Utils.deleteRecursively(file) + assert(true) + } + + test("insert (overwrite) via Scala API (new SchemaRDD)") { + val dirname = Utils.createTempDir() + val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + source_rdd.registerAsTable("source") + val dest_rdd = createParquetFile(dirname.toString, ("key", IntegerType), ("value", StringType)) + dest_rdd.registerAsTable("dest") + sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect() + val rdd_copy1 = sql("SELECT * FROM dest").collect() + assert(rdd_copy1.size === 100) + assert(rdd_copy1(0).apply(0) === 1) + assert(rdd_copy1(0).apply(1) === "val_1") + sql("INSERT INTO dest SELECT * FROM source").collect() + val rdd_copy2 = sql("SELECT * FROM dest").collect() + assert(rdd_copy2.size === 200) + Utils.deleteRecursively(dirname) + } + + test("insert (appending) to same table via Scala API") { + sql("INSERT INTO testsource SELECT * FROM testsource").collect() + val double_rdd = sql("SELECT * FROM testsource").collect() + assert(double_rdd != null) + assert(double_rdd.size === 30) + for(i <- (0 to 14)) { + assert(double_rdd(i) === double_rdd(i+15), s"error: lines $i and ${i+15} to not match") + } + // let's restore the original test data + Utils.deleteRecursively(ParquetTestData.testDir) + ParquetTestData.writeFile() + } + /** - * Computes the given [[ParquetRelation]] and returns its RDD. + * Creates an empty SchemaRDD backed by a ParquetRelation. * - * @param parquetRelation The Parquet relation. - * @return An RDD of Rows. + * TODO: since this is so experimental it is better to have it here and not + * in SQLContext. Also note that when creating new AttributeReferences + * one needs to take care not to create duplicate Attribute ID's. */ - private def getRDD(parquetRelation: ParquetRelation): RDD[Row] = { - val scanner = new ParquetTableScan( - parquetRelation.output, - parquetRelation, - None)(TestSQLContext.sparkContext) - scanner - .execute - .map(_.copy()) + private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = { + val attributes = schema.map(t => new AttributeReference(t._1, t._2)()) + new SchemaRDD( + TestSQLContext, + parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration)) } } |