From fbebaedf26286ee8a75065822a3af1148351f828 Mon Sep 17 00:00:00 2001
From: Andre Schumacher
Date: Thu, 3 Apr 2014 15:31:47 -0700
Subject: Spark parquet improvements

A few improvements to the Parquet support for SQL queries:
- A ParquetRelation is now backed by a directory instead of a single file,
  which simplifies importing data from other sources
- The InsertIntoParquetTable operation now supports switching between
  overwriting and appending (at least in HiveQL)
- Tests now use the new API
- Parquet logging can be set to WARNING level (the default)
- Default compression for Parquet files (GZIP, as in parquet-mr)

Author: Andre Schumacher

Closes #195 from AndreSchumacher/spark_parquet_improvements and squashes the following commits:

54df314 [Andre Schumacher] SPARK-1383 [SQL] Improvements to ParquetRelation
---
 .../org/apache/spark/sql/catalyst/SqlParser.scala  |  14 +-
 .../spark/sql/catalyst/analysis/Catalog.scala      |  26 +++-
 .../scala/org/apache/spark/sql/SQLContext.scala    |   4 +-
 .../spark/sql/execution/SparkStrategies.scala      |   6 +-
 .../apache/spark/sql/parquet/ParquetRelation.scala | 129 +++++++++++-----
 .../spark/sql/parquet/ParquetTableOperations.scala | 139 +++++++++++++----
 .../spark/sql/parquet/ParquetTableSupport.scala    |  35 +++--
 .../apache/spark/sql/parquet/ParquetTestData.scala |  10 +-
 sql/core/src/test/resources/log4j.properties       |   8 +-
 .../spark/sql/parquet/ParquetQuerySuite.scala      | 118 +++++++++++---
 .../spark/sql/hive/HiveMetastoreCatalog.scala      |   2 +
 .../scala/org/apache/spark/sql/hive/TestHive.scala |   2 +
 .../apache/spark/sql/hive/CachedTableSuite.scala   |   4 +-
 .../sql/hive/execution/HiveComparisonTest.scala    |   6 +-
 .../spark/sql/parquet/HiveParquetSuite.scala       | 169 ++++++++++-----------
 15 files changed, 460 insertions(+), 212 deletions(-)

(limited to 'sql')

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 8de87594c8..4ea80fee23 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -106,6 +106,8 @@ class SqlParser extends StandardTokenParsers {
   protected val IF = Keyword("IF")
   protected val IN = Keyword("IN")
   protected val INNER = Keyword("INNER")
+  protected val INSERT = Keyword("INSERT")
+  protected val INTO = Keyword("INTO")
   protected val IS = Keyword("IS")
   protected val JOIN = Keyword("JOIN")
   protected val LEFT = Keyword("LEFT")
@@ -114,6 +116,7 @@ class SqlParser extends StandardTokenParsers {
   protected val NULL = Keyword("NULL")
   protected val ON = Keyword("ON")
   protected val OR = Keyword("OR")
+  protected val OVERWRITE = Keyword("OVERWRITE")
   protected val LIKE = Keyword("LIKE")
   protected val RLIKE = Keyword("RLIKE")
   protected val REGEXP = Keyword("REGEXP")
@@ -162,7 +165,7 @@ class SqlParser extends StandardTokenParsers {
     select * (
         UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
         UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
-      )
+      ) | insert

   protected lazy val select: Parser[LogicalPlan] =
     SELECT ~> opt(DISTINCT) ~ projections ~
@@ -185,6 +188,13 @@ class SqlParser extends StandardTokenParsers {
       withLimit
   }

+  protected lazy val insert: Parser[LogicalPlan] =
+    INSERT ~> opt(OVERWRITE) ~ inTo ~ select <~ opt(";") ^^ {
+      case o ~ r ~ s =>
+        val overwrite: Boolean = o.getOrElse("") == "OVERWRITE"
+        InsertIntoTable(r, Map[String, Option[String]](), s, overwrite)
+    }
+
   protected lazy val
projections: Parser[Seq[Expression]] = repsep(projection, ",") protected lazy val projection: Parser[Expression] = @@ -195,6 +205,8 @@ class SqlParser extends StandardTokenParsers { protected lazy val from: Parser[LogicalPlan] = FROM ~> relations + protected lazy val inTo: Parser[LogicalPlan] = INTO ~> relation + // Based very loosely on the MySQL Grammar. // http://dev.mysql.com/doc/refman/5.0/en/join.html protected lazy val relations: Parser[LogicalPlan] = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index 6b58b9322c..f30b5d8167 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -31,19 +31,33 @@ trait Catalog { alias: Option[String] = None): LogicalPlan def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit + def unregisterTable(databaseName: Option[String], tableName: String): Unit + + def unregisterAllTables(): Unit } class SimpleCatalog extends Catalog { val tables = new mutable.HashMap[String, LogicalPlan]() - def registerTable(databaseName: Option[String],tableName: String, plan: LogicalPlan): Unit = { + override def registerTable( + databaseName: Option[String], + tableName: String, + plan: LogicalPlan): Unit = { tables += ((tableName, plan)) } - def unregisterTable(databaseName: Option[String], tableName: String) = { tables -= tableName } + override def unregisterTable( + databaseName: Option[String], + tableName: String) = { + tables -= tableName + } + + override def unregisterAllTables() = { + tables.clear() + } - def lookupRelation( + override def lookupRelation( databaseName: Option[String], tableName: String, alias: Option[String] = None): LogicalPlan = { @@ -92,6 +106,10 @@ trait OverrideCatalog extends Catalog { override def unregisterTable(databaseName: Option[String], tableName: String): Unit = { overrides.remove((databaseName, tableName)) } + + override def unregisterAllTables(): Unit = { + overrides.clear() + } } /** @@ -113,4 +131,6 @@ object EmptyCatalog extends Catalog { def unregisterTable(databaseName: Option[String], tableName: String): Unit = { throw new UnsupportedOperationException } + + override def unregisterAllTables(): Unit = {} } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index f4bf00f4cf..36059c6630 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -80,12 +80,12 @@ class SQLContext(@transient val sparkContext: SparkContext) new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))) /** - * Loads a parequet file, returning the result as a [[SchemaRDD]]. + * Loads a Parquet file, returning the result as a [[SchemaRDD]]. 
* * @group userf */ def parquetFile(path: String): SchemaRDD = - new SchemaRDD(this, parquet.ParquetRelation("ParquetFile", path)) + new SchemaRDD(this, parquet.ParquetRelation(path)) /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index b3e51fdf75..fe8bd5a508 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -171,10 +171,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // TODO: need to support writing to other types of files. Unify the below code paths. case logical.WriteToFile(path, child) => val relation = - ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, None) - InsertIntoParquetTable(relation, planLater(child))(sparkContext) :: Nil + ParquetRelation.create(path, child, sparkContext.hadoopConfiguration) + InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) => - InsertIntoParquetTable(table, planLater(child))(sparkContext) :: Nil + InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil case PhysicalOperation(projectList, filters, relation: ParquetRelation) => // TODO: Should be pushing down filters as well. pruneFilterProject( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 4ab755c096..114bfbb719 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -17,30 +17,29 @@ package org.apache.spark.sql.parquet -import java.io.{IOException, FileNotFoundException} - -import scala.collection.JavaConversions._ +import java.io.IOException import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.permission.FsAction import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.permission.FsAction import org.apache.hadoop.mapreduce.Job -import parquet.hadoop.metadata.{FileMetaData, ParquetMetadata} import parquet.hadoop.util.ContextUtil -import parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter} +import parquet.hadoop.{ParquetOutputFormat, Footer, ParquetFileWriter, ParquetFileReader} +import parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata} import parquet.io.api.{Binary, RecordConsumer} +import parquet.schema.{Type => ParquetType, PrimitiveType => ParquetPrimitiveType, MessageType, MessageTypeParser} import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName} import parquet.schema.Type.Repetition -import parquet.schema.{MessageType, MessageTypeParser} -import parquet.schema.{PrimitiveType => ParquetPrimitiveType} -import parquet.schema.{Type => ParquetType} import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row} -import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode} import org.apache.spark.sql.catalyst.types._ +// Implicits +import scala.collection.JavaConversions._ + /** * Relation that consists of data stored in a Parquet 
columnar format. * @@ -48,14 +47,14 @@ import org.apache.spark.sql.catalyst.types._ * of using this class directly. * * {{{ - * val parquetRDD = sqlContext.parquetFile("path/to/parequet.file") + * val parquetRDD = sqlContext.parquetFile("path/to/parquet.file") * }}} * - * @param tableName The name of the relation that can be used in queries. * @param path The path to the Parquet file. */ -case class ParquetRelation(tableName: String, path: String) - extends BaseRelation with MultiInstanceRelation { +private[sql] case class ParquetRelation(val path: String) + extends LeafNode with MultiInstanceRelation { + self: Product => /** Schema derived from ParquetFile */ def parquetSchema: MessageType = @@ -65,33 +64,59 @@ case class ParquetRelation(tableName: String, path: String) .getSchema /** Attributes */ - val attributes = + override val output = ParquetTypesConverter - .convertToAttributes(parquetSchema) + .convertToAttributes(parquetSchema) - /** Output */ - override val output = attributes - - // Parquet files have no concepts of keys, therefore no Partitioner - // Note: we could allow Block level access; needs to be thought through - override def isPartitioned = false - - override def newInstance = ParquetRelation(tableName, path).asInstanceOf[this.type] + override def newInstance = ParquetRelation(path).asInstanceOf[this.type] // Equals must also take into account the output attributes so that we can distinguish between // different instances of the same relation, override def equals(other: Any) = other match { case p: ParquetRelation => - p.tableName == tableName && p.path == path && p.output == output + p.path == path && p.output == output case _ => false } } -object ParquetRelation { +private[sql] object ParquetRelation { + + def enableLogForwarding() { + // Note: Parquet does not use forwarding to parent loggers which + // is required for the JUL-SLF4J bridge to work. Also there is + // a default logger that appends to Console which needs to be + // reset. + import org.slf4j.bridge.SLF4JBridgeHandler + import java.util.logging.Logger + import java.util.logging.LogManager + + val loggerNames = Seq( + "parquet.hadoop.ColumnChunkPageWriteStore", + "parquet.hadoop.InternalParquetRecordWriter", + "parquet.hadoop.ParquetRecordReader", + "parquet.hadoop.ParquetInputFormat", + "parquet.hadoop.ParquetOutputFormat", + "parquet.hadoop.ParquetFileReader", + "parquet.hadoop.InternalParquetRecordReader", + "parquet.hadoop.codec.CodecConfig") + LogManager.getLogManager.reset() + SLF4JBridgeHandler.install() + for(name <- loggerNames) { + val logger = Logger.getLogger(name) + logger.setParent(Logger.getGlobal) + logger.setUseParentHandlers(true) + } + } // The element type for the RDDs that this relation maps to. type RowType = org.apache.spark.sql.catalyst.expressions.GenericMutableRow + // The compression type + type CompressionType = parquet.hadoop.metadata.CompressionCodecName + + // The default compression + val defaultCompression = CompressionCodecName.GZIP + /** * Creates a new ParquetRelation and underlying Parquetfile for the given LogicalPlan. Note that * this is used inside [[org.apache.spark.sql.execution.SparkStrategies SparkStrategies]] to @@ -100,24 +125,39 @@ object ParquetRelation { * * @param pathString The directory the Parquetfile will be stored in. * @param child The child node that will be used for extracting the schema. - * @param conf A configuration configuration to be used. - * @param tableName The name of the resulting relation. 
- * @return An empty ParquetRelation inferred metadata. + * @param conf A configuration to be used. + * @return An empty ParquetRelation with inferred metadata. */ def create(pathString: String, child: LogicalPlan, - conf: Configuration, - tableName: Option[String]): ParquetRelation = { + conf: Configuration): ParquetRelation = { if (!child.resolved) { throw new UnresolvedException[LogicalPlan]( child, "Attempt to create Parquet table from unresolved child (when schema is not available)") } + createEmpty(pathString, child.output, conf) + } - val name = s"${tableName.getOrElse(child.nodeName)}_parquet" + /** + * Creates an empty ParquetRelation and underlying Parquetfile that only + * consists of the Metadata for the given schema. + * + * @param pathString The directory the Parquetfile will be stored in. + * @param attributes The schema of the relation. + * @param conf A configuration to be used. + * @return An empty ParquetRelation. + */ + def createEmpty(pathString: String, + attributes: Seq[Attribute], + conf: Configuration): ParquetRelation = { val path = checkPath(pathString, conf) - ParquetTypesConverter.writeMetaData(child.output, path, conf) - new ParquetRelation(name, path.toString) + if (conf.get(ParquetOutputFormat.COMPRESSION) == null) { + conf.set(ParquetOutputFormat.COMPRESSION, ParquetRelation.defaultCompression.name()) + } + ParquetRelation.enableLogForwarding() + ParquetTypesConverter.writeMetaData(attributes, path, conf) + new ParquetRelation(path.toString) } private def checkPath(pathStr: String, conf: Configuration): Path = { @@ -143,7 +183,7 @@ object ParquetRelation { } } -object ParquetTypesConverter { +private[parquet] object ParquetTypesConverter { def toDataType(parquetType : ParquetPrimitiveTypeName): DataType = parquetType match { // for now map binary to string type // TODO: figure out how Parquet uses strings or why we can't use them in a MessageType schema @@ -242,6 +282,7 @@ object ParquetTypesConverter { extraMetadata, "Spark") + ParquetRelation.enableLogForwarding() ParquetFileWriter.writeMetadataFile( conf, path, @@ -268,16 +309,24 @@ object ParquetTypesConverter { throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath") } val path = origPath.makeQualified(fs) + if (!fs.getFileStatus(path).isDir) { + throw new IllegalArgumentException( + s"Expected $path for be a directory with Parquet files/metadata") + } + ParquetRelation.enableLogForwarding() val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE) + // if this is a new table that was just created we will find only the metadata file if (fs.exists(metadataPath) && fs.isFile(metadataPath)) { - // TODO: improve exception handling, etc. 
ParquetFileReader.readFooter(conf, metadataPath) } else { - if (!fs.exists(path) || !fs.isFile(path)) { - throw new FileNotFoundException( - s"Could not find file ${path.toString} when trying to read metadata") + // there may be one or more Parquet files in the given directory + val footers = ParquetFileReader.readFooters(conf, fs.getFileStatus(path)) + // TODO: for now we assume that all footers (if there is more than one) have identical + // metadata; we may want to add a check here at some point + if (footers.size() == 0) { + throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path") } - ParquetFileReader.readFooter(conf, path) + footers(0).getParquetMetadata } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 7285f5b88b..d5846baa72 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -24,26 +24,29 @@ import java.util.Date import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat, FileOutputCommitter} -import parquet.hadoop.util.ContextUtil import parquet.hadoop.{ParquetInputFormat, ParquetOutputFormat} +import parquet.hadoop.util.ContextUtil import parquet.io.InvalidRecordException import parquet.schema.MessageType +import org.apache.spark.{SerializableWritable, SparkContext, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row} import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode} -import org.apache.spark.{SerializableWritable, SparkContext, TaskContext} /** * Parquet table scan operator. Imports the file that backs the given * [[ParquetRelation]] as a RDD[Row]. */ case class ParquetTableScan( - @transient output: Seq[Attribute], - @transient relation: ParquetRelation, - @transient columnPruningPred: Option[Expression])( + // note: output cannot be transient, see + // https://issues.apache.org/jira/browse/SPARK-1367 + output: Seq[Attribute], + relation: ParquetRelation, + columnPruningPred: Option[Expression])( @transient val sc: SparkContext) extends LeafNode { @@ -53,6 +56,12 @@ case class ParquetTableScan( job, classOf[org.apache.spark.sql.parquet.RowReadSupport]) val conf: Configuration = ContextUtil.getConfiguration(job) + val fileList = FileSystemHelper.listFiles(relation.path, conf) + // add all paths in the directory but skip "hidden" ones such + // as "_SUCCESS" and "_metadata" + for (path <- fileList if !path.getName.startsWith("_")) { + NewFileInputFormat.addInputPath(job, path) + } conf.set( RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA, ParquetTypesConverter.convertFromAttributes(output).toString) @@ -63,14 +72,12 @@ case class ParquetTableScan( ``FilteredRecordReader`` (via Configuration, for example). Simple filter-rows-by-column-values however should be supported. 
*/ - sc.newAPIHadoopFile( - relation.path, - classOf[ParquetInputFormat[Row]], - classOf[Void], classOf[Row], - conf) + sc.newAPIHadoopRDD(conf, classOf[ParquetInputFormat[Row]], classOf[Void], classOf[Row]) .map(_._2) } + override def otherCopyArgs = sc :: Nil + /** * Applies a (candidate) projection. * @@ -108,15 +115,31 @@ case class ParquetTableScan( } } +/** + * Operator that acts as a sink for queries on RDDs and can be used to + * store the output inside a directory of Parquet files. This operator + * is similar to Hive's INSERT INTO TABLE operation in the sense that + * one can choose to either overwrite or append to a directory. Note + * that consecutive insertions to the same table must have compatible + * (source) schemas. + * + * WARNING: EXPERIMENTAL! InsertIntoParquetTable with overwrite=false may + * cause data corruption in the case that multiple users try to append to + * the same table simultaneously. Inserting into a table that was + * previously generated by other means (e.g., by creating an HDFS + * directory and importing Parquet files generated by other tools) may + * cause unpredicted behaviour and therefore results in a RuntimeException + * (only detected via filename pattern so will not catch all cases). + */ case class InsertIntoParquetTable( - @transient relation: ParquetRelation, - @transient child: SparkPlan)( + relation: ParquetRelation, + child: SparkPlan, + overwrite: Boolean = false)( @transient val sc: SparkContext) extends UnaryNode with SparkHadoopMapReduceUtil { /** - * Inserts all the rows in the Parquet file. Note that OVERWRITE is implicit, since - * Parquet files are write-once. + * Inserts all rows into the Parquet file. */ override def execute() = { // TODO: currently we do not check whether the "schema"s are compatible @@ -135,19 +158,21 @@ case class InsertIntoParquetTable( classOf[org.apache.spark.sql.parquet.RowWriteSupport]) // TODO: move that to function in object - val conf = job.getConfiguration + val conf = ContextUtil.getConfiguration(job) conf.set(RowWriteSupport.PARQUET_ROW_SCHEMA, relation.parquetSchema.toString) val fspath = new Path(relation.path) val fs = fspath.getFileSystem(conf) - try { - fs.delete(fspath, true) - } catch { - case e: IOException => - throw new IOException( - s"Unable to clear output directory ${fspath.toString} prior" - + s" to InsertIntoParquetTable:\n${e.toString}") + if (overwrite) { + try { + fs.delete(fspath, true) + } catch { + case e: IOException => + throw new IOException( + s"Unable to clear output directory ${fspath.toString} prior" + + s" to InsertIntoParquetTable:\n${e.toString}") + } } saveAsHadoopFile(childRdd, relation.path.toString, conf) @@ -157,6 +182,8 @@ case class InsertIntoParquetTable( override def output = child.output + override def otherCopyArgs = sc :: Nil + // based on ``saveAsNewAPIHadoopFile`` in [[PairRDDFunctions]] // TODO: Maybe PairRDDFunctions should use Product2 instead of Tuple2? // .. 
then we could use the default one and could use [[MutablePair]] @@ -167,15 +194,21 @@ case class InsertIntoParquetTable( conf: Configuration) { val job = new Job(conf) val keyType = classOf[Void] - val outputFormatType = classOf[parquet.hadoop.ParquetOutputFormat[Row]] job.setOutputKeyClass(keyType) job.setOutputValueClass(classOf[Row]) - val wrappedConf = new SerializableWritable(job.getConfiguration) NewFileOutputFormat.setOutputPath(job, new Path(path)) + val wrappedConf = new SerializableWritable(job.getConfiguration) val formatter = new SimpleDateFormat("yyyyMMddHHmm") val jobtrackerID = formatter.format(new Date()) val stageId = sc.newRddId() + val taskIdOffset = + if (overwrite) 1 + else { + FileSystemHelper + .findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1 + } + def writeShard(context: TaskContext, iter: Iterator[Row]): Int = { // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it // around by taking a mod. We expect that no task will be attempted 2 billion times. @@ -184,7 +217,7 @@ case class InsertIntoParquetTable( val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId, attemptNumber) val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId) - val format = outputFormatType.newInstance + val format = new AppendingParquetOutputFormat(taskIdOffset) val committer = format.getOutputCommitter(hadoopContext) committer.setupTask(hadoopContext) val writer = format.getRecordWriter(hadoopContext) @@ -196,7 +229,7 @@ case class InsertIntoParquetTable( committer.commitTask(hadoopContext) return 1 } - val jobFormat = outputFormatType.newInstance + val jobFormat = new AppendingParquetOutputFormat(taskIdOffset) /* apparently we need a TaskAttemptID to construct an OutputCommitter; * however we're only going to use this local OutputCommitter for * setupJob/commitJob, so we just use a dummy "map" task. 
@@ -210,3 +243,55 @@ case class InsertIntoParquetTable( } } +// TODO: this will be able to append to directories it created itself, not necessarily +// to imported ones +private[parquet] class AppendingParquetOutputFormat(offset: Int) + extends parquet.hadoop.ParquetOutputFormat[Row] { + // override to accept existing directories as valid output directory + override def checkOutputSpecs(job: JobContext): Unit = {} + + // override to choose output filename so not overwrite existing ones + override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + val taskId: TaskID = context.getTaskAttemptID.getTaskID + val partition: Int = taskId.getId + val filename = s"part-r-${partition + offset}.parquet" + val committer: FileOutputCommitter = + getOutputCommitter(context).asInstanceOf[FileOutputCommitter] + new Path(committer.getWorkPath, filename) + } +} + +private[parquet] object FileSystemHelper { + def listFiles(pathStr: String, conf: Configuration): Seq[Path] = { + val origPath = new Path(pathStr) + val fs = origPath.getFileSystem(conf) + if (fs == null) { + throw new IllegalArgumentException( + s"ParquetTableOperations: Path $origPath is incorrectly formatted") + } + val path = origPath.makeQualified(fs) + if (!fs.exists(path) || !fs.getFileStatus(path).isDir) { + throw new IllegalArgumentException( + s"ParquetTableOperations: path $path does not exist or is not a directory") + } + fs.listStatus(path).map(_.getPath) + } + + // finds the maximum taskid in the output file names at the given path + def findMaxTaskId(pathStr: String, conf: Configuration): Int = { + val files = FileSystemHelper.listFiles(pathStr, conf) + // filename pattern is part-r-.parquet + val nameP = new scala.util.matching.Regex("""part-r-(\d{1,}).parquet""", "taskid") + val hiddenFileP = new scala.util.matching.Regex("_.*") + files.map(_.getName).map { + case nameP(taskid) => taskid.toInt + case hiddenFileP() => 0 + case other: String => { + sys.error("ERROR: attempting to append to set of Parquet files and found file" + + s"that does not match name pattern: $other") + 0 + } + case _ => 0 + }.reduceLeft((a, b) => if (a < b) b else a) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index c21e400282..84b1b46094 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -35,7 +35,8 @@ import org.apache.spark.sql.catalyst.types._ * *@param root The root group converter for the record. */ -class RowRecordMaterializer(root: CatalystGroupConverter) extends RecordMaterializer[Row] { +private[parquet] class RowRecordMaterializer(root: CatalystGroupConverter) + extends RecordMaterializer[Row] { def this(parquetSchema: MessageType) = this(new CatalystGroupConverter(ParquetTypesConverter.convertToAttributes(parquetSchema))) @@ -48,14 +49,14 @@ class RowRecordMaterializer(root: CatalystGroupConverter) extends RecordMaterial /** * A `parquet.hadoop.api.ReadSupport` for Row objects. 
*/ -class RowReadSupport extends ReadSupport[Row] with Logging { +private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging { override def prepareForRead( conf: Configuration, stringMap: java.util.Map[String, String], fileSchema: MessageType, readContext: ReadContext): RecordMaterializer[Row] = { - log.debug(s"preparing for read with schema ${fileSchema.toString}") + log.debug(s"preparing for read with file schema $fileSchema") new RowRecordMaterializer(readContext.getRequestedSchema) } @@ -67,20 +68,20 @@ class RowReadSupport extends ReadSupport[Row] with Logging { configuration.get(RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA, fileSchema.toString) val requested_schema = MessageTypeParser.parseMessageType(requested_schema_string) - - log.debug(s"read support initialized for original schema ${requested_schema.toString}") + log.debug(s"read support initialized for requested schema $requested_schema") + ParquetRelation.enableLogForwarding() new ReadContext(requested_schema, keyValueMetaData) } } -object RowReadSupport { +private[parquet] object RowReadSupport { val PARQUET_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema" } /** * A `parquet.hadoop.api.WriteSupport` for Row ojects. */ -class RowWriteSupport extends WriteSupport[Row] with Logging { +private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { def setSchema(schema: MessageType, configuration: Configuration) { // for testing this.schema = schema @@ -104,6 +105,8 @@ class RowWriteSupport extends WriteSupport[Row] with Logging { override def init(configuration: Configuration): WriteSupport.WriteContext = { schema = if (schema == null) getSchema(configuration) else schema attributes = ParquetTypesConverter.convertToAttributes(schema) + log.debug(s"write support initialized for requested schema $schema") + ParquetRelation.enableLogForwarding() new WriteSupport.WriteContext( schema, new java.util.HashMap[java.lang.String, java.lang.String]()) @@ -111,10 +114,16 @@ class RowWriteSupport extends WriteSupport[Row] with Logging { override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { writer = recordConsumer + log.debug(s"preparing for write with schema $schema") } // TODO: add groups (nested fields) override def write(record: Row): Unit = { + if (attributes.size > record.size) { + throw new IndexOutOfBoundsException( + s"Trying to write more fields than contained in row (${attributes.size}>${record.size})") + } + var index = 0 writer.startMessage() while(index < attributes.size) { @@ -130,7 +139,7 @@ class RowWriteSupport extends WriteSupport[Row] with Logging { } } -object RowWriteSupport { +private[parquet] object RowWriteSupport { val PARQUET_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.schema" } @@ -139,7 +148,7 @@ object RowWriteSupport { * * @param schema The corresponding Catalyst schema in the form of a list of attributes. */ -class CatalystGroupConverter( +private[parquet] class CatalystGroupConverter( schema: Seq[Attribute], protected[parquet] val current: ParquetRelation.RowType) extends GroupConverter { @@ -177,13 +186,12 @@ class CatalystGroupConverter( * @param parent The parent group converter. * @param fieldIndex The index inside the record. 
*/ -class CatalystPrimitiveConverter( +private[parquet] class CatalystPrimitiveConverter( parent: CatalystGroupConverter, fieldIndex: Int) extends PrimitiveConverter { // TODO: consider refactoring these together with ParquetTypesConverter override def addBinary(value: Binary): Unit = - // TODO: fix this once a setBinary will become available in MutableRow - parent.getCurrentRecord.setByte(fieldIndex, value.getBytes.apply(0)) + parent.getCurrentRecord.update(fieldIndex, value.getBytes) override def addBoolean(value: Boolean): Unit = parent.getCurrentRecord.setBoolean(fieldIndex, value) @@ -208,10 +216,9 @@ class CatalystPrimitiveConverter( * @param parent The parent group converter. * @param fieldIndex The index inside the record. */ -class CatalystPrimitiveStringConverter( +private[parquet] class CatalystPrimitiveStringConverter( parent: CatalystGroupConverter, fieldIndex: Int) extends CatalystPrimitiveConverter(parent, fieldIndex) { override def addBinary(value: Binary): Unit = parent.getCurrentRecord.setString(fieldIndex, value.toStringUsingUTF8) } - diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala index 3340c3ff81..728e3dd1dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala @@ -26,7 +26,7 @@ import parquet.hadoop.util.ContextUtil import parquet.schema.{MessageType, MessageTypeParser} import org.apache.spark.sql.catalyst.expressions.GenericRow -import org.apache.spark.sql.catalyst.util.getTempFilePath +import org.apache.spark.util.Utils object ParquetTestData { @@ -64,13 +64,13 @@ object ParquetTestData { "mylong:Long" ) - val testFile = getTempFilePath("testParquetFile").getCanonicalFile + val testDir = Utils.createTempDir() - lazy val testData = new ParquetRelation("testData", testFile.toURI.toString) + lazy val testData = new ParquetRelation(testDir.toURI.toString) def writeFile() = { - testFile.delete - val path: Path = new Path(testFile.toURI) + testDir.delete + val path: Path = new Path(new Path(testDir.toURI), new Path("part-r-0.parquet")) val job = new Job() val configuration: Configuration = ContextUtil.getConfiguration(job) val schema: MessageType = MessageTypeParser.parseMessageType(testSchema) diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties index 7bb6789bd3..dffd15a618 100644 --- a/sql/core/src/test/resources/log4j.properties +++ b/sql/core/src/test/resources/log4j.properties @@ -45,8 +45,6 @@ log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF log4j.additivity.hive.ql.metadata.Hive=false log4j.logger.hive.ql.metadata.Hive=OFF -# Parquet logging -parquet.hadoop.InternalParquetRecordReader=WARN -log4j.logger.parquet.hadoop.InternalParquetRecordReader=WARN -parquet.hadoop.ParquetInputFormat=WARN -log4j.logger.parquet.hadoop.ParquetInputFormat=WARN +# Parquet related logging +log4j.logger.parquet.hadoop=WARN +log4j.logger.org.apache.spark.sql.parquet=INFO diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index ea1733b361..a62a3c4d02 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -19,27 +19,40 @@ package 
org.apache.spark.sql.parquet import org.scalatest.{BeforeAndAfterAll, FunSuite} -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.hadoop.mapreduce.Job + import parquet.hadoop.ParquetFileWriter -import parquet.hadoop.util.ContextUtil import parquet.schema.MessageTypeParser +import parquet.hadoop.util.ContextUtil -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.catalyst.util.getTempFilePath +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row} import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.util.Utils +import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, DataType} +import org.apache.spark.sql.{parquet, SchemaRDD} +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import scala.Tuple2 // Implicits import org.apache.spark.sql.test.TestSQLContext._ +case class TestRDDEntry(key: Int, value: String) + class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll { + + var testRDD: SchemaRDD = null + override def beforeAll() { ParquetTestData.writeFile() + testRDD = parquetFile(ParquetTestData.testDir.toString) + testRDD.registerAsTable("testsource") } override def afterAll() { - ParquetTestData.testFile.delete() + Utils.deleteRecursively(ParquetTestData.testDir) + // here we should also unregister the table?? } test("self-join parquet files") { @@ -55,11 +68,18 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll { case Seq(_, _) => // All good } - // TODO: We can't run this query as it NPEs + val result = query.collect() + assert(result.size === 9, "self-join result has incorrect size") + assert(result(0).size === 12, "result row has incorrect size") + result.zipWithIndex.foreach { + case (row, index) => row.zipWithIndex.foreach { + case (field, column) => assert(field != null, s"self-join contains null value in row $index field $column") + } + } } test("Import of simple Parquet file") { - val result = getRDD(ParquetTestData.testData).collect() + val result = parquetFile(ParquetTestData.testDir.toString).collect() assert(result.size === 15) result.zipWithIndex.foreach { case (row, index) => { @@ -125,20 +145,82 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll { fs.delete(path, true) } + test("Creating case class RDD table") { + TestSQLContext.sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + .registerAsTable("tmp") + val rdd = sql("SELECT * FROM tmp").collect().sortBy(_.getInt(0)) + var counter = 1 + rdd.foreach { + // '===' does not like string comparison? 
+ row: Row => { + assert(row.getString(1).equals(s"val_$counter"), s"row $counter value ${row.getString(1)} does not match val_$counter") + counter = counter + 1 + } + } + } + + test("Saving case class RDD table to file and reading it back in") { + val file = getTempFilePath("parquet") + val path = file.toString + val rdd = TestSQLContext.sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + rdd.saveAsParquetFile(path) + val readFile = parquetFile(path) + readFile.registerAsTable("tmpx") + val rdd_copy = sql("SELECT * FROM tmpx").collect() + val rdd_orig = rdd.collect() + for(i <- 0 to 99) { + assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i") + assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i") + } + Utils.deleteRecursively(file) + assert(true) + } + + test("insert (overwrite) via Scala API (new SchemaRDD)") { + val dirname = Utils.createTempDir() + val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + source_rdd.registerAsTable("source") + val dest_rdd = createParquetFile(dirname.toString, ("key", IntegerType), ("value", StringType)) + dest_rdd.registerAsTable("dest") + sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect() + val rdd_copy1 = sql("SELECT * FROM dest").collect() + assert(rdd_copy1.size === 100) + assert(rdd_copy1(0).apply(0) === 1) + assert(rdd_copy1(0).apply(1) === "val_1") + sql("INSERT INTO dest SELECT * FROM source").collect() + val rdd_copy2 = sql("SELECT * FROM dest").collect() + assert(rdd_copy2.size === 200) + Utils.deleteRecursively(dirname) + } + + test("insert (appending) to same table via Scala API") { + sql("INSERT INTO testsource SELECT * FROM testsource").collect() + val double_rdd = sql("SELECT * FROM testsource").collect() + assert(double_rdd != null) + assert(double_rdd.size === 30) + for(i <- (0 to 14)) { + assert(double_rdd(i) === double_rdd(i+15), s"error: lines $i and ${i+15} to not match") + } + // let's restore the original test data + Utils.deleteRecursively(ParquetTestData.testDir) + ParquetTestData.writeFile() + } + /** - * Computes the given [[ParquetRelation]] and returns its RDD. + * Creates an empty SchemaRDD backed by a ParquetRelation. * - * @param parquetRelation The Parquet relation. - * @return An RDD of Rows. + * TODO: since this is so experimental it is better to have it here and not + * in SQLContext. Also note that when creating new AttributeReferences + * one needs to take care not to create duplicate Attribute ID's. 
*/ - private def getRDD(parquetRelation: ParquetRelation): RDD[Row] = { - val scanner = new ParquetTableScan( - parquetRelation.output, - parquetRelation, - None)(TestSQLContext.sparkContext) - scanner - .execute - .map(_.copy()) + private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = { + val attributes = schema.map(t => new AttributeReference(t._1, t._2)()) + new SchemaRDD( + TestSQLContext, + parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 29834a11f4..fc053c56c0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -148,6 +148,8 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging { */ override def unregisterTable( databaseName: Option[String], tableName: String): Unit = ??? + + override def unregisterAllTables() = {} } object HiveMetastoreTypes extends RegexParsers { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index bc3447b9d8..0a6bea0162 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -313,6 +313,8 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) { catalog.client.dropDatabase(db, true, false, true) } + catalog.unregisterAllTables() + FunctionRegistry.getFunctionNames.filterNot(originalUdfs.contains(_)).foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 68d45e53cd..79ec1f1cde 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -29,7 +29,7 @@ class CachedTableSuite extends HiveComparisonTest { } createQueryTest("read from cached table", - "SELECT * FROM src LIMIT 1") + "SELECT * FROM src LIMIT 1", reset = false) test("check that table is cached and uncache") { TestHive.table("src").queryExecution.analyzed match { @@ -40,7 +40,7 @@ class CachedTableSuite extends HiveComparisonTest { } createQueryTest("read from uncached table", - "SELECT * FROM src LIMIT 1") + "SELECT * FROM src LIMIT 1", reset = false) test("make sure table is uncached") { TestHive.table("src").queryExecution.analyzed match { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index c7a350ef94..18654b308d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -170,7 +170,7 @@ abstract class HiveComparisonTest } val installHooksCommand = "(?i)SET.*hooks".r - def createQueryTest(testCaseName: String, sql: String) { + def createQueryTest(testCaseName: String, sql: String, reset: Boolean = true) { // If test sharding is enable, skip tests that are not in the correct shard. 
shardInfo.foreach { case (shardId, numShards) if testCaseName.hashCode % numShards != shardId => return @@ -228,7 +228,7 @@ abstract class HiveComparisonTest try { // MINOR HACK: You must run a query before calling reset the first time. TestHive.sql("SHOW TABLES") - TestHive.reset() + if (reset) { TestHive.reset() } val hiveCacheFiles = queryList.zipWithIndex.map { case (queryString, i) => @@ -295,7 +295,7 @@ abstract class HiveComparisonTest fail(errorMessage) } }.toSeq - TestHive.reset() + if (reset) { TestHive.reset() } computedResults } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala index 05ad85b622..314ca48ad8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala @@ -17,147 +17,138 @@ package org.apache.spark.sql.parquet -import java.io.File - import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.util.getTempFilePath +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row} +import org.apache.spark.sql.catalyst.types.{DataType, StringType, IntegerType} +import org.apache.spark.sql.{parquet, SchemaRDD} import org.apache.spark.sql.hive.TestHive +import org.apache.spark.util.Utils + +// Implicits +import org.apache.spark.sql.hive.TestHive._ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach { - val filename = getTempFilePath("parquettest").getCanonicalFile.toURI.toString - - // runs a SQL and optionally resolves one Parquet table - def runQuery( - querystr: String, - tableName: Option[String] = None, - filename: Option[String] = None): Array[Row] = { - - // call to resolve references in order to get CREATE TABLE AS to work - val query = TestHive - .parseSql(querystr) - val finalQuery = - if (tableName.nonEmpty && filename.nonEmpty) - resolveParquetTable(tableName.get, filename.get, query) - else - query - TestHive.executePlan(finalQuery) - .toRdd - .collect() - } - // stores a query output to a Parquet file - def storeQuery(querystr: String, filename: String): Unit = { - val query = WriteToFile( - filename, - TestHive.parseSql(querystr)) - TestHive - .executePlan(query) - .stringResult() - } + val dirname = Utils.createTempDir() - /** - * TODO: This function is necessary as long as there is no notion of a Catalog for - * Parquet tables. Once such a thing exists this functionality should be moved there. 
- */ - def resolveParquetTable(tableName: String, filename: String, plan: LogicalPlan): LogicalPlan = { - TestHive.loadTestTable("src") // may not be loaded now - plan.transform { - case relation @ UnresolvedRelation(databaseName, name, alias) => - if (name == tableName) - ParquetRelation(tableName, filename) - else - relation - case op @ InsertIntoCreatedTable(databaseName, name, child) => - if (name == tableName) { - // note: at this stage the plan is not yet analyzed but Parquet needs to know the schema - // and for that we need the child to be resolved - val relation = ParquetRelation.create( - filename, - TestHive.analyzer(child), - TestHive.sparkContext.hadoopConfiguration, - Some(tableName)) - InsertIntoTable( - relation.asInstanceOf[BaseRelation], - Map.empty, - child, - overwrite = false) - } else - op - } - } + var testRDD: SchemaRDD = null override def beforeAll() { // write test data - ParquetTestData.writeFile() - // Override initial Parquet test table - TestHive.catalog.registerTable(Some[String]("parquet"), "testsource", ParquetTestData.testData) + ParquetTestData.writeFile + testRDD = parquetFile(ParquetTestData.testDir.toString) + testRDD.registerAsTable("testsource") } override def afterAll() { - ParquetTestData.testFile.delete() + Utils.deleteRecursively(ParquetTestData.testDir) + Utils.deleteRecursively(dirname) + reset() // drop all tables that were registered as part of the tests } + // in case tests are failing we delete before and after each test override def beforeEach() { - new File(filename).getAbsoluteFile.delete() + Utils.deleteRecursively(dirname) } override def afterEach() { - new File(filename).getAbsoluteFile.delete() + Utils.deleteRecursively(dirname) } test("SELECT on Parquet table") { - val rdd = runQuery("SELECT * FROM parquet.testsource") + val rdd = sql("SELECT * FROM testsource").collect() assert(rdd != null) assert(rdd.forall(_.size == 6)) } test("Simple column projection + filter on Parquet table") { - val rdd = runQuery("SELECT myboolean, mylong FROM parquet.testsource WHERE myboolean=true") + val rdd = sql("SELECT myboolean, mylong FROM testsource WHERE myboolean=true").collect() assert(rdd.size === 5, "Filter returned incorrect number of rows") assert(rdd.forall(_.getBoolean(0)), "Filter returned incorrect Boolean field value") } - test("Converting Hive to Parquet Table via WriteToFile") { - storeQuery("SELECT * FROM src", filename) - val rddOne = runQuery("SELECT * FROM src").sortBy(_.getInt(0)) - val rddTwo = runQuery("SELECT * from ptable", Some("ptable"), Some(filename)).sortBy(_.getInt(0)) + test("Converting Hive to Parquet Table via saveAsParquetFile") { + sql("SELECT * FROM src").saveAsParquetFile(dirname.getAbsolutePath) + parquetFile(dirname.getAbsolutePath).registerAsTable("ptable") + val rddOne = sql("SELECT * FROM src").collect().sortBy(_.getInt(0)) + val rddTwo = sql("SELECT * from ptable").collect().sortBy(_.getInt(0)) compareRDDs(rddOne, rddTwo, "src (Hive)", Seq("key:Int", "value:String")) } test("INSERT OVERWRITE TABLE Parquet table") { - storeQuery("SELECT * FROM parquet.testsource", filename) - runQuery("INSERT OVERWRITE TABLE ptable SELECT * FROM parquet.testsource", Some("ptable"), Some(filename)) - runQuery("INSERT OVERWRITE TABLE ptable SELECT * FROM parquet.testsource", Some("ptable"), Some(filename)) - val rddCopy = runQuery("SELECT * FROM ptable", Some("ptable"), Some(filename)) - val rddOrig = runQuery("SELECT * FROM parquet.testsource") - compareRDDs(rddOrig, rddCopy, "parquet.testsource", 
ParquetTestData.testSchemaFieldNames) + sql("SELECT * FROM testsource").saveAsParquetFile(dirname.getAbsolutePath) + parquetFile(dirname.getAbsolutePath).registerAsTable("ptable") + // let's do three overwrites for good measure + sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect() + sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect() + sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect() + val rddCopy = sql("SELECT * FROM ptable").collect() + val rddOrig = sql("SELECT * FROM testsource").collect() + assert(rddCopy.size === rddOrig.size, "INSERT OVERWRITE changed size of table??") + compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames) } - test("CREATE TABLE AS Parquet table") { - runQuery("CREATE TABLE ptable AS SELECT * FROM src", Some("ptable"), Some(filename)) - val rddCopy = runQuery("SELECT * FROM ptable", Some("ptable"), Some(filename)) + test("CREATE TABLE of Parquet table") { + createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType)) + .registerAsTable("tmp") + val rddCopy = + sql("INSERT INTO TABLE tmp SELECT * FROM src") + .collect() .sortBy[Int](_.apply(0) match { case x: Int => x case _ => 0 }) - val rddOrig = runQuery("SELECT * FROM src").sortBy(_.getInt(0)) + val rddOrig = sql("SELECT * FROM src") + .collect() + .sortBy(_.getInt(0)) compareRDDs(rddOrig, rddCopy, "src (Hive)", Seq("key:Int", "value:String")) } + test("Appending to Parquet table") { + createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType)) + .registerAsTable("tmpnew") + sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect() + sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect() + sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect() + val rddCopies = sql("SELECT * FROM tmpnew").collect() + val rddOrig = sql("SELECT * FROM src").collect() + assert(rddCopies.size === 3 * rddOrig.size, "number of copied rows via INSERT INTO did not match correct number") + } + + test("Appending to and then overwriting Parquet table") { + createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType)) + .registerAsTable("tmp") + sql("INSERT INTO TABLE tmp SELECT * FROM src").collect() + sql("INSERT INTO TABLE tmp SELECT * FROM src").collect() + sql("INSERT OVERWRITE TABLE tmp SELECT * FROM src").collect() + val rddCopies = sql("SELECT * FROM tmp").collect() + val rddOrig = sql("SELECT * FROM src").collect() + assert(rddCopies.size === rddOrig.size, "INSERT OVERWRITE did not actually overwrite") + } + private def compareRDDs(rddOne: Array[Row], rddTwo: Array[Row], tableName: String, fieldNames: Seq[String]) { var counter = 0 (rddOne, rddTwo).zipped.foreach { (a,b) => (a,b).zipped.toArray.zipWithIndex.foreach { - case ((value_1:Array[Byte], value_2:Array[Byte]), index) => - assert(new String(value_1) === new String(value_2), s"table $tableName row $counter field ${fieldNames(index)} don't match") case ((value_1, value_2), index) => assert(value_1 === value_2, s"table $tableName row $counter field ${fieldNames(index)} don't match") } counter = counter + 1 } } + + /** + * Creates an empty SchemaRDD backed by a ParquetRelation. + * + * TODO: since this is so experimental it is better to have it here and not + * in SQLContext. Also note that when creating new AttributeReferences + * one needs to take care not to create duplicate Attribute ID's. 
+ */ + private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = { + val attributes = schema.map(t => new AttributeReference(t._1, t._2)()) + new SchemaRDD( + TestHive, + parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration)) + } } -- cgit v1.2.3
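
For reference, the sketch below is not part of the patch: it condenses the new ParquetQuerySuite/HiveParquetSuite tests into a stand-alone example of the behaviour this change enables, namely a Parquet table backed by a directory plus INSERT INTO (append) and INSERT OVERWRITE through the new parser rule. It assumes the Spark SQL alpha API of this era (SchemaRDD, registerAsTable, saveAsParquetFile); the object name, output path and table names are illustrative only.

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// The case class must be defined at the top level so the schema can be
// inferred via reflection, as in the TestRDDEntry class used by the tests.
case class Record(key: Int, value: String)

object ParquetAppendSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "ParquetAppendSketch")
    val sqlContext = new SQLContext(sc)
    import sqlContext._  // brings sql(), parquetFile() and the RDD-to-SchemaRDD implicit

    // Optionally override the GZIP default introduced by this patch (assumes the
    // insert path picks up this Hadoop configuration):
    // sc.hadoopConfiguration.set("parquet.compression", "SNAPPY")

    // Register an in-memory table and write it out. The target is now a
    // directory of part-r-*.parquet files plus a _metadata file, not a single file.
    val records = sc.parallelize(1 to 100).map(i => Record(i, s"val_$i"))
    records.registerAsTable("source")
    records.saveAsParquetFile("/tmp/parquet_dest")

    // A ParquetRelation is backed by that directory.
    parquetFile("/tmp/parquet_dest").registerAsTable("dest")

    // Appending: new part files are numbered past the highest existing task id.
    sql("INSERT INTO dest SELECT * FROM source").collect()
    assert(sql("SELECT * FROM dest").count() == 200)

    // Overwriting: the directory is cleared before the new files are written.
    sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
    assert(sql("SELECT * FROM dest").count() == 100)

    sc.stop()
  }
}

As the InsertIntoParquetTable scaladoc above warns, appending is experimental: concurrent appends to the same directory, or appending to a directory produced by other tools, is not supported.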