authorAndre Schumacher <andre.schumacher@iki.fi>2014-04-03 15:31:47 -0700
committerReynold Xin <rxin@apache.org>2014-04-03 15:31:47 -0700
commit fbebaedf26286ee8a75065822a3af1148351f828 (patch)
tree d1c02f6c81b325e52b0ac08b7f55f12b670087e8 /sql/core/src/test
parent 92a86b285f8a4af1bdf577dd4c4ea0fd5ca8d682 (diff)
Spark parquet improvements
A few improvements to the Parquet support for SQL queries:
- A ParquetRelation is now backed by a directory instead of individual files, which simplifies importing data from other sources
- The InsertIntoParquetTable operation now supports switching between overwriting and appending (at least in HiveQL)
- Tests now use the new API
- Parquet logging can be set to WARNING level (the default)
- Default compression for Parquet files is GZIP, as in parquet-mr

Author: Andre Schumacher <andre.schumacher@iki.fi>

Closes #195 from AndreSchumacher/spark_parquet_improvements and squashes the following commits:

54df314 [Andre Schumacher] SPARK-1383 [SQL] Improvements to ParquetRelation
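For context, a minimal sketch of the directory-backed round trip that the updated test suite exercises (the case class, table name, and temporary path are illustrative only; assumes the TestSQLContext implicits are in scope, as in ParquetQuerySuite):

    import org.apache.spark.sql.catalyst.util.getTempFilePath
    import org.apache.spark.sql.test.TestSQLContext
    import org.apache.spark.sql.test.TestSQLContext._
    import org.apache.spark.util.Utils

    // Hypothetical case class, defined at the top level (as TestRDDEntry is in the suite below).
    case class Record(key: Int, value: String)

    val file = getTempFilePath("parquet")
    val source = TestSQLContext.sparkContext.parallelize(1 to 100).map(i => Record(i, s"val_$i"))
    source.saveAsParquetFile(file.toString)    // writes a directory of Parquet files, not a single file

    val restored = parquetFile(file.toString)  // reads the whole directory back as a SchemaRDD
    restored.registerAsTable("records")
    sql("INSERT INTO records SELECT * FROM records").collect()  // appends a second copy of the rows
    // "SELECT * FROM records" should now return 200 rows; INSERT OVERWRITE INTO would replace them instead.
    Utils.deleteRecursively(file)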
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--  sql/core/src/test/resources/log4j.properties                                    8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala  118
2 files changed, 103 insertions, 23 deletions
diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties
index 7bb6789bd3..dffd15a618 100644
--- a/sql/core/src/test/resources/log4j.properties
+++ b/sql/core/src/test/resources/log4j.properties
@@ -45,8 +45,6 @@ log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
log4j.additivity.hive.ql.metadata.Hive=false
log4j.logger.hive.ql.metadata.Hive=OFF
-# Parquet logging
-parquet.hadoop.InternalParquetRecordReader=WARN
-log4j.logger.parquet.hadoop.InternalParquetRecordReader=WARN
-parquet.hadoop.ParquetInputFormat=WARN
-log4j.logger.parquet.hadoop.ParquetInputFormat=WARN
+# Parquet-related logging
+log4j.logger.parquet.hadoop=WARN
+log4j.logger.org.apache.spark.sql.parquet=INFO
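The same effect can also be obtained programmatically through the log4j 1.x API if a test needs to adjust levels at runtime; this is only an illustrative aside, not part of the change:

    import org.apache.log4j.{Level, Logger}

    // Quiet Parquet's internal loggers, keep Spark's own Parquet code at INFO.
    Logger.getLogger("parquet.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.sql.parquet").setLevel(Level.INFO)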
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index ea1733b361..a62a3c4d02 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -19,27 +19,40 @@ package org.apache.spark.sql.parquet
import org.scalatest.{BeforeAndAfterAll, FunSuite}
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.mapreduce.Job
+
import parquet.hadoop.ParquetFileWriter
-import parquet.hadoop.util.ContextUtil
import parquet.schema.MessageTypeParser
+import parquet.hadoop.util.ContextUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.util.Utils
+import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, DataType}
+import org.apache.spark.sql.{parquet, SchemaRDD}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import scala.Tuple2
// Implicits
import org.apache.spark.sql.test.TestSQLContext._
+case class TestRDDEntry(key: Int, value: String)
+
class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
+
+ var testRDD: SchemaRDD = null
+
override def beforeAll() {
ParquetTestData.writeFile()
+ testRDD = parquetFile(ParquetTestData.testDir.toString)
+ testRDD.registerAsTable("testsource")
}
override def afterAll() {
- ParquetTestData.testFile.delete()
+ Utils.deleteRecursively(ParquetTestData.testDir)
+ // TODO: should we also unregister the table here?
}
test("self-join parquet files") {
@@ -55,11 +68,18 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
case Seq(_, _) => // All good
}
- // TODO: We can't run this query as it NPEs
+ val result = query.collect()
+ assert(result.size === 9, "self-join result has incorrect size")
+ assert(result(0).size === 12, "result row has incorrect size")
+ result.zipWithIndex.foreach {
+ case (row, index) => row.zipWithIndex.foreach {
+ case (field, column) => assert(field != null, s"self-join contains null value in row $index field $column")
+ }
+ }
}
test("Import of simple Parquet file") {
- val result = getRDD(ParquetTestData.testData).collect()
+ val result = parquetFile(ParquetTestData.testDir.toString).collect()
assert(result.size === 15)
result.zipWithIndex.foreach {
case (row, index) => {
@@ -125,20 +145,82 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
fs.delete(path, true)
}
+ test("Creating case class RDD table") {
+ TestSQLContext.sparkContext.parallelize((1 to 100))
+ .map(i => TestRDDEntry(i, s"val_$i"))
+ .registerAsTable("tmp")
+ val rdd = sql("SELECT * FROM tmp").collect().sortBy(_.getInt(0))
+ var counter = 1
+ rdd.foreach {
+ // note: use equals() below, since '===' does not seem to handle string comparison here
+ row: Row => {
+ assert(row.getString(1).equals(s"val_$counter"), s"row $counter value ${row.getString(1)} does not match val_$counter")
+ counter = counter + 1
+ }
+ }
+ }
+
+ test("Saving case class RDD table to file and reading it back in") {
+ val file = getTempFilePath("parquet")
+ val path = file.toString
+ val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+ .map(i => TestRDDEntry(i, s"val_$i"))
+ rdd.saveAsParquetFile(path)
+ val readFile = parquetFile(path)
+ readFile.registerAsTable("tmpx")
+ val rdd_copy = sql("SELECT * FROM tmpx").collect()
+ val rdd_orig = rdd.collect()
+ for(i <- 0 to 99) {
+ assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
+ assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
+ }
+ Utils.deleteRecursively(file)
+ assert(true)
+ }
+
+ test("insert (overwrite) via Scala API (new SchemaRDD)") {
+ val dirname = Utils.createTempDir()
+ val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+ .map(i => TestRDDEntry(i, s"val_$i"))
+ source_rdd.registerAsTable("source")
+ val dest_rdd = createParquetFile(dirname.toString, ("key", IntegerType), ("value", StringType))
+ dest_rdd.registerAsTable("dest")
+ sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
+ val rdd_copy1 = sql("SELECT * FROM dest").collect()
+ assert(rdd_copy1.size === 100)
+ assert(rdd_copy1(0).apply(0) === 1)
+ assert(rdd_copy1(0).apply(1) === "val_1")
+ sql("INSERT INTO dest SELECT * FROM source").collect()
+ val rdd_copy2 = sql("SELECT * FROM dest").collect()
+ assert(rdd_copy2.size === 200)
+ Utils.deleteRecursively(dirname)
+ }
+
+ test("insert (appending) to same table via Scala API") {
+ sql("INSERT INTO testsource SELECT * FROM testsource").collect()
+ val double_rdd = sql("SELECT * FROM testsource").collect()
+ assert(double_rdd != null)
+ assert(double_rdd.size === 30)
+ for(i <- (0 to 14)) {
+ assert(double_rdd(i) === double_rdd(i+15), s"error: lines $i and ${i+15} do not match")
+ }
+ // let's restore the original test data
+ Utils.deleteRecursively(ParquetTestData.testDir)
+ ParquetTestData.writeFile()
+ }
+
/**
- * Computes the given [[ParquetRelation]] and returns its RDD.
+ * Creates an empty SchemaRDD backed by a ParquetRelation.
*
- * @param parquetRelation The Parquet relation.
- * @return An RDD of Rows.
+ * TODO: since this is so experimental it is better to have it here and not
+ * in SQLContext. Also note that when creating new AttributeReferences
+ * one needs to take care not to create duplicate attribute IDs.
*/
- private def getRDD(parquetRelation: ParquetRelation): RDD[Row] = {
- val scanner = new ParquetTableScan(
- parquetRelation.output,
- parquetRelation,
- None)(TestSQLContext.sparkContext)
- scanner
- .execute
- .map(_.copy())
+ private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = {
+ val attributes = schema.map(t => new AttributeReference(t._1, t._2)())
+ new SchemaRDD(
+ TestSQLContext,
+ parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration))
}
}