author     Andre Schumacher <andre.schumacher@iki.fi>    2014-04-03 15:31:47 -0700
committer  Reynold Xin <rxin@apache.org>                 2014-04-03 15:31:47 -0700
commit     fbebaedf26286ee8a75065822a3af1148351f828 (patch)
tree       d1c02f6c81b325e52b0ac08b7f55f12b670087e8 /sql
parent     92a86b285f8a4af1bdf577dd4c4ea0fd5ca8d682 (diff)
Spark parquet improvements
A few improvements to the Parquet support for SQL queries:

- A ParquetRelation is now backed by a directory instead of individual files, which simplifies importing data from other sources
- The InsertIntoParquetTable operation now supports switching between overwriting and appending (at least in HiveQL)
- Tests now use the new API
- Parquet logging can be set to WARNING level (the default)
- Default compression for Parquet files is GZIP, as in parquet-mr

Author: Andre Schumacher <andre.schumacher@iki.fi>

Closes #195 from AndreSchumacher/spark_parquet_improvements and squashes the following commits:

54df314 [Andre Schumacher] SPARK-1383 [SQL] Improvements to ParquetRelation
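For reference, the append/overwrite distinction introduced by this patch looks as follows at the SQL level (a condensed sketch based on the new ParquetQuerySuite below; `source` and `dest` are assumed to be tables already registered against an RDD-backed and a Parquet-backed relation, respectively):

    // Overwrite: the backing directory is cleared before the new rows are written.
    sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
    // Append: new Parquet files are added next to the existing ones.
    sql("INSERT INTO dest SELECT * FROM source").collect()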
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 14
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala | 26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala | 129
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala | 139
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala | 35
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala | 10
-rw-r--r--  sql/core/src/test/resources/log4j.properties | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 118
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala | 6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala | 169
15 files changed, 460 insertions, 212 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 8de87594c8..4ea80fee23 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -106,6 +106,8 @@ class SqlParser extends StandardTokenParsers {
protected val IF = Keyword("IF")
protected val IN = Keyword("IN")
protected val INNER = Keyword("INNER")
+ protected val INSERT = Keyword("INSERT")
+ protected val INTO = Keyword("INTO")
protected val IS = Keyword("IS")
protected val JOIN = Keyword("JOIN")
protected val LEFT = Keyword("LEFT")
@@ -114,6 +116,7 @@ class SqlParser extends StandardTokenParsers {
protected val NULL = Keyword("NULL")
protected val ON = Keyword("ON")
protected val OR = Keyword("OR")
+ protected val OVERWRITE = Keyword("OVERWRITE")
protected val LIKE = Keyword("LIKE")
protected val RLIKE = Keyword("RLIKE")
protected val REGEXP = Keyword("REGEXP")
@@ -162,7 +165,7 @@ class SqlParser extends StandardTokenParsers {
select * (
UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
- )
+ ) | insert
protected lazy val select: Parser[LogicalPlan] =
SELECT ~> opt(DISTINCT) ~ projections ~
@@ -185,6 +188,13 @@ class SqlParser extends StandardTokenParsers {
withLimit
}
+ protected lazy val insert: Parser[LogicalPlan] =
+ INSERT ~> opt(OVERWRITE) ~ inTo ~ select <~ opt(";") ^^ {
+ case o ~ r ~ s =>
+ val overwrite: Boolean = o.getOrElse("") == "OVERWRITE"
+ InsertIntoTable(r, Map[String, Option[String]](), s, overwrite)
+ }
+
protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")
protected lazy val projection: Parser[Expression] =
@@ -195,6 +205,8 @@ class SqlParser extends StandardTokenParsers {
protected lazy val from: Parser[LogicalPlan] = FROM ~> relations
+ protected lazy val inTo: Parser[LogicalPlan] = INTO ~> relation
+
// Based very loosely on the MySQL Grammar.
// http://dev.mysql.com/doc/refman/5.0/en/join.html
protected lazy val relations: Parser[LogicalPlan] =
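As a rough illustration of how the new grammar rule turns the optional OVERWRITE keyword into a Boolean flag, here is a minimal, self-contained parser-combinator sketch. This is not Spark's actual SqlParser (which also handles case-insensitive keywords and builds an InsertIntoTable logical plan); it only mirrors the shape of the `insert` production above.

    import scala.util.parsing.combinator.syntactical.StandardTokenParsers

    object MiniInsertParser extends StandardTokenParsers {
      lexical.reserved ++= Seq("INSERT", "OVERWRITE", "INTO")
      // INSERT [OVERWRITE] INTO <table>  -- returns (overwrite?, table name)
      def insert: Parser[(Boolean, String)] =
        "INSERT" ~> opt("OVERWRITE") ~ ("INTO" ~> ident) ^^ {
          case o ~ table => (o.isDefined, table)
        }
      def parse(input: String) = phrase(insert)(new lexical.Scanner(input))
    }

    // MiniInsertParser.parse("INSERT OVERWRITE INTO dest") yields Success((true, "dest"), ...)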
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 6b58b9322c..f30b5d8167 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -31,19 +31,33 @@ trait Catalog {
alias: Option[String] = None): LogicalPlan
def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
+
def unregisterTable(databaseName: Option[String], tableName: String): Unit
+
+ def unregisterAllTables(): Unit
}
class SimpleCatalog extends Catalog {
val tables = new mutable.HashMap[String, LogicalPlan]()
- def registerTable(databaseName: Option[String],tableName: String, plan: LogicalPlan): Unit = {
+ override def registerTable(
+ databaseName: Option[String],
+ tableName: String,
+ plan: LogicalPlan): Unit = {
tables += ((tableName, plan))
}
- def unregisterTable(databaseName: Option[String], tableName: String) = { tables -= tableName }
+ override def unregisterTable(
+ databaseName: Option[String],
+ tableName: String) = {
+ tables -= tableName
+ }
+
+ override def unregisterAllTables() = {
+ tables.clear()
+ }
- def lookupRelation(
+ override def lookupRelation(
databaseName: Option[String],
tableName: String,
alias: Option[String] = None): LogicalPlan = {
@@ -92,6 +106,10 @@ trait OverrideCatalog extends Catalog {
override def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
overrides.remove((databaseName, tableName))
}
+
+ override def unregisterAllTables(): Unit = {
+ overrides.clear()
+ }
}
/**
@@ -113,4 +131,6 @@ object EmptyCatalog extends Catalog {
def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
throw new UnsupportedOperationException
}
+
+ override def unregisterAllTables(): Unit = {}
}
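The Catalog trait now requires an unregisterAllTables method so that test harnesses (for example TestHive.reset below) can wipe everything that was registered. A hedged usage sketch against SimpleCatalog, where `somePlan` stands in for any LogicalPlan:

    val catalog = new SimpleCatalog
    catalog.registerTable(None, "t1", somePlan)   // somePlan: placeholder LogicalPlan
    catalog.registerTable(None, "t2", somePlan)
    catalog.unregisterTable(None, "t1")           // drops a single table
    catalog.unregisterAllTables()                 // new: clears the whole catalog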
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index f4bf00f4cf..36059c6630 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -80,12 +80,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))
/**
- * Loads a parequet file, returning the result as a [[SchemaRDD]].
+ * Loads a Parquet file, returning the result as a [[SchemaRDD]].
*
* @group userf
*/
def parquetFile(path: String): SchemaRDD =
- new SchemaRDD(this, parquet.ParquetRelation("ParquetFile", path))
+ new SchemaRDD(this, parquet.ParquetRelation(path))
/**
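Since a ParquetRelation is now backed by a directory of Parquet files rather than a single file, parquetFile takes the directory path. A short usage sketch mirroring the new test suites (`sqlContext` is an existing SQLContext; the path is a placeholder directory containing Parquet files plus the _metadata file written by ParquetRelation):

    val parquetRDD = sqlContext.parquetFile("path/to/parquet-dir")
    parquetRDD.registerAsTable("parquetTable")
    sqlContext.sql("SELECT * FROM parquetTable").collect()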
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index b3e51fdf75..fe8bd5a508 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -171,10 +171,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// TODO: need to support writing to other types of files. Unify the below code paths.
case logical.WriteToFile(path, child) =>
val relation =
- ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, None)
- InsertIntoParquetTable(relation, planLater(child))(sparkContext) :: Nil
+ ParquetRelation.create(path, child, sparkContext.hadoopConfiguration)
+ InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
- InsertIntoParquetTable(table, planLater(child))(sparkContext) :: Nil
+ InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
case PhysicalOperation(projectList, filters, relation: ParquetRelation) =>
// TODO: Should be pushing down filters as well.
pruneFilterProject(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 4ab755c096..114bfbb719 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -17,30 +17,29 @@
package org.apache.spark.sql.parquet
-import java.io.{IOException, FileNotFoundException}
-
-import scala.collection.JavaConversions._
+import java.io.IOException
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.mapreduce.Job
-import parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
import parquet.hadoop.util.ContextUtil
-import parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
+import parquet.hadoop.{ParquetOutputFormat, Footer, ParquetFileWriter, ParquetFileReader}
+import parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
import parquet.io.api.{Binary, RecordConsumer}
+import parquet.schema.{Type => ParquetType, PrimitiveType => ParquetPrimitiveType, MessageType, MessageTypeParser}
import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
import parquet.schema.Type.Repetition
-import parquet.schema.{MessageType, MessageTypeParser}
-import parquet.schema.{PrimitiveType => ParquetPrimitiveType}
-import parquet.schema.{Type => ParquetType}
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
-import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
import org.apache.spark.sql.catalyst.types._
+// Implicits
+import scala.collection.JavaConversions._
+
/**
* Relation that consists of data stored in a Parquet columnar format.
*
@@ -48,14 +47,14 @@ import org.apache.spark.sql.catalyst.types._
* of using this class directly.
*
* {{{
- * val parquetRDD = sqlContext.parquetFile("path/to/parequet.file")
+ * val parquetRDD = sqlContext.parquetFile("path/to/parquet.file")
* }}}
*
- * @param tableName The name of the relation that can be used in queries.
* @param path The path to the Parquet file.
*/
-case class ParquetRelation(tableName: String, path: String)
- extends BaseRelation with MultiInstanceRelation {
+private[sql] case class ParquetRelation(val path: String)
+ extends LeafNode with MultiInstanceRelation {
+ self: Product =>
/** Schema derived from ParquetFile */
def parquetSchema: MessageType =
@@ -65,33 +64,59 @@ case class ParquetRelation(tableName: String, path: String)
.getSchema
/** Attributes */
- val attributes =
+ override val output =
ParquetTypesConverter
- .convertToAttributes(parquetSchema)
+ .convertToAttributes(parquetSchema)
- /** Output */
- override val output = attributes
-
- // Parquet files have no concepts of keys, therefore no Partitioner
- // Note: we could allow Block level access; needs to be thought through
- override def isPartitioned = false
-
- override def newInstance = ParquetRelation(tableName, path).asInstanceOf[this.type]
+ override def newInstance = ParquetRelation(path).asInstanceOf[this.type]
// Equals must also take into account the output attributes so that we can distinguish between
// different instances of the same relation,
override def equals(other: Any) = other match {
case p: ParquetRelation =>
- p.tableName == tableName && p.path == path && p.output == output
+ p.path == path && p.output == output
case _ => false
}
}
-object ParquetRelation {
+private[sql] object ParquetRelation {
+
+ def enableLogForwarding() {
+ // Note: Parquet does not use forwarding to parent loggers which
+ // is required for the JUL-SLF4J bridge to work. Also there is
+ // a default logger that appends to Console which needs to be
+ // reset.
+ import org.slf4j.bridge.SLF4JBridgeHandler
+ import java.util.logging.Logger
+ import java.util.logging.LogManager
+
+ val loggerNames = Seq(
+ "parquet.hadoop.ColumnChunkPageWriteStore",
+ "parquet.hadoop.InternalParquetRecordWriter",
+ "parquet.hadoop.ParquetRecordReader",
+ "parquet.hadoop.ParquetInputFormat",
+ "parquet.hadoop.ParquetOutputFormat",
+ "parquet.hadoop.ParquetFileReader",
+ "parquet.hadoop.InternalParquetRecordReader",
+ "parquet.hadoop.codec.CodecConfig")
+ LogManager.getLogManager.reset()
+ SLF4JBridgeHandler.install()
+ for(name <- loggerNames) {
+ val logger = Logger.getLogger(name)
+ logger.setParent(Logger.getGlobal)
+ logger.setUseParentHandlers(true)
+ }
+ }
// The element type for the RDDs that this relation maps to.
type RowType = org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+ // The compression type
+ type CompressionType = parquet.hadoop.metadata.CompressionCodecName
+
+ // The default compression
+ val defaultCompression = CompressionCodecName.GZIP
+
/**
* Creates a new ParquetRelation and underlying Parquetfile for the given LogicalPlan. Note that
* this is used inside [[org.apache.spark.sql.execution.SparkStrategies SparkStrategies]] to
@@ -100,24 +125,39 @@ object ParquetRelation {
*
* @param pathString The directory the Parquetfile will be stored in.
* @param child The child node that will be used for extracting the schema.
- * @param conf A configuration configuration to be used.
- * @param tableName The name of the resulting relation.
- * @return An empty ParquetRelation inferred metadata.
+ * @param conf A configuration to be used.
+ * @return An empty ParquetRelation with inferred metadata.
*/
def create(pathString: String,
child: LogicalPlan,
- conf: Configuration,
- tableName: Option[String]): ParquetRelation = {
+ conf: Configuration): ParquetRelation = {
if (!child.resolved) {
throw new UnresolvedException[LogicalPlan](
child,
"Attempt to create Parquet table from unresolved child (when schema is not available)")
}
+ createEmpty(pathString, child.output, conf)
+ }
- val name = s"${tableName.getOrElse(child.nodeName)}_parquet"
+ /**
+ * Creates an empty ParquetRelation and underlying Parquetfile that only
+ * consists of the Metadata for the given schema.
+ *
+ * @param pathString The directory the Parquetfile will be stored in.
+ * @param attributes The schema of the relation.
+ * @param conf A configuration to be used.
+ * @return An empty ParquetRelation.
+ */
+ def createEmpty(pathString: String,
+ attributes: Seq[Attribute],
+ conf: Configuration): ParquetRelation = {
val path = checkPath(pathString, conf)
- ParquetTypesConverter.writeMetaData(child.output, path, conf)
- new ParquetRelation(name, path.toString)
+ if (conf.get(ParquetOutputFormat.COMPRESSION) == null) {
+ conf.set(ParquetOutputFormat.COMPRESSION, ParquetRelation.defaultCompression.name())
+ }
+ ParquetRelation.enableLogForwarding()
+ ParquetTypesConverter.writeMetaData(attributes, path, conf)
+ new ParquetRelation(path.toString)
}
private def checkPath(pathStr: String, conf: Configuration): Path = {
@@ -143,7 +183,7 @@ object ParquetRelation {
}
}
-object ParquetTypesConverter {
+private[parquet] object ParquetTypesConverter {
def toDataType(parquetType : ParquetPrimitiveTypeName): DataType = parquetType match {
// for now map binary to string type
// TODO: figure out how Parquet uses strings or why we can't use them in a MessageType schema
@@ -242,6 +282,7 @@ object ParquetTypesConverter {
extraMetadata,
"Spark")
+ ParquetRelation.enableLogForwarding()
ParquetFileWriter.writeMetadataFile(
conf,
path,
@@ -268,16 +309,24 @@ object ParquetTypesConverter {
throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
}
val path = origPath.makeQualified(fs)
+ if (!fs.getFileStatus(path).isDir) {
+ throw new IllegalArgumentException(
+ s"Expected $path for be a directory with Parquet files/metadata")
+ }
+ ParquetRelation.enableLogForwarding()
val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
+ // if this is a new table that was just created we will find only the metadata file
if (fs.exists(metadataPath) && fs.isFile(metadataPath)) {
- // TODO: improve exception handling, etc.
ParquetFileReader.readFooter(conf, metadataPath)
} else {
- if (!fs.exists(path) || !fs.isFile(path)) {
- throw new FileNotFoundException(
- s"Could not find file ${path.toString} when trying to read metadata")
+ // there may be one or more Parquet files in the given directory
+ val footers = ParquetFileReader.readFooters(conf, fs.getFileStatus(path))
+ // TODO: for now we assume that all footers (if there is more than one) have identical
+ // metadata; we may want to add a check here at some point
+ if (footers.size() == 0) {
+ throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path")
}
- ParquetFileReader.readFooter(conf, path)
+ footers(0).getParquetMetadata
}
}
}
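To illustrate the new createEmpty entry point and the GZIP default, a hedged sketch; `attributes` stands in for a Seq[Attribute] describing the schema and the output path is hypothetical:

    import org.apache.hadoop.conf.Configuration
    import parquet.hadoop.ParquetOutputFormat
    import parquet.hadoop.metadata.CompressionCodecName

    val conf = new Configuration()
    // Optional: pick a codec explicitly; otherwise createEmpty falls back to
    // ParquetRelation.defaultCompression (GZIP, as in parquet-mr).
    conf.set(ParquetOutputFormat.COMPRESSION, CompressionCodecName.SNAPPY.name())
    // Writes only the _metadata file for the given schema and returns the relation.
    val relation = ParquetRelation.createEmpty("/tmp/mytable.parquet", attributes, conf)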
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 7285f5b88b..d5846baa72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -24,26 +24,29 @@ import java.util.Date
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat, FileOutputCommitter}
-import parquet.hadoop.util.ContextUtil
import parquet.hadoop.{ParquetInputFormat, ParquetOutputFormat}
+import parquet.hadoop.util.ContextUtil
import parquet.io.InvalidRecordException
import parquet.schema.MessageType
+import org.apache.spark.{SerializableWritable, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
-import org.apache.spark.{SerializableWritable, SparkContext, TaskContext}
/**
* Parquet table scan operator. Imports the file that backs the given
* [[ParquetRelation]] as a RDD[Row].
*/
case class ParquetTableScan(
- @transient output: Seq[Attribute],
- @transient relation: ParquetRelation,
- @transient columnPruningPred: Option[Expression])(
+ // note: output cannot be transient, see
+ // https://issues.apache.org/jira/browse/SPARK-1367
+ output: Seq[Attribute],
+ relation: ParquetRelation,
+ columnPruningPred: Option[Expression])(
@transient val sc: SparkContext)
extends LeafNode {
@@ -53,6 +56,12 @@ case class ParquetTableScan(
job,
classOf[org.apache.spark.sql.parquet.RowReadSupport])
val conf: Configuration = ContextUtil.getConfiguration(job)
+ val fileList = FileSystemHelper.listFiles(relation.path, conf)
+ // add all paths in the directory but skip "hidden" ones such
+ // as "_SUCCESS" and "_metadata"
+ for (path <- fileList if !path.getName.startsWith("_")) {
+ NewFileInputFormat.addInputPath(job, path)
+ }
conf.set(
RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA,
ParquetTypesConverter.convertFromAttributes(output).toString)
@@ -63,14 +72,12 @@ case class ParquetTableScan(
``FilteredRecordReader`` (via Configuration, for example). Simple
filter-rows-by-column-values however should be supported.
*/
- sc.newAPIHadoopFile(
- relation.path,
- classOf[ParquetInputFormat[Row]],
- classOf[Void], classOf[Row],
- conf)
+ sc.newAPIHadoopRDD(conf, classOf[ParquetInputFormat[Row]], classOf[Void], classOf[Row])
.map(_._2)
}
+ override def otherCopyArgs = sc :: Nil
+
/**
* Applies a (candidate) projection.
*
@@ -108,15 +115,31 @@ case class ParquetTableScan(
}
}
+/**
+ * Operator that acts as a sink for queries on RDDs and can be used to
+ * store the output inside a directory of Parquet files. This operator
+ * is similar to Hive's INSERT INTO TABLE operation in the sense that
+ * one can choose to either overwrite or append to a directory. Note
+ * that consecutive insertions to the same table must have compatible
+ * (source) schemas.
+ *
+ * WARNING: EXPERIMENTAL! InsertIntoParquetTable with overwrite=false may
+ * cause data corruption in the case that multiple users try to append to
+ * the same table simultaneously. Inserting into a table that was
+ * previously generated by other means (e.g., by creating an HDFS
+ * directory and importing Parquet files generated by other tools) may
+ * cause unpredicted behaviour and therefore results in a RuntimeException
+ * (only detected via filename pattern so will not catch all cases).
+ */
case class InsertIntoParquetTable(
- @transient relation: ParquetRelation,
- @transient child: SparkPlan)(
+ relation: ParquetRelation,
+ child: SparkPlan,
+ overwrite: Boolean = false)(
@transient val sc: SparkContext)
extends UnaryNode with SparkHadoopMapReduceUtil {
/**
- * Inserts all the rows in the Parquet file. Note that OVERWRITE is implicit, since
- * Parquet files are write-once.
+ * Inserts all rows into the Parquet file.
*/
override def execute() = {
// TODO: currently we do not check whether the "schema"s are compatible
@@ -135,19 +158,21 @@ case class InsertIntoParquetTable(
classOf[org.apache.spark.sql.parquet.RowWriteSupport])
// TODO: move that to function in object
- val conf = job.getConfiguration
+ val conf = ContextUtil.getConfiguration(job)
conf.set(RowWriteSupport.PARQUET_ROW_SCHEMA, relation.parquetSchema.toString)
val fspath = new Path(relation.path)
val fs = fspath.getFileSystem(conf)
- try {
- fs.delete(fspath, true)
- } catch {
- case e: IOException =>
- throw new IOException(
- s"Unable to clear output directory ${fspath.toString} prior"
- + s" to InsertIntoParquetTable:\n${e.toString}")
+ if (overwrite) {
+ try {
+ fs.delete(fspath, true)
+ } catch {
+ case e: IOException =>
+ throw new IOException(
+ s"Unable to clear output directory ${fspath.toString} prior"
+ + s" to InsertIntoParquetTable:\n${e.toString}")
+ }
}
saveAsHadoopFile(childRdd, relation.path.toString, conf)
@@ -157,6 +182,8 @@ case class InsertIntoParquetTable(
override def output = child.output
+ override def otherCopyArgs = sc :: Nil
+
// based on ``saveAsNewAPIHadoopFile`` in [[PairRDDFunctions]]
// TODO: Maybe PairRDDFunctions should use Product2 instead of Tuple2?
// .. then we could use the default one and could use [[MutablePair]]
@@ -167,15 +194,21 @@ case class InsertIntoParquetTable(
conf: Configuration) {
val job = new Job(conf)
val keyType = classOf[Void]
- val outputFormatType = classOf[parquet.hadoop.ParquetOutputFormat[Row]]
job.setOutputKeyClass(keyType)
job.setOutputValueClass(classOf[Row])
- val wrappedConf = new SerializableWritable(job.getConfiguration)
NewFileOutputFormat.setOutputPath(job, new Path(path))
+ val wrappedConf = new SerializableWritable(job.getConfiguration)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = sc.newRddId()
+ val taskIdOffset =
+ if (overwrite) 1
+ else {
+ FileSystemHelper
+ .findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
+ }
+
def writeShard(context: TaskContext, iter: Iterator[Row]): Int = {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
@@ -184,7 +217,7 @@ case class InsertIntoParquetTable(
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
attemptNumber)
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
- val format = outputFormatType.newInstance
+ val format = new AppendingParquetOutputFormat(taskIdOffset)
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
val writer = format.getRecordWriter(hadoopContext)
@@ -196,7 +229,7 @@ case class InsertIntoParquetTable(
committer.commitTask(hadoopContext)
return 1
}
- val jobFormat = outputFormatType.newInstance
+ val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
/* apparently we need a TaskAttemptID to construct an OutputCommitter;
* however we're only going to use this local OutputCommitter for
* setupJob/commitJob, so we just use a dummy "map" task.
@@ -210,3 +243,55 @@ case class InsertIntoParquetTable(
}
}
+// TODO: this will be able to append to directories it created itself, not necessarily
+// to imported ones
+private[parquet] class AppendingParquetOutputFormat(offset: Int)
+ extends parquet.hadoop.ParquetOutputFormat[Row] {
+ // override to accept existing directories as valid output directory
+ override def checkOutputSpecs(job: JobContext): Unit = {}
+
+ // override to choose output filename so not overwrite existing ones
+ override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+ val taskId: TaskID = context.getTaskAttemptID.getTaskID
+ val partition: Int = taskId.getId
+ val filename = s"part-r-${partition + offset}.parquet"
+ val committer: FileOutputCommitter =
+ getOutputCommitter(context).asInstanceOf[FileOutputCommitter]
+ new Path(committer.getWorkPath, filename)
+ }
+}
+
+private[parquet] object FileSystemHelper {
+ def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
+ val origPath = new Path(pathStr)
+ val fs = origPath.getFileSystem(conf)
+ if (fs == null) {
+ throw new IllegalArgumentException(
+ s"ParquetTableOperations: Path $origPath is incorrectly formatted")
+ }
+ val path = origPath.makeQualified(fs)
+ if (!fs.exists(path) || !fs.getFileStatus(path).isDir) {
+ throw new IllegalArgumentException(
+ s"ParquetTableOperations: path $path does not exist or is not a directory")
+ }
+ fs.listStatus(path).map(_.getPath)
+ }
+
+ // finds the maximum taskid in the output file names at the given path
+ def findMaxTaskId(pathStr: String, conf: Configuration): Int = {
+ val files = FileSystemHelper.listFiles(pathStr, conf)
+ // filename pattern is part-r-<int>.parquet
+ val nameP = new scala.util.matching.Regex("""part-r-(\d{1,}).parquet""", "taskid")
+ val hiddenFileP = new scala.util.matching.Regex("_.*")
+ files.map(_.getName).map {
+ case nameP(taskid) => taskid.toInt
+ case hiddenFileP() => 0
+ case other: String => {
+ sys.error("ERROR: attempting to append to set of Parquet files and found file" +
+ s"that does not match name pattern: $other")
+ 0
+ }
+ case _ => 0
+ }.reduceLeft((a, b) => if (a < b) b else a)
+ }
+}
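The append path chooses output file names by offsetting the Hadoop task ID past the largest ID already present in the directory. A small, runnable illustration of the same pattern used by FileSystemHelper.findMaxTaskId above (the file names here are made up):

    // Same regex as above: output files are named part-r-<taskid>.parquet.
    val nameP = new scala.util.matching.Regex("""part-r-(\d{1,}).parquet""", "taskid")
    val existing = Seq("part-r-0.parquet", "part-r-1.parquet", "_metadata", "_SUCCESS")
    val maxTaskId = existing.map {
      case nameP(taskid) => taskid.toInt
      case _             => 0   // hidden files such as _metadata are ignored
    }.max
    val taskIdOffset = maxTaskId + 1   // appended files start at part-r-2.parquet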
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index c21e400282..84b1b46094 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -35,7 +35,8 @@ import org.apache.spark.sql.catalyst.types._
*
*@param root The root group converter for the record.
*/
-class RowRecordMaterializer(root: CatalystGroupConverter) extends RecordMaterializer[Row] {
+private[parquet] class RowRecordMaterializer(root: CatalystGroupConverter)
+ extends RecordMaterializer[Row] {
def this(parquetSchema: MessageType) =
this(new CatalystGroupConverter(ParquetTypesConverter.convertToAttributes(parquetSchema)))
@@ -48,14 +49,14 @@ class RowRecordMaterializer(root: CatalystGroupConverter) extends RecordMaterial
/**
* A `parquet.hadoop.api.ReadSupport` for Row objects.
*/
-class RowReadSupport extends ReadSupport[Row] with Logging {
+private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
override def prepareForRead(
conf: Configuration,
stringMap: java.util.Map[String, String],
fileSchema: MessageType,
readContext: ReadContext): RecordMaterializer[Row] = {
- log.debug(s"preparing for read with schema ${fileSchema.toString}")
+ log.debug(s"preparing for read with file schema $fileSchema")
new RowRecordMaterializer(readContext.getRequestedSchema)
}
@@ -67,20 +68,20 @@ class RowReadSupport extends ReadSupport[Row] with Logging {
configuration.get(RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA, fileSchema.toString)
val requested_schema =
MessageTypeParser.parseMessageType(requested_schema_string)
-
- log.debug(s"read support initialized for original schema ${requested_schema.toString}")
+ log.debug(s"read support initialized for requested schema $requested_schema")
+ ParquetRelation.enableLogForwarding()
new ReadContext(requested_schema, keyValueMetaData)
}
}
-object RowReadSupport {
+private[parquet] object RowReadSupport {
val PARQUET_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
}
/**
* A `parquet.hadoop.api.WriteSupport` for Row ojects.
*/
-class RowWriteSupport extends WriteSupport[Row] with Logging {
+private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
def setSchema(schema: MessageType, configuration: Configuration) {
// for testing
this.schema = schema
@@ -104,6 +105,8 @@ class RowWriteSupport extends WriteSupport[Row] with Logging {
override def init(configuration: Configuration): WriteSupport.WriteContext = {
schema = if (schema == null) getSchema(configuration) else schema
attributes = ParquetTypesConverter.convertToAttributes(schema)
+ log.debug(s"write support initialized for requested schema $schema")
+ ParquetRelation.enableLogForwarding()
new WriteSupport.WriteContext(
schema,
new java.util.HashMap[java.lang.String, java.lang.String]())
@@ -111,10 +114,16 @@ class RowWriteSupport extends WriteSupport[Row] with Logging {
override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
writer = recordConsumer
+ log.debug(s"preparing for write with schema $schema")
}
// TODO: add groups (nested fields)
override def write(record: Row): Unit = {
+ if (attributes.size > record.size) {
+ throw new IndexOutOfBoundsException(
+ s"Trying to write more fields than contained in row (${attributes.size}>${record.size})")
+ }
+
var index = 0
writer.startMessage()
while(index < attributes.size) {
@@ -130,7 +139,7 @@ class RowWriteSupport extends WriteSupport[Row] with Logging {
}
}
-object RowWriteSupport {
+private[parquet] object RowWriteSupport {
val PARQUET_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.schema"
}
@@ -139,7 +148,7 @@ object RowWriteSupport {
*
* @param schema The corresponding Catalyst schema in the form of a list of attributes.
*/
-class CatalystGroupConverter(
+private[parquet] class CatalystGroupConverter(
schema: Seq[Attribute],
protected[parquet] val current: ParquetRelation.RowType) extends GroupConverter {
@@ -177,13 +186,12 @@ class CatalystGroupConverter(
* @param parent The parent group converter.
* @param fieldIndex The index inside the record.
*/
-class CatalystPrimitiveConverter(
+private[parquet] class CatalystPrimitiveConverter(
parent: CatalystGroupConverter,
fieldIndex: Int) extends PrimitiveConverter {
// TODO: consider refactoring these together with ParquetTypesConverter
override def addBinary(value: Binary): Unit =
- // TODO: fix this once a setBinary will become available in MutableRow
- parent.getCurrentRecord.setByte(fieldIndex, value.getBytes.apply(0))
+ parent.getCurrentRecord.update(fieldIndex, value.getBytes)
override def addBoolean(value: Boolean): Unit =
parent.getCurrentRecord.setBoolean(fieldIndex, value)
@@ -208,10 +216,9 @@ class CatalystPrimitiveConverter(
* @param parent The parent group converter.
* @param fieldIndex The index inside the record.
*/
-class CatalystPrimitiveStringConverter(
+private[parquet] class CatalystPrimitiveStringConverter(
parent: CatalystGroupConverter,
fieldIndex: Int) extends CatalystPrimitiveConverter(parent, fieldIndex) {
override def addBinary(value: Binary): Unit =
parent.getCurrentRecord.setString(fieldIndex, value.toStringUsingUTF8)
}
-
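The read and write support classes receive their schema through the Hadoop Configuration rather than through constructor arguments. A hedged sketch of how the write side is wired up (here `job` and `relation` are placeholders for the Hadoop Job and the target ParquetRelation):

    import parquet.hadoop.util.ContextUtil

    // The Catalyst schema travels as the Parquet MessageType in textual form,
    // keyed by RowWriteSupport.PARQUET_ROW_SCHEMA
    // ("org.apache.spark.sql.parquet.row.schema").
    val conf = ContextUtil.getConfiguration(job)
    conf.set(RowWriteSupport.PARQUET_ROW_SCHEMA, relation.parquetSchema.toString)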
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
index 3340c3ff81..728e3dd1dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
@@ -26,7 +26,7 @@ import parquet.hadoop.util.ContextUtil
import parquet.schema.{MessageType, MessageTypeParser}
import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.util.Utils
object ParquetTestData {
@@ -64,13 +64,13 @@ object ParquetTestData {
"mylong:Long"
)
- val testFile = getTempFilePath("testParquetFile").getCanonicalFile
+ val testDir = Utils.createTempDir()
- lazy val testData = new ParquetRelation("testData", testFile.toURI.toString)
+ lazy val testData = new ParquetRelation(testDir.toURI.toString)
def writeFile() = {
- testFile.delete
- val path: Path = new Path(testFile.toURI)
+ testDir.delete
+ val path: Path = new Path(new Path(testDir.toURI), new Path("part-r-0.parquet"))
val job = new Job()
val configuration: Configuration = ContextUtil.getConfiguration(job)
val schema: MessageType = MessageTypeParser.parseMessageType(testSchema)
diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties
index 7bb6789bd3..dffd15a618 100644
--- a/sql/core/src/test/resources/log4j.properties
+++ b/sql/core/src/test/resources/log4j.properties
@@ -45,8 +45,6 @@ log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
log4j.additivity.hive.ql.metadata.Hive=false
log4j.logger.hive.ql.metadata.Hive=OFF
-# Parquet logging
-parquet.hadoop.InternalParquetRecordReader=WARN
-log4j.logger.parquet.hadoop.InternalParquetRecordReader=WARN
-parquet.hadoop.ParquetInputFormat=WARN
-log4j.logger.parquet.hadoop.ParquetInputFormat=WARN
+# Parquet related logging
+log4j.logger.parquet.hadoop=WARN
+log4j.logger.org.apache.spark.sql.parquet=INFO
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index ea1733b361..a62a3c4d02 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -19,27 +19,40 @@ package org.apache.spark.sql.parquet
import org.scalatest.{BeforeAndAfterAll, FunSuite}
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.mapreduce.Job
+
import parquet.hadoop.ParquetFileWriter
-import parquet.hadoop.util.ContextUtil
import parquet.schema.MessageTypeParser
+import parquet.hadoop.util.ContextUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.util.Utils
+import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, DataType}
+import org.apache.spark.sql.{parquet, SchemaRDD}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import scala.Tuple2
// Implicits
import org.apache.spark.sql.test.TestSQLContext._
+case class TestRDDEntry(key: Int, value: String)
+
class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
+
+ var testRDD: SchemaRDD = null
+
override def beforeAll() {
ParquetTestData.writeFile()
+ testRDD = parquetFile(ParquetTestData.testDir.toString)
+ testRDD.registerAsTable("testsource")
}
override def afterAll() {
- ParquetTestData.testFile.delete()
+ Utils.deleteRecursively(ParquetTestData.testDir)
+ // here we should also unregister the table??
}
test("self-join parquet files") {
@@ -55,11 +68,18 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
case Seq(_, _) => // All good
}
- // TODO: We can't run this query as it NPEs
+ val result = query.collect()
+ assert(result.size === 9, "self-join result has incorrect size")
+ assert(result(0).size === 12, "result row has incorrect size")
+ result.zipWithIndex.foreach {
+ case (row, index) => row.zipWithIndex.foreach {
+ case (field, column) => assert(field != null, s"self-join contains null value in row $index field $column")
+ }
+ }
}
test("Import of simple Parquet file") {
- val result = getRDD(ParquetTestData.testData).collect()
+ val result = parquetFile(ParquetTestData.testDir.toString).collect()
assert(result.size === 15)
result.zipWithIndex.foreach {
case (row, index) => {
@@ -125,20 +145,82 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
fs.delete(path, true)
}
+ test("Creating case class RDD table") {
+ TestSQLContext.sparkContext.parallelize((1 to 100))
+ .map(i => TestRDDEntry(i, s"val_$i"))
+ .registerAsTable("tmp")
+ val rdd = sql("SELECT * FROM tmp").collect().sortBy(_.getInt(0))
+ var counter = 1
+ rdd.foreach {
+ // '===' does not like string comparison?
+ row: Row => {
+ assert(row.getString(1).equals(s"val_$counter"), s"row $counter value ${row.getString(1)} does not match val_$counter")
+ counter = counter + 1
+ }
+ }
+ }
+
+ test("Saving case class RDD table to file and reading it back in") {
+ val file = getTempFilePath("parquet")
+ val path = file.toString
+ val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+ .map(i => TestRDDEntry(i, s"val_$i"))
+ rdd.saveAsParquetFile(path)
+ val readFile = parquetFile(path)
+ readFile.registerAsTable("tmpx")
+ val rdd_copy = sql("SELECT * FROM tmpx").collect()
+ val rdd_orig = rdd.collect()
+ for(i <- 0 to 99) {
+ assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
+ assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i")
+ }
+ Utils.deleteRecursively(file)
+ assert(true)
+ }
+
+ test("insert (overwrite) via Scala API (new SchemaRDD)") {
+ val dirname = Utils.createTempDir()
+ val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+ .map(i => TestRDDEntry(i, s"val_$i"))
+ source_rdd.registerAsTable("source")
+ val dest_rdd = createParquetFile(dirname.toString, ("key", IntegerType), ("value", StringType))
+ dest_rdd.registerAsTable("dest")
+ sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
+ val rdd_copy1 = sql("SELECT * FROM dest").collect()
+ assert(rdd_copy1.size === 100)
+ assert(rdd_copy1(0).apply(0) === 1)
+ assert(rdd_copy1(0).apply(1) === "val_1")
+ sql("INSERT INTO dest SELECT * FROM source").collect()
+ val rdd_copy2 = sql("SELECT * FROM dest").collect()
+ assert(rdd_copy2.size === 200)
+ Utils.deleteRecursively(dirname)
+ }
+
+ test("insert (appending) to same table via Scala API") {
+ sql("INSERT INTO testsource SELECT * FROM testsource").collect()
+ val double_rdd = sql("SELECT * FROM testsource").collect()
+ assert(double_rdd != null)
+ assert(double_rdd.size === 30)
+ for(i <- (0 to 14)) {
+ assert(double_rdd(i) === double_rdd(i+15), s"error: lines $i and ${i+15} to not match")
+ }
+ // let's restore the original test data
+ Utils.deleteRecursively(ParquetTestData.testDir)
+ ParquetTestData.writeFile()
+ }
+
/**
- * Computes the given [[ParquetRelation]] and returns its RDD.
+ * Creates an empty SchemaRDD backed by a ParquetRelation.
*
- * @param parquetRelation The Parquet relation.
- * @return An RDD of Rows.
+ * TODO: since this is so experimental it is better to have it here and not
+ * in SQLContext. Also note that when creating new AttributeReferences
+ * one needs to take care not to create duplicate Attribute ID's.
*/
- private def getRDD(parquetRelation: ParquetRelation): RDD[Row] = {
- val scanner = new ParquetTableScan(
- parquetRelation.output,
- parquetRelation,
- None)(TestSQLContext.sparkContext)
- scanner
- .execute
- .map(_.copy())
+ private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = {
+ val attributes = schema.map(t => new AttributeReference(t._1, t._2)())
+ new SchemaRDD(
+ TestSQLContext,
+ parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration))
}
}
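As the TODO above notes, building a schema by hand means creating fresh AttributeReferences so that expression IDs stay unique across relations. A minimal sketch of the attribute construction used by the createParquetFile helper:

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.types.{IntegerType, StringType}

    // The empty second argument list assigns a new, unique expression ID,
    // so each relation gets its own attribute identities.
    val attributes = Seq(
      new AttributeReference("key", IntegerType)(),
      new AttributeReference("value", StringType)())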
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 29834a11f4..fc053c56c0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -148,6 +148,8 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
*/
override def unregisterTable(
databaseName: Option[String], tableName: String): Unit = ???
+
+ override def unregisterAllTables() = {}
}
object HiveMetastoreTypes extends RegexParsers {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index bc3447b9d8..0a6bea0162 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -313,6 +313,8 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
catalog.client.dropDatabase(db, true, false, true)
}
+ catalog.unregisterAllTables()
+
FunctionRegistry.getFunctionNames.filterNot(originalUdfs.contains(_)).foreach { udfName =>
FunctionRegistry.unregisterTemporaryUDF(udfName)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 68d45e53cd..79ec1f1cde 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -29,7 +29,7 @@ class CachedTableSuite extends HiveComparisonTest {
}
createQueryTest("read from cached table",
- "SELECT * FROM src LIMIT 1")
+ "SELECT * FROM src LIMIT 1", reset = false)
test("check that table is cached and uncache") {
TestHive.table("src").queryExecution.analyzed match {
@@ -40,7 +40,7 @@ class CachedTableSuite extends HiveComparisonTest {
}
createQueryTest("read from uncached table",
- "SELECT * FROM src LIMIT 1")
+ "SELECT * FROM src LIMIT 1", reset = false)
test("make sure table is uncached") {
TestHive.table("src").queryExecution.analyzed match {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index c7a350ef94..18654b308d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -170,7 +170,7 @@ abstract class HiveComparisonTest
}
val installHooksCommand = "(?i)SET.*hooks".r
- def createQueryTest(testCaseName: String, sql: String) {
+ def createQueryTest(testCaseName: String, sql: String, reset: Boolean = true) {
// If test sharding is enable, skip tests that are not in the correct shard.
shardInfo.foreach {
case (shardId, numShards) if testCaseName.hashCode % numShards != shardId => return
@@ -228,7 +228,7 @@ abstract class HiveComparisonTest
try {
// MINOR HACK: You must run a query before calling reset the first time.
TestHive.sql("SHOW TABLES")
- TestHive.reset()
+ if (reset) { TestHive.reset() }
val hiveCacheFiles = queryList.zipWithIndex.map {
case (queryString, i) =>
@@ -295,7 +295,7 @@ abstract class HiveComparisonTest
fail(errorMessage)
}
}.toSeq
- TestHive.reset()
+ if (reset) { TestHive.reset() }
computedResults
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index 05ad85b622..314ca48ad8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -17,147 +17,138 @@
package org.apache.spark.sql.parquet
-import java.io.File
-
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
+import org.apache.spark.sql.catalyst.types.{DataType, StringType, IntegerType}
+import org.apache.spark.sql.{parquet, SchemaRDD}
import org.apache.spark.sql.hive.TestHive
+import org.apache.spark.util.Utils
+
+// Implicits
+import org.apache.spark.sql.hive.TestHive._
class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
- val filename = getTempFilePath("parquettest").getCanonicalFile.toURI.toString
-
- // runs a SQL and optionally resolves one Parquet table
- def runQuery(
- querystr: String,
- tableName: Option[String] = None,
- filename: Option[String] = None): Array[Row] = {
-
- // call to resolve references in order to get CREATE TABLE AS to work
- val query = TestHive
- .parseSql(querystr)
- val finalQuery =
- if (tableName.nonEmpty && filename.nonEmpty)
- resolveParquetTable(tableName.get, filename.get, query)
- else
- query
- TestHive.executePlan(finalQuery)
- .toRdd
- .collect()
- }
- // stores a query output to a Parquet file
- def storeQuery(querystr: String, filename: String): Unit = {
- val query = WriteToFile(
- filename,
- TestHive.parseSql(querystr))
- TestHive
- .executePlan(query)
- .stringResult()
- }
+ val dirname = Utils.createTempDir()
- /**
- * TODO: This function is necessary as long as there is no notion of a Catalog for
- * Parquet tables. Once such a thing exists this functionality should be moved there.
- */
- def resolveParquetTable(tableName: String, filename: String, plan: LogicalPlan): LogicalPlan = {
- TestHive.loadTestTable("src") // may not be loaded now
- plan.transform {
- case relation @ UnresolvedRelation(databaseName, name, alias) =>
- if (name == tableName)
- ParquetRelation(tableName, filename)
- else
- relation
- case op @ InsertIntoCreatedTable(databaseName, name, child) =>
- if (name == tableName) {
- // note: at this stage the plan is not yet analyzed but Parquet needs to know the schema
- // and for that we need the child to be resolved
- val relation = ParquetRelation.create(
- filename,
- TestHive.analyzer(child),
- TestHive.sparkContext.hadoopConfiguration,
- Some(tableName))
- InsertIntoTable(
- relation.asInstanceOf[BaseRelation],
- Map.empty,
- child,
- overwrite = false)
- } else
- op
- }
- }
+ var testRDD: SchemaRDD = null
override def beforeAll() {
// write test data
- ParquetTestData.writeFile()
- // Override initial Parquet test table
- TestHive.catalog.registerTable(Some[String]("parquet"), "testsource", ParquetTestData.testData)
+ ParquetTestData.writeFile
+ testRDD = parquetFile(ParquetTestData.testDir.toString)
+ testRDD.registerAsTable("testsource")
}
override def afterAll() {
- ParquetTestData.testFile.delete()
+ Utils.deleteRecursively(ParquetTestData.testDir)
+ Utils.deleteRecursively(dirname)
+ reset() // drop all tables that were registered as part of the tests
}
+ // in case tests are failing we delete before and after each test
override def beforeEach() {
- new File(filename).getAbsoluteFile.delete()
+ Utils.deleteRecursively(dirname)
}
override def afterEach() {
- new File(filename).getAbsoluteFile.delete()
+ Utils.deleteRecursively(dirname)
}
test("SELECT on Parquet table") {
- val rdd = runQuery("SELECT * FROM parquet.testsource")
+ val rdd = sql("SELECT * FROM testsource").collect()
assert(rdd != null)
assert(rdd.forall(_.size == 6))
}
test("Simple column projection + filter on Parquet table") {
- val rdd = runQuery("SELECT myboolean, mylong FROM parquet.testsource WHERE myboolean=true")
+ val rdd = sql("SELECT myboolean, mylong FROM testsource WHERE myboolean=true").collect()
assert(rdd.size === 5, "Filter returned incorrect number of rows")
assert(rdd.forall(_.getBoolean(0)), "Filter returned incorrect Boolean field value")
}
- test("Converting Hive to Parquet Table via WriteToFile") {
- storeQuery("SELECT * FROM src", filename)
- val rddOne = runQuery("SELECT * FROM src").sortBy(_.getInt(0))
- val rddTwo = runQuery("SELECT * from ptable", Some("ptable"), Some(filename)).sortBy(_.getInt(0))
+ test("Converting Hive to Parquet Table via saveAsParquetFile") {
+ sql("SELECT * FROM src").saveAsParquetFile(dirname.getAbsolutePath)
+ parquetFile(dirname.getAbsolutePath).registerAsTable("ptable")
+ val rddOne = sql("SELECT * FROM src").collect().sortBy(_.getInt(0))
+ val rddTwo = sql("SELECT * from ptable").collect().sortBy(_.getInt(0))
compareRDDs(rddOne, rddTwo, "src (Hive)", Seq("key:Int", "value:String"))
}
test("INSERT OVERWRITE TABLE Parquet table") {
- storeQuery("SELECT * FROM parquet.testsource", filename)
- runQuery("INSERT OVERWRITE TABLE ptable SELECT * FROM parquet.testsource", Some("ptable"), Some(filename))
- runQuery("INSERT OVERWRITE TABLE ptable SELECT * FROM parquet.testsource", Some("ptable"), Some(filename))
- val rddCopy = runQuery("SELECT * FROM ptable", Some("ptable"), Some(filename))
- val rddOrig = runQuery("SELECT * FROM parquet.testsource")
- compareRDDs(rddOrig, rddCopy, "parquet.testsource", ParquetTestData.testSchemaFieldNames)
+ sql("SELECT * FROM testsource").saveAsParquetFile(dirname.getAbsolutePath)
+ parquetFile(dirname.getAbsolutePath).registerAsTable("ptable")
+ // let's do three overwrites for good measure
+ sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
+ sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
+ sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
+ val rddCopy = sql("SELECT * FROM ptable").collect()
+ val rddOrig = sql("SELECT * FROM testsource").collect()
+ assert(rddCopy.size === rddOrig.size, "INSERT OVERWRITE changed size of table??")
+ compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames)
}
- test("CREATE TABLE AS Parquet table") {
- runQuery("CREATE TABLE ptable AS SELECT * FROM src", Some("ptable"), Some(filename))
- val rddCopy = runQuery("SELECT * FROM ptable", Some("ptable"), Some(filename))
+ test("CREATE TABLE of Parquet table") {
+ createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType))
+ .registerAsTable("tmp")
+ val rddCopy =
+ sql("INSERT INTO TABLE tmp SELECT * FROM src")
+ .collect()
.sortBy[Int](_.apply(0) match {
case x: Int => x
case _ => 0
})
- val rddOrig = runQuery("SELECT * FROM src").sortBy(_.getInt(0))
+ val rddOrig = sql("SELECT * FROM src")
+ .collect()
+ .sortBy(_.getInt(0))
compareRDDs(rddOrig, rddCopy, "src (Hive)", Seq("key:Int", "value:String"))
}
+ test("Appending to Parquet table") {
+ createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType))
+ .registerAsTable("tmpnew")
+ sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect()
+ sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect()
+ sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect()
+ val rddCopies = sql("SELECT * FROM tmpnew").collect()
+ val rddOrig = sql("SELECT * FROM src").collect()
+ assert(rddCopies.size === 3 * rddOrig.size, "number of copied rows via INSERT INTO did not match correct number")
+ }
+
+ test("Appending to and then overwriting Parquet table") {
+ createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType))
+ .registerAsTable("tmp")
+ sql("INSERT INTO TABLE tmp SELECT * FROM src").collect()
+ sql("INSERT INTO TABLE tmp SELECT * FROM src").collect()
+ sql("INSERT OVERWRITE TABLE tmp SELECT * FROM src").collect()
+ val rddCopies = sql("SELECT * FROM tmp").collect()
+ val rddOrig = sql("SELECT * FROM src").collect()
+ assert(rddCopies.size === rddOrig.size, "INSERT OVERWRITE did not actually overwrite")
+ }
+
private def compareRDDs(rddOne: Array[Row], rddTwo: Array[Row], tableName: String, fieldNames: Seq[String]) {
var counter = 0
(rddOne, rddTwo).zipped.foreach {
(a,b) => (a,b).zipped.toArray.zipWithIndex.foreach {
- case ((value_1:Array[Byte], value_2:Array[Byte]), index) =>
- assert(new String(value_1) === new String(value_2), s"table $tableName row $counter field ${fieldNames(index)} don't match")
case ((value_1, value_2), index) =>
assert(value_1 === value_2, s"table $tableName row $counter field ${fieldNames(index)} don't match")
}
counter = counter + 1
}
}
+
+ /**
+ * Creates an empty SchemaRDD backed by a ParquetRelation.
+ *
+ * TODO: since this is so experimental it is better to have it here and not
+ * in SQLContext. Also note that when creating new AttributeReferences
+ * one needs to take care not to create duplicate Attribute ID's.
+ */
+ private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = {
+ val attributes = schema.map(t => new AttributeReference(t._1, t._2)())
+ new SchemaRDD(
+ TestHive,
+ parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration))
+ }
}
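For comparison with the new SqlParser dialect, the HiveQL forms exercised above use the TABLE keyword. Condensed from the tests (`tmp` is assumed to be a table registered via the createParquetFile helper):

    sql("INSERT INTO TABLE tmp SELECT * FROM src").collect()        // appends
    sql("INSERT INTO TABLE tmp SELECT * FROM src").collect()        // appends again
    sql("INSERT OVERWRITE TABLE tmp SELECT * FROM src").collect()   // replaces all rows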