path: root/sql
author      Andre Schumacher <andre.schumacher@iki.fi>   2014-06-19 23:47:45 -0700
committer   Reynold Xin <rxin@apache.org>                2014-06-19 23:47:45 -0700
commit      f479cf3743e416ee08e62806e1b34aff5998ac22 (patch)
tree        2d3189d95f0bb552dda1a45f4a2bb51f8d5f1e3f /sql
parent      f397e92eb2986f4436fb9e66777fc652f91d8494 (diff)
SPARK-1293 [SQL] Parquet support for nested types
It should be possible to import and export data stored in Parquet's columnar format that contains nested types. For example:

```java
message AddressBook {
  required binary owner;
  optional group ownerPhoneNumbers {
    repeated binary array;
  }
  optional group contacts {
    repeated group array {
      required binary name;
      optional binary phoneNumber;
    }
  }
  optional group nameToApartmentNumber {
    repeated group map {
      required binary key;
      required int32 value;
    }
  }
}
```

The example models a type (AddressBook) that contains records made of strings (owner), lists (ownerPhoneNumbers) and a table of contacts (e.g., a list of pairs, or a map that may contain null values but whose keys must not be null).

The list of tasks is as follows:

Implement support for converting nested Parquet types to Spark/Catalyst types:
- [x] Structs
- [x] Lists
- [x] Maps (note: currently keys need to be Strings)

Implement import (via `parquetFile`) of nested Parquet types (first version in this PR):
- [x] Initial version

Implement export (via `saveAsParquetFile`):
- [x] Initial version

Test support for AvroParquet, etc.:
- [x] Initial testing of import of avro-generated Parquet data (simple + nested)

Example (a complementary export sketch follows the commit log below):

```scala
val data = TestSQLContext
  .parquetFile("input.dir")
  .toSchemaRDD
data.registerAsTable("data")
sql("SELECT owner, contacts[1].name, nameToApartmentNumber['John'] FROM data").collect()
```

Author: Andre Schumacher <andre.schumacher@iki.fi>
Author: Michael Armbrust <michael@databricks.com>

Closes #360 from AndreSchumacher/nested_parquet and squashes the following commits:

30708c8 [Andre Schumacher] Taking out AvroParquet test for now to remove Avro dependency
95c1367 [Andre Schumacher] Changes to ParquetRelation and its metadata
7eceb67 [Andre Schumacher] Review feedback
94eea3a [Andre Schumacher] Scalastyle
403061f [Andre Schumacher] Fixing some issues with tests and schema metadata
b8a8b9a [Andre Schumacher] More fixes to short and byte conversion
63d1b57 [Andre Schumacher] Cleaning up and Scalastyle
88e6bdb [Andre Schumacher] Attempting to fix loss of schema
37e0a0a [Andre Schumacher] Cleaning up
14c3fd8 [Andre Schumacher] Attempting to fix Spark-Parquet schema conversion
3e1456c [Michael Armbrust] WIP: Directly serialize catalyst attributes.
f7aeba3 [Michael Armbrust] [SPARK-1982] Support for ByteType and ShortType.
3104886 [Michael Armbrust] Nested Rows should be Rows, not Seqs.
3c6b25f [Andre Schumacher] Trying to reduce no-op changes wrt master
31465d6 [Andre Schumacher] Scalastyle: fixing commented out bottom
de02538 [Andre Schumacher] Cleaning up ParquetTestData
2f5a805 [Andre Schumacher] Removing stripMargin from test schemas
191bc0d [Andre Schumacher] Changing to Seq for ArrayType, refactoring SQLParser for nested field extension
cbb5793 [Andre Schumacher] Code review feedback
32229c7 [Andre Schumacher] Removing Row nested values and placing by generic types
0ae9376 [Andre Schumacher] Doc strings and simplifying ParquetConverter.scala
a6b4f05 [Andre Schumacher] Cleaning up ArrayConverter, moving classTag to NativeType, adding NativeRow
431f00f [Andre Schumacher] Fixing problems introduced during rebase
c52ff2c [Andre Schumacher] Adding native-array converter
619c397 [Andre Schumacher] Completing Map testcase
79d81d5 [Andre Schumacher] Replacing field names for array and map in WriteSupport
f466ff0 [Andre Schumacher] Added ParquetAvro tests and revised Array conversion
adc1258 [Andre Schumacher] Optimizing imports
e99cc51 [Andre Schumacher] Fixing nested WriteSupport and adding tests
1dc5ac9 [Andre Schumacher] First version of WriteSupport for nested types
d1911dc [Andre Schumacher] Simplifying ArrayType conversion
f777b4b [Andre Schumacher] Scalastyle
824500c [Andre Schumacher] Adding attribute resolution for MapType
b539fde [Andre Schumacher] First commit for MapType
a594aed [Andre Schumacher] Scalastyle
4e25fcb [Andre Schumacher] Adding resolution of complex ArrayTypes
f8f8911 [Andre Schumacher] For primitive rows fall back to more efficient converter, code reorg
6dbc9b7 [Andre Schumacher] Fixing some problems intruduced during rebase
b7fcc35 [Andre Schumacher] Documenting conversions, bugfix, wrappers of Rows
ee70125 [Andre Schumacher] fixing one problem with arrayconverter
98219cf [Andre Schumacher] added struct converter
5d80461 [Andre Schumacher] fixing one problem with nested structs and breaking up files
1b1b3d6 [Andre Schumacher] Fixing one problem with nested arrays
ddb40d2 [Andre Schumacher] Extending tests for nested Parquet data
745a42b [Andre Schumacher] Completing testcase for nested data (Addressbook(
6125c75 [Andre Schumacher] First working nested Parquet record input
4d4892a [Andre Schumacher] First commit nested Parquet read converters
aa688fe [Andre Schumacher] Adding conversion of nested Parquet schemas
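To complement the example above, here is a minimal usage sketch of the full import/query/export cycle this change enables. It is illustrative only: `input.dir` and `output.dir` are placeholder paths, and `TestSQLContext` is Spark SQL's test context, as in the commit-message example.

```scala
import org.apache.spark.sql.test.TestSQLContext

// Import nested Parquet data (placeholder path).
val data = TestSQLContext
  .parquetFile("input.dir")
  .toSchemaRDD
data.registerAsTable("data")

// Query nested fields: struct access, array indexing and map lookup.
TestSQLContext.sql(
  "SELECT owner, contacts[1].name, nameToApartmentNumber['John'] FROM data").collect()

// Export: write the nested rows back out in Parquet format (placeholder path).
data.saveAsParquetFile("output.dir")
```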
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 111
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala | 98
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala | 667
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala | 182
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala | 25
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala | 326
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala | 298
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala | 408
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 356
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 4
14 files changed, 2102 insertions(+), 384 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 46fcfbb9e2..2ad2d04af5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -66,43 +66,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected case class Keyword(str: String)
protected implicit def asParser(k: Keyword): Parser[String] =
- allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
-
- protected class SqlLexical extends StdLexical {
- case class FloatLit(chars: String) extends Token {
- override def toString = chars
- }
- override lazy val token: Parser[Token] = (
- identChar ~ rep( identChar | digit ) ^^
- { case first ~ rest => processIdent(first :: rest mkString "") }
- | rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {
- case i ~ None => NumericLit(i mkString "")
- case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
- }
- | '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^
- { case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") }
- | '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^
- { case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") }
- | EofCh ^^^ EOF
- | '\'' ~> failure("unclosed string literal")
- | '\"' ~> failure("unclosed string literal")
- | delim
- | failure("illegal character")
- )
-
- override def identChar = letter | elem('.') | elem('_')
-
- override def whitespace: Parser[Any] = rep(
- whitespaceChar
- | '/' ~ '*' ~ comment
- | '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') )
- | '#' ~ rep( chrExcept(EofCh, '\n') )
- | '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') )
- | '/' ~ '*' ~ failure("unclosed comment")
- )
- }
-
- override val lexical = new SqlLexical
+ lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
@@ -161,24 +125,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
this.getClass
.getMethods
.filter(_.getReturnType == classOf[Keyword])
- .map(_.invoke(this).asInstanceOf[Keyword])
-
- /** Generate all variations of upper and lower case of a given string */
- private def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
- if (s == "") {
- Stream(prefix)
- } else {
- allCaseVersions(s.tail, prefix + s.head.toLower) ++
- allCaseVersions(s.tail, prefix + s.head.toUpper)
- }
- }
+ .map(_.invoke(this).asInstanceOf[Keyword].str)
- lexical.reserved ++= reservedWords.flatMap(w => allCaseVersions(w.str))
-
- lexical.delimiters += (
- "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
- ",", ";", "%", "{", "}", ":", "[", "]"
- )
+ override val lexical = new SqlLexical(reservedWords)
protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = {
exprs.zipWithIndex.map {
@@ -383,7 +332,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
protected lazy val baseExpression: PackratParser[Expression] =
- expression ~ "[" ~ expression <~ "]" ^^ {
+ expression ~ "[" ~ expression <~ "]" ^^ {
case base ~ _ ~ ordinal => GetItem(base, ordinal)
} |
TRUE ^^^ Literal(true, BooleanType) |
@@ -399,3 +348,55 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected lazy val dataType: Parser[DataType] =
STRING ^^^ StringType
}
+
+class SqlLexical(val keywords: Seq[String]) extends StdLexical {
+ case class FloatLit(chars: String) extends Token {
+ override def toString = chars
+ }
+
+ reserved ++= keywords.flatMap(w => allCaseVersions(w))
+
+ delimiters += (
+ "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
+ ",", ";", "%", "{", "}", ":", "[", "]"
+ )
+
+ override lazy val token: Parser[Token] = (
+ identChar ~ rep( identChar | digit ) ^^
+ { case first ~ rest => processIdent(first :: rest mkString "") }
+ | rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {
+ case i ~ None => NumericLit(i mkString "")
+ case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
+ }
+ | '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^
+ { case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") }
+ | '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^
+ { case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") }
+ | EofCh ^^^ EOF
+ | '\'' ~> failure("unclosed string literal")
+ | '\"' ~> failure("unclosed string literal")
+ | delim
+ | failure("illegal character")
+ )
+
+ override def identChar = letter | elem('_') | elem('.')
+
+ override def whitespace: Parser[Any] = rep(
+ whitespaceChar
+ | '/' ~ '*' ~ comment
+ | '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') )
+ | '#' ~ rep( chrExcept(EofCh, '\n') )
+ | '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') )
+ | '/' ~ '*' ~ failure("unclosed comment")
+ )
+
+ /** Generate all variations of upper and lower case of a given string */
+ def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
+ if (s == "") {
+ Stream(prefix)
+ } else {
+ allCaseVersions(s.tail, prefix + s.head.toLower) ++
+ allCaseVersions(s.tail, prefix + s.head.toUpper)
+ }
+ }
+}
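The lexer is now a standalone `SqlLexical` class whose constructor registers every case variation of each keyword as a reserved word. A minimal sketch of that behaviour (the standalone usage below is illustrative, not part of the patch):

```scala
import org.apache.spark.sql.catalyst.SqlLexical

val lexical = new SqlLexical(Seq("SELECT", "FROM"))

// allCaseVersions enumerates every upper/lower-case combination of a keyword...
lexical.allCaseVersions("as").toList   // List("as", "aS", "As", "AS")

// ...so keywords end up being matched case-insensitively.
assert(lexical.reserved.contains("select") && lexical.reserved.contains("SeLeCt"))
```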
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
index b6aeae92f8..5d3bb25ad5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
@@ -50,6 +50,8 @@ case class GetItem(child: Expression, ordinal: Expression) extends Expression {
null
} else {
if (child.dataType.isInstanceOf[ArrayType]) {
+ // TODO: consider using Array[_] for ArrayType child to avoid
+ // boxing of primitives
val baseValue = value.asInstanceOf[Seq[_]]
val o = key.asInstanceOf[Int]
if (o >= baseValue.size || o < 0) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index da34bd3a21..bb77bccf86 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -19,9 +19,71 @@ package org.apache.spark.sql.catalyst.types
import java.sql.Timestamp
-import scala.reflect.runtime.universe.{typeTag, TypeTag}
+import scala.util.parsing.combinator.RegexParsers
-import org.apache.spark.sql.catalyst.expressions.Expression
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.{typeTag, TypeTag, runtimeMirror}
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
+import org.apache.spark.util.Utils
+
+/**
+ * Parses string representations of Catalyst data types (as produced by their
+ * toString), used to round-trip schemas through, e.g., Parquet file metadata.
+ */
+object DataType extends RegexParsers {
+ protected lazy val primitiveType: Parser[DataType] =
+ "StringType" ^^^ StringType |
+ "FloatType" ^^^ FloatType |
+ "IntegerType" ^^^ IntegerType |
+ "ByteType" ^^^ ByteType |
+ "ShortType" ^^^ ShortType |
+ "DoubleType" ^^^ DoubleType |
+ "LongType" ^^^ LongType |
+ "BinaryType" ^^^ BinaryType |
+ "BooleanType" ^^^ BooleanType |
+ "DecimalType" ^^^ DecimalType |
+ "TimestampType" ^^^ TimestampType
+
+ protected lazy val arrayType: Parser[DataType] =
+ "ArrayType" ~> "(" ~> dataType <~ ")" ^^ ArrayType
+
+ protected lazy val mapType: Parser[DataType] =
+ "MapType" ~> "(" ~> dataType ~ "," ~ dataType <~ ")" ^^ {
+ case t1 ~ _ ~ t2 => MapType(t1, t2)
+ }
+
+ protected lazy val structField: Parser[StructField] =
+ ("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ {
+ case name ~ tpe ~ nullable =>
+ StructField(name, tpe, nullable = nullable)
+ }
+
+ protected lazy val boolVal: Parser[Boolean] =
+ "true" ^^^ true |
+ "false" ^^^ false
+
+
+ protected lazy val structType: Parser[DataType] =
+ "StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ {
+ case fields => new StructType(fields)
+ }
+
+ protected lazy val dataType: Parser[DataType] =
+ arrayType |
+ mapType |
+ structType |
+ primitiveType
+
+ /**
+ * Parses a string representation of a DataType.
+ *
+ * TODO: Generate parser as pickler...
+ */
+ def apply(asString: String): DataType = parseAll(dataType, asString) match {
+ case Success(result, _) => result
+ case failure: NoSuccess => sys.error(s"Unsupported dataType: $asString, $failure")
+ }
+}
abstract class DataType {
/** Matches any expression that evaluates to this DataType */
@@ -29,25 +91,36 @@ abstract class DataType {
case e: Expression if e.dataType == this => true
case _ => false
}
+
+ def isPrimitive: Boolean = false
}
case object NullType extends DataType
+trait PrimitiveType extends DataType {
+ override def isPrimitive = true
+}
+
abstract class NativeType extends DataType {
type JvmType
@transient val tag: TypeTag[JvmType]
val ordering: Ordering[JvmType]
+
+ @transient val classTag = {
+ val mirror = runtimeMirror(Utils.getSparkClassLoader)
+ ClassTag[JvmType](mirror.runtimeClass(tag.tpe))
+ }
}
-case object StringType extends NativeType {
+case object StringType extends NativeType with PrimitiveType {
type JvmType = String
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
}
-case object BinaryType extends DataType {
+case object BinaryType extends DataType with PrimitiveType {
type JvmType = Array[Byte]
}
-case object BooleanType extends NativeType {
+case object BooleanType extends NativeType with PrimitiveType {
type JvmType = Boolean
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
@@ -63,7 +136,7 @@ case object TimestampType extends NativeType {
}
}
-abstract class NumericType extends NativeType {
+abstract class NumericType extends NativeType with PrimitiveType {
// Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for
// implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
// type parameter and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
@@ -154,6 +227,17 @@ case object FloatType extends FractionalType {
case class ArrayType(elementType: DataType) extends DataType
case class StructField(name: String, dataType: DataType, nullable: Boolean)
-case class StructType(fields: Seq[StructField]) extends DataType
+
+object StructType {
+ def fromAttributes(attributes: Seq[Attribute]): StructType = {
+ StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))
+ }
+
+ // def apply(fields: Seq[StructField]) = new StructType(fields.toIndexedSeq)
+}
+
+case class StructType(fields: Seq[StructField]) extends DataType {
+ def toAttributes = fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+}
case class MapType(keyType: DataType, valueType: DataType) extends DataType
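The `DataType` parser above exists so that a Catalyst schema can be serialized as a plain string (for example into Parquet file metadata) and parsed back. A hedged sketch of that round trip, assuming the serialized form is the default case-class `toString` of these types:

```scala
import org.apache.spark.sql.catalyst.types._

val schema = StructType(Seq(
  StructField("owner", StringType, nullable = false),
  StructField("ownerPhoneNumbers", ArrayType(StringType), nullable = true),
  StructField("nameToApartmentNumber", MapType(StringType, IntegerType), nullable = true)))

// toString yields e.g. "StructType(List(StructField(owner,StringType,false), ...))",
// which DataType.apply parses back into an equal StructType.
val reparsed: DataType = DataType(schema.toString)
assert(reparsed == schema)
```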
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1617ec717b..ab376e5504 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -94,7 +94,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
def parquetFile(path: String): SchemaRDD =
- new SchemaRDD(this, parquet.ParquetRelation(path))
+ new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration)))
/**
* Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index ff9842267f..ff6deeda23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -99,7 +99,9 @@ class JavaSQLContext(val sqlContext: SQLContext) {
* Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
*/
def parquetFile(path: String): JavaSchemaRDD =
- new JavaSchemaRDD(sqlContext, ParquetRelation(path))
+ new JavaSchemaRDD(
+ sqlContext,
+ ParquetRelation(path, Some(sqlContext.sparkContext.hadoopConfiguration)))
/**
* Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index feb280d1d1..4694f25d6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -154,7 +154,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.WriteToFile(path, child) =>
val relation =
ParquetRelation.create(path, child, sparkContext.hadoopConfiguration)
- InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
+ // Note: overwrite=false because otherwise the metadata we just created will be deleted
+ InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sparkContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
new file mode 100644
index 0000000000..889a408e3c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
+
+import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
+import parquet.schema.MessageType
+
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.expressions.{GenericRow, Row, Attribute}
+import org.apache.spark.sql.parquet.CatalystConverter.FieldType
+
+/**
+ * Collection of converters of Parquet types (group and primitive types) that
+ * model arrays and maps. The conversions are partly based on the AvroParquet
+ * converters that are part of Parquet in order to be able to process these
+ * types.
+ *
+ * There are several types of converters:
+ * <ul>
+ * <li>[[org.apache.spark.sql.parquet.CatalystPrimitiveConverter]] for primitive
+ * (numeric, boolean and String) types</li>
+ * <li>[[org.apache.spark.sql.parquet.CatalystNativeArrayConverter]] for arrays
+ * of native JVM element types; note: currently null values are not supported!</li>
+ * <li>[[org.apache.spark.sql.parquet.CatalystArrayConverter]] for arrays of
+ * arbitrary element types (including nested element types); note: currently
+ * null values are not supported!</li>
+ * <li>[[org.apache.spark.sql.parquet.CatalystStructConverter]] for structs</li>
+ * <li>[[org.apache.spark.sql.parquet.CatalystMapConverter]] for maps; note:
+ * currently null values are not supported!</li>
+ * <li>[[org.apache.spark.sql.parquet.CatalystPrimitiveRowConverter]] for rows
+ * of only primitive element types</li>
+ * <li>[[org.apache.spark.sql.parquet.CatalystGroupConverter]] for other nested
+ * records, including the top-level row record</li>
+ * </ul>
+ */
+
+private[sql] object CatalystConverter {
+ // The type internally used for fields
+ type FieldType = StructField
+
+ // This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
+ // Note that "array" for the array elements is chosen by ParquetAvro.
+ // Using a different value will result in Parquet silently dropping columns.
+ val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
+ val MAP_KEY_SCHEMA_NAME = "key"
+ val MAP_VALUE_SCHEMA_NAME = "value"
+ val MAP_SCHEMA_NAME = "map"
+
+ // TODO: consider using Array[T] for arrays to avoid boxing of primitive types
+ type ArrayScalaType[T] = Seq[T]
+ type StructScalaType[T] = Seq[T]
+ type MapScalaType[K, V] = Map[K, V]
+
+ protected[parquet] def createConverter(
+ field: FieldType,
+ fieldIndex: Int,
+ parent: CatalystConverter): Converter = {
+ val fieldType: DataType = field.dataType
+ fieldType match {
+ // For native JVM types we use a converter with native arrays
+ case ArrayType(elementType: NativeType) => {
+ new CatalystNativeArrayConverter(elementType, fieldIndex, parent)
+ }
+ // This is for other types of arrays, including those with nested fields
+ case ArrayType(elementType: DataType) => {
+ new CatalystArrayConverter(elementType, fieldIndex, parent)
+ }
+ case StructType(fields: Seq[StructField]) => {
+ new CatalystStructConverter(fields.toArray, fieldIndex, parent)
+ }
+ case MapType(keyType: DataType, valueType: DataType) => {
+ new CatalystMapConverter(
+ Array(
+ new FieldType(MAP_KEY_SCHEMA_NAME, keyType, false),
+ new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, true)),
+ fieldIndex,
+ parent)
+ }
+ // Strings, Shorts and Bytes do not have a corresponding type in Parquet
+ // so we need to treat them separately
+ case StringType => {
+ new CatalystPrimitiveConverter(parent, fieldIndex) {
+ override def addBinary(value: Binary): Unit =
+ parent.updateString(fieldIndex, value)
+ }
+ }
+ case ShortType => {
+ new CatalystPrimitiveConverter(parent, fieldIndex) {
+ override def addInt(value: Int): Unit =
+ parent.updateShort(fieldIndex, value.asInstanceOf[ShortType.JvmType])
+ }
+ }
+ case ByteType => {
+ new CatalystPrimitiveConverter(parent, fieldIndex) {
+ override def addInt(value: Int): Unit =
+ parent.updateByte(fieldIndex, value.asInstanceOf[ByteType.JvmType])
+ }
+ }
+ // All other primitive types use the default converter
+ case ctype: NativeType => { // note: need the type tag here!
+ new CatalystPrimitiveConverter(parent, fieldIndex)
+ }
+ case _ => throw new RuntimeException(
+ s"unable to convert datatype ${field.dataType.toString} in CatalystConverter")
+ }
+ }
+
+ protected[parquet] def createRootConverter(
+ parquetSchema: MessageType,
+ attributes: Seq[Attribute]): CatalystConverter = {
+ // For non-nested types we use the optimized Row converter
+ if (attributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))) {
+ new CatalystPrimitiveRowConverter(attributes.toArray)
+ } else {
+ new CatalystGroupConverter(attributes.toArray)
+ }
+ }
+}
+
+private[parquet] abstract class CatalystConverter extends GroupConverter {
+ /**
+ * The number of fields this group has
+ */
+ protected[parquet] val size: Int
+
+ /**
+ * The index of this converter in the parent
+ */
+ protected[parquet] val index: Int
+
+ /**
+ * The parent converter
+ */
+ protected[parquet] val parent: CatalystConverter
+
+ /**
+ * Called by child converters to update their value in its parent (this).
+ * Note that if possible the more specific update methods below should be used
+ * to avoid auto-boxing of native JVM types.
+ *
+ * @param fieldIndex
+ * @param value
+ */
+ protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit
+
+ protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit =
+ updateField(fieldIndex, value)
+
+ protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
+ updateField(fieldIndex, value)
+
+ protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
+ updateField(fieldIndex, value)
+
+ protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit =
+ updateField(fieldIndex, value)
+
+ protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit =
+ updateField(fieldIndex, value)
+
+ protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit =
+ updateField(fieldIndex, value)
+
+ protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit =
+ updateField(fieldIndex, value)
+
+ protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
+ updateField(fieldIndex, value.getBytes)
+
+ protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit =
+ updateField(fieldIndex, value.toStringUsingUTF8)
+
+ protected[parquet] def isRootConverter: Boolean = parent == null
+
+ protected[parquet] def clearBuffer(): Unit
+
+ /**
+ * Should only be called in the root (group) converter!
+ *
+ * @return
+ */
+ def getCurrentRecord: Row = throw new UnsupportedOperationException
+}
+
+/**
+ * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record
+ * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object.
+ *
+ * @param schema The corresponding Catalyst schema in the form of a list of attributes.
+ */
+private[parquet] class CatalystGroupConverter(
+ protected[parquet] val schema: Array[FieldType],
+ protected[parquet] val index: Int,
+ protected[parquet] val parent: CatalystConverter,
+ protected[parquet] var current: ArrayBuffer[Any],
+ protected[parquet] var buffer: ArrayBuffer[Row])
+ extends CatalystConverter {
+
+ def this(schema: Array[FieldType], index: Int, parent: CatalystConverter) =
+ this(
+ schema,
+ index,
+ parent,
+ current=null,
+ buffer=new ArrayBuffer[Row](
+ CatalystArrayConverter.INITIAL_ARRAY_SIZE))
+
+ /**
+ * This constructor is used for the root converter only!
+ */
+ def this(attributes: Array[Attribute]) =
+ this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null)
+
+ protected [parquet] val converters: Array[Converter] =
+ schema.map(field =>
+ CatalystConverter.createConverter(field, schema.indexOf(field), this))
+ .toArray
+
+ override val size = schema.size
+
+ override def getCurrentRecord: Row = {
+ assert(isRootConverter, "getCurrentRecord should only be called in root group converter!")
+ // TODO: use iterators if possible
+ // Note: this will only ever be called in the root converter when the record has been
+ // fully processed. Therefore it will be difficult to use mutable rows instead, since
+ // any non-root converter never would be sure when it would be safe to re-use the buffer.
+ new GenericRow(current.toArray)
+ }
+
+ override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
+
+ // for child converters to update upstream values
+ override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
+ current.update(fieldIndex, value)
+ }
+
+ override protected[parquet] def clearBuffer(): Unit = buffer.clear()
+
+ override def start(): Unit = {
+ current = ArrayBuffer.fill(size)(null)
+ converters.foreach {
+ converter => if (!converter.isPrimitive) {
+ converter.asInstanceOf[CatalystConverter].clearBuffer
+ }
+ }
+ }
+
+ override def end(): Unit = {
+ if (!isRootConverter) {
+ assert(current!=null) // there should be no empty groups
+ buffer.append(new GenericRow(current.toArray))
+ parent.updateField(index, new GenericRow(buffer.toArray.asInstanceOf[Array[Any]]))
+ }
+ }
+}
+
+/**
+ * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record
+ * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object. Note that this
+ * converter is optimized for rows of primitive types (non-nested records).
+ */
+private[parquet] class CatalystPrimitiveRowConverter(
+ protected[parquet] val schema: Array[FieldType],
+ protected[parquet] var current: ParquetRelation.RowType)
+ extends CatalystConverter {
+
+ // This constructor is used for the root converter only
+ def this(attributes: Array[Attribute]) =
+ this(
+ attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)),
+ new ParquetRelation.RowType(attributes.length))
+
+ protected [parquet] val converters: Array[Converter] =
+ schema.map(field =>
+ CatalystConverter.createConverter(field, schema.indexOf(field), this))
+ .toArray
+
+ override val size = schema.size
+
+ override val index = 0
+
+ override val parent = null
+
+ // Should be only called in root group converter!
+ override def getCurrentRecord: ParquetRelation.RowType = current
+
+ override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
+
+ // for child converters to update upstream values
+ override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
+ throw new UnsupportedOperationException // child converters should use the
+ // specific update methods below
+ }
+
+ override protected[parquet] def clearBuffer(): Unit = {}
+
+ override def start(): Unit = {
+ var i = 0
+ while (i < size) {
+ current.setNullAt(i)
+ i = i + 1
+ }
+ }
+
+ override def end(): Unit = {}
+
+ // Overridden here to avoid auto-boxing for primitive types
+ override protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit =
+ current.setBoolean(fieldIndex, value)
+
+ override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
+ current.setInt(fieldIndex, value)
+
+ override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
+ current.setLong(fieldIndex, value)
+
+ override protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit =
+ current.setShort(fieldIndex, value)
+
+ override protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit =
+ current.setByte(fieldIndex, value)
+
+ override protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit =
+ current.setDouble(fieldIndex, value)
+
+ override protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit =
+ current.setFloat(fieldIndex, value)
+
+ override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
+ current.update(fieldIndex, value.getBytes)
+
+ override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit =
+ current.setString(fieldIndex, value.toStringUsingUTF8)
+}
+
+/**
+ * A `parquet.io.api.PrimitiveConverter` that converts Parquet types to Catalyst types.
+ *
+ * @param parent The parent group converter.
+ * @param fieldIndex The index inside the record.
+ */
+private[parquet] class CatalystPrimitiveConverter(
+ parent: CatalystConverter,
+ fieldIndex: Int) extends PrimitiveConverter {
+ override def addBinary(value: Binary): Unit =
+ parent.updateBinary(fieldIndex, value)
+
+ override def addBoolean(value: Boolean): Unit =
+ parent.updateBoolean(fieldIndex, value)
+
+ override def addDouble(value: Double): Unit =
+ parent.updateDouble(fieldIndex, value)
+
+ override def addFloat(value: Float): Unit =
+ parent.updateFloat(fieldIndex, value)
+
+ override def addInt(value: Int): Unit =
+ parent.updateInt(fieldIndex, value)
+
+ override def addLong(value: Long): Unit =
+ parent.updateLong(fieldIndex, value)
+}
+
+object CatalystArrayConverter {
+ val INITIAL_ARRAY_SIZE = 20
+}
+
+/**
+ * A `parquet.io.api.GroupConverter` that converts single-element groups that
+ * match the characteristics of an array (see
+ * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
+ * [[org.apache.spark.sql.catalyst.types.ArrayType]].
+ *
+ * @param elementType The type of the array elements (complex or primitive)
+ * @param index The position of this (array) field inside its parent converter
+ * @param parent The parent converter
+ * @param buffer A data buffer
+ */
+private[parquet] class CatalystArrayConverter(
+ val elementType: DataType,
+ val index: Int,
+ protected[parquet] val parent: CatalystConverter,
+ protected[parquet] var buffer: Buffer[Any])
+ extends CatalystConverter {
+
+ def this(elementType: DataType, index: Int, parent: CatalystConverter) =
+ this(
+ elementType,
+ index,
+ parent,
+ new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE))
+
+ protected[parquet] val converter: Converter = CatalystConverter.createConverter(
+ new CatalystConverter.FieldType(
+ CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+ elementType,
+ false),
+ fieldIndex=0,
+ parent=this)
+
+ override def getConverter(fieldIndex: Int): Converter = converter
+
+ // arrays have only one (repeated) field, which is its elements
+ override val size = 1
+
+ override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
+ // fieldIndex is ignored (assumed to be zero but not checked)
+ if(value == null) {
+ throw new IllegalArgumentException("Null values inside Parquet arrays are not supported!")
+ }
+ buffer += value
+ }
+
+ override protected[parquet] def clearBuffer(): Unit = {
+ buffer.clear()
+ }
+
+ override def start(): Unit = {
+ if (!converter.isPrimitive) {
+ converter.asInstanceOf[CatalystConverter].clearBuffer
+ }
+ }
+
+ override def end(): Unit = {
+ assert(parent != null)
+ // here we need to make sure to use ArrayScalaType
+ parent.updateField(index, buffer.toArray.toSeq)
+ clearBuffer()
+ }
+}
+
+/**
+ * A `parquet.io.api.GroupConverter` that converts single-element groups that
+ * match the characteristics of an array (see
+ * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
+ * [[org.apache.spark.sql.catalyst.types.ArrayType]].
+ *
+ * @param elementType The type of the array elements (native)
+ * @param index The position of this (array) field inside its parent converter
+ * @param parent The parent converter
+ * @param capacity The (initial) capacity of the buffer
+ */
+private[parquet] class CatalystNativeArrayConverter(
+ val elementType: NativeType,
+ val index: Int,
+ protected[parquet] val parent: CatalystConverter,
+ protected[parquet] var capacity: Int = CatalystArrayConverter.INITIAL_ARRAY_SIZE)
+ extends CatalystConverter {
+
+ type NativeType = elementType.JvmType
+
+ private var buffer: Array[NativeType] = elementType.classTag.newArray(capacity)
+
+ private var elements: Int = 0
+
+ protected[parquet] val converter: Converter = CatalystConverter.createConverter(
+ new CatalystConverter.FieldType(
+ CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+ elementType,
+ false),
+ fieldIndex=0,
+ parent=this)
+
+ override def getConverter(fieldIndex: Int): Converter = converter
+
+ // arrays have only one (repeated) field, which is its elements
+ override val size = 1
+
+ override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit =
+ throw new UnsupportedOperationException
+
+ // Overridden here to avoid auto-boxing for primitive types
+ override protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit = {
+ checkGrowBuffer()
+ buffer(elements) = value.asInstanceOf[NativeType]
+ elements += 1
+ }
+
+ override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = {
+ checkGrowBuffer()
+ buffer(elements) = value.asInstanceOf[NativeType]
+ elements += 1
+ }
+
+ override protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit = {
+ checkGrowBuffer()
+ buffer(elements) = value.asInstanceOf[NativeType]
+ elements += 1
+ }
+
+ override protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit = {
+ checkGrowBuffer()
+ buffer(elements) = value.asInstanceOf[NativeType]
+ elements += 1
+ }
+
+ override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = {
+ checkGrowBuffer()
+ buffer(elements) = value.asInstanceOf[NativeType]
+ elements += 1
+ }
+
+ override protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit = {
+ checkGrowBuffer()
+ buffer(elements) = value.asInstanceOf[NativeType]
+ elements += 1
+ }
+
+ override protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit = {
+ checkGrowBuffer()
+ buffer(elements) = value.asInstanceOf[NativeType]
+ elements += 1
+ }
+
+ override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = {
+ checkGrowBuffer()
+ buffer(elements) = value.getBytes.asInstanceOf[NativeType]
+ elements += 1
+ }
+
+ override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit = {
+ checkGrowBuffer()
+ buffer(elements) = value.toStringUsingUTF8.asInstanceOf[NativeType]
+ elements += 1
+ }
+
+ override protected[parquet] def clearBuffer(): Unit = {
+ elements = 0
+ }
+
+ override def start(): Unit = {}
+
+ override def end(): Unit = {
+ assert(parent != null)
+ // here we need to make sure to use ArrayScalaType
+ parent.updateField(
+ index,
+ buffer.slice(0, elements).toSeq)
+ clearBuffer()
+ }
+
+ private def checkGrowBuffer(): Unit = {
+ if (elements >= capacity) {
+ val newCapacity = 2 * capacity
+ val tmp: Array[NativeType] = elementType.classTag.newArray(newCapacity)
+ Array.copy(buffer, 0, tmp, 0, capacity)
+ buffer = tmp
+ capacity = newCapacity
+ }
+ }
+}
+
+/**
+ * This converter is for multi-element groups of primitive or complex types
+ * that have repetition level optional or required (so struct fields).
+ *
+ * @param schema The corresponding Catalyst schema in the form of a list of
+ * attributes.
+ * @param index
+ * @param parent
+ */
+private[parquet] class CatalystStructConverter(
+ override protected[parquet] val schema: Array[FieldType],
+ override protected[parquet] val index: Int,
+ override protected[parquet] val parent: CatalystConverter)
+ extends CatalystGroupConverter(schema, index, parent) {
+
+ override protected[parquet] def clearBuffer(): Unit = {}
+
+ // TODO: think about reusing the buffer
+ override def end(): Unit = {
+ assert(!isRootConverter)
+ // here we need to make sure to use StructScalaType
+ // Note: we need to actually make a copy of the array since we
+ // may be in a nested field
+ parent.updateField(index, new GenericRow(current.toArray))
+ }
+}
+
+/**
+ * A `parquet.io.api.GroupConverter` that converts two-element groups that
+ * match the characteristics of a map (see
+ * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
+ * [[org.apache.spark.sql.catalyst.types.MapType]].
+ *
+ * @param schema
+ * @param index
+ * @param parent
+ */
+private[parquet] class CatalystMapConverter(
+ protected[parquet] val schema: Array[FieldType],
+ override protected[parquet] val index: Int,
+ override protected[parquet] val parent: CatalystConverter)
+ extends CatalystConverter {
+
+ private val map = new HashMap[Any, Any]()
+
+ private val keyValueConverter = new CatalystConverter {
+ private var currentKey: Any = null
+ private var currentValue: Any = null
+ val keyConverter = CatalystConverter.createConverter(schema(0), 0, this)
+ val valueConverter = CatalystConverter.createConverter(schema(1), 1, this)
+
+ override def getConverter(fieldIndex: Int): Converter = {
+ if (fieldIndex == 0) keyConverter else valueConverter
+ }
+
+ override def end(): Unit = CatalystMapConverter.this.map += currentKey -> currentValue
+
+ override def start(): Unit = {
+ currentKey = null
+ currentValue = null
+ }
+
+ override protected[parquet] val size: Int = 2
+ override protected[parquet] val index: Int = 0
+ override protected[parquet] val parent: CatalystConverter = CatalystMapConverter.this
+
+ override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
+ fieldIndex match {
+ case 0 =>
+ currentKey = value
+ case 1 =>
+ currentValue = value
+ case _ =>
+ new RuntimePermission(s"trying to update Map with fieldIndex $fieldIndex")
+ }
+ }
+
+ override protected[parquet] def clearBuffer(): Unit = {}
+ }
+
+ override protected[parquet] val size: Int = 1
+
+ override protected[parquet] def clearBuffer(): Unit = {}
+
+ override def start(): Unit = {
+ map.clear()
+ }
+
+ override def end(): Unit = {
+ // here we need to make sure to use MapScalaType
+ parent.updateField(index, map.toMap)
+ }
+
+ override def getConverter(fieldIndex: Int): Converter = keyValueConverter
+
+ override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit =
+ throw new UnsupportedOperationException
+}
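For orientation, here is a hedged illustration (sample data invented, not from the patch) of the Scala values the converters above produce for one AddressBook record from the commit-message example: arrays become `Seq`s, maps become immutable `Map`s and nested records become `GenericRow`s, matching the `ArrayScalaType`, `MapScalaType` and `StructScalaType` aliases in `CatalystConverter`.

```scala
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Row}

// Shapes handed back by CatalystGroupConverter and friends:
//   owner                 -> String
//   ownerPhoneNumbers     -> Seq[String]        (ArrayScalaType)
//   contacts              -> Seq[Row]           (array of structs)
//   nameToApartmentNumber -> Map[String, Int]   (MapScalaType)
val record: Row = new GenericRow(Array[Any](
  "Julia",
  Seq("555-1234", "555-9876"),
  Seq(new GenericRow(Array[Any]("Chris", "555-0000"))),
  Map("John" -> 42)))

// Element access mirrors the GetItem SQL in the commit message:
val firstContactName = record(2).asInstanceOf[Seq[Row]](0)(0)          // "Chris"
val johnsApartment = record(3).asInstanceOf[Map[String, Int]]("John")  // 42
```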
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 32813a66de..96c131a7f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -20,25 +20,16 @@ package org.apache.spark.sql.parquet
import java.io.IOException
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.FsAction
-import org.apache.hadoop.mapreduce.Job
-import parquet.hadoop.util.ContextUtil
-import parquet.hadoop.{ParquetOutputFormat, Footer, ParquetFileWriter, ParquetFileReader}
-import parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
-import parquet.io.api.{Binary, RecordConsumer}
-import parquet.schema.{Type => ParquetType, PrimitiveType => ParquetPrimitiveType, MessageType, MessageTypeParser}
-import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
-import parquet.schema.Type.Repetition
+import parquet.hadoop.ParquetOutputFormat
+import parquet.hadoop.metadata.CompressionCodecName
+import parquet.schema.MessageType
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
-import org.apache.spark.sql.catalyst.types._
-
-// Implicits
-import scala.collection.JavaConversions._
/**
* Relation that consists of data stored in a Parquet columnar format.
@@ -52,21 +43,20 @@ import scala.collection.JavaConversions._
*
* @param path The path to the Parquet file.
*/
-private[sql] case class ParquetRelation(val path: String)
- extends LeafNode with MultiInstanceRelation {
+private[sql] case class ParquetRelation(
+ val path: String,
+ @transient val conf: Option[Configuration] = None) extends LeafNode with MultiInstanceRelation {
self: Product =>
/** Schema derived from ParquetFile */
def parquetSchema: MessageType =
ParquetTypesConverter
- .readMetaData(new Path(path))
+ .readMetaData(new Path(path), conf)
.getFileMetaData
.getSchema
/** Attributes */
- override val output =
- ParquetTypesConverter
- .convertToAttributes(parquetSchema)
+ override val output = ParquetTypesConverter.readSchemaFromFile(new Path(path), conf)
override def newInstance = ParquetRelation(path).asInstanceOf[this.type]
@@ -141,7 +131,9 @@ private[sql] object ParquetRelation {
}
ParquetRelation.enableLogForwarding()
ParquetTypesConverter.writeMetaData(attributes, path, conf)
- new ParquetRelation(path.toString)
+ new ParquetRelation(path.toString, Some(conf)) {
+ override val output = attributes
+ }
}
private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = {
@@ -170,151 +162,3 @@ private[sql] object ParquetRelation {
path
}
}
-
-private[parquet] object ParquetTypesConverter {
- def toDataType(parquetType : ParquetPrimitiveTypeName): DataType = parquetType match {
- // for now map binary to string type
- // TODO: figure out how Parquet uses strings or why we can't use them in a MessageType schema
- case ParquetPrimitiveTypeName.BINARY => StringType
- case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
- case ParquetPrimitiveTypeName.DOUBLE => DoubleType
- case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY => ArrayType(ByteType)
- case ParquetPrimitiveTypeName.FLOAT => FloatType
- case ParquetPrimitiveTypeName.INT32 => IntegerType
- case ParquetPrimitiveTypeName.INT64 => LongType
- case ParquetPrimitiveTypeName.INT96 =>
- // TODO: add BigInteger type? TODO(andre) use DecimalType instead????
- sys.error("Warning: potential loss of precision: converting INT96 to long")
- LongType
- case _ => sys.error(
- s"Unsupported parquet datatype $parquetType")
- }
-
- def fromDataType(ctype: DataType): ParquetPrimitiveTypeName = ctype match {
- case StringType => ParquetPrimitiveTypeName.BINARY
- case BooleanType => ParquetPrimitiveTypeName.BOOLEAN
- case DoubleType => ParquetPrimitiveTypeName.DOUBLE
- case ArrayType(ByteType) => ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
- case FloatType => ParquetPrimitiveTypeName.FLOAT
- case IntegerType => ParquetPrimitiveTypeName.INT32
- case LongType => ParquetPrimitiveTypeName.INT64
- case _ => sys.error(s"Unsupported datatype $ctype")
- }
-
- def consumeType(consumer: RecordConsumer, ctype: DataType, record: Row, index: Int): Unit = {
- ctype match {
- case StringType => consumer.addBinary(
- Binary.fromByteArray(
- record(index).asInstanceOf[String].getBytes("utf-8")
- )
- )
- case IntegerType => consumer.addInteger(record.getInt(index))
- case LongType => consumer.addLong(record.getLong(index))
- case DoubleType => consumer.addDouble(record.getDouble(index))
- case FloatType => consumer.addFloat(record.getFloat(index))
- case BooleanType => consumer.addBoolean(record.getBoolean(index))
- case _ => sys.error(s"Unsupported datatype $ctype, cannot write to consumer")
- }
- }
-
- def getSchema(schemaString : String) : MessageType =
- MessageTypeParser.parseMessageType(schemaString)
-
- def convertToAttributes(parquetSchema: MessageType) : Seq[Attribute] = {
- parquetSchema.getColumns.map {
- case (desc) =>
- val ctype = toDataType(desc.getType)
- val name: String = desc.getPath.mkString(".")
- new AttributeReference(name, ctype, false)()
- }
- }
-
- // TODO: allow nesting?
- def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
- val fields: Seq[ParquetType] = attributes.map {
- a => new ParquetPrimitiveType(Repetition.OPTIONAL, fromDataType(a.dataType), a.name)
- }
- new MessageType("root", fields)
- }
-
- def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration) {
- if (origPath == null) {
- throw new IllegalArgumentException("Unable to write Parquet metadata: path is null")
- }
- val fs = origPath.getFileSystem(conf)
- if (fs == null) {
- throw new IllegalArgumentException(
- s"Unable to write Parquet metadata: path $origPath is incorrectly formatted")
- }
- val path = origPath.makeQualified(fs)
- if (fs.exists(path) && !fs.getFileStatus(path).isDir) {
- throw new IllegalArgumentException(s"Expected to write to directory $path but found file")
- }
- val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
- if (fs.exists(metadataPath)) {
- try {
- fs.delete(metadataPath, true)
- } catch {
- case e: IOException =>
- throw new IOException(s"Unable to delete previous PARQUET_METADATA_FILE at $metadataPath")
- }
- }
- val extraMetadata = new java.util.HashMap[String, String]()
- extraMetadata.put("path", path.toString)
- // TODO: add extra data, e.g., table name, date, etc.?
-
- val parquetSchema: MessageType =
- ParquetTypesConverter.convertFromAttributes(attributes)
- val metaData: FileMetaData = new FileMetaData(
- parquetSchema,
- extraMetadata,
- "Spark")
-
- ParquetRelation.enableLogForwarding()
- ParquetFileWriter.writeMetadataFile(
- conf,
- path,
- new Footer(path, new ParquetMetadata(metaData, Nil)) :: Nil)
- }
-
- /**
- * Try to read Parquet metadata at the given Path. We first see if there is a summary file
- * in the parent directory. If so, this is used. Else we read the actual footer at the given
- * location.
- * @param origPath The path at which we expect one (or more) Parquet files.
- * @return The `ParquetMetadata` containing among other things the schema.
- */
- def readMetaData(origPath: Path): ParquetMetadata = {
- if (origPath == null) {
- throw new IllegalArgumentException("Unable to read Parquet metadata: path is null")
- }
- val job = new Job()
- // TODO: since this is called from ParquetRelation (LogicalPlan) we don't have access
- // to SparkContext's hadoopConfig; in principle the default FileSystem may be different(?!)
- val conf = ContextUtil.getConfiguration(job)
- val fs: FileSystem = origPath.getFileSystem(conf)
- if (fs == null) {
- throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
- }
- val path = origPath.makeQualified(fs)
- if (!fs.getFileStatus(path).isDir) {
- throw new IllegalArgumentException(
- s"Expected $path for be a directory with Parquet files/metadata")
- }
- ParquetRelation.enableLogForwarding()
- val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
- // if this is a new table that was just created we will find only the metadata file
- if (fs.exists(metadataPath) && fs.isFile(metadataPath)) {
- ParquetFileReader.readFooter(conf, metadataPath)
- } else {
- // there may be one or more Parquet files in the given directory
- val footers = ParquetFileReader.readFooters(conf, fs.getFileStatus(path))
- // TODO: for now we assume that all footers (if there is more than one) have identical
- // metadata; we may want to add a check here at some point
- if (footers.size() == 0) {
- throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path")
- }
- footers(0).getParquetMetadata
- }
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 65ba1246fb..624f2e2fa1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -36,6 +36,7 @@ import parquet.schema.MessageType
import org.apache.spark.{Logging, SerializableWritable, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
+import org.apache.spark.sql.catalyst.types.StructType
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
/**
@@ -64,10 +65,13 @@ case class ParquetTableScan(
NewFileInputFormat.addInputPath(job, path)
}
- // Store Parquet schema in `Configuration`
+ // Store both requested and original schema in `Configuration`
conf.set(
- RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA,
- ParquetTypesConverter.convertFromAttributes(output).toString)
+ RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+ ParquetTypesConverter.convertToString(output))
+ conf.set(
+ RowWriteSupport.SPARK_ROW_SCHEMA,
+ ParquetTypesConverter.convertToString(relation.output))
// Store record filtering predicate in `Configuration`
// Note 1: the input format ignores all predicates that cannot be expressed
@@ -166,13 +170,18 @@ case class InsertIntoParquetTable(
val job = new Job(sc.hadoopConfiguration)
- ParquetOutputFormat.setWriteSupportClass(
- job,
- classOf[org.apache.spark.sql.parquet.RowWriteSupport])
+ val writeSupport =
+ if (child.output.map(_.dataType).forall(_.isPrimitive)) {
+ logger.debug("Initializing MutableRowWriteSupport")
+ classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport]
+ } else {
+ classOf[org.apache.spark.sql.parquet.RowWriteSupport]
+ }
+
+ ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
- // TODO: move that to function in object
val conf = ContextUtil.getConfiguration(job)
- conf.set(RowWriteSupport.PARQUET_ROW_SCHEMA, relation.parquetSchema.toString)
+ RowWriteSupport.setSchema(relation.output, conf)
val fspath = new Path(relation.path)
val fs = fspath.getFileSystem(conf)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 71ba0fecce..bfcbdeb34a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -29,21 +29,23 @@ import parquet.schema.{MessageType, MessageTypeParser}
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.SparkSqlSerializer
+import com.google.common.io.BaseEncoding
/**
* A `parquet.io.api.RecordMaterializer` for Rows.
*
* @param root The root group converter for the record.
*/
-private[parquet] class RowRecordMaterializer(root: CatalystGroupConverter)
+private[parquet] class RowRecordMaterializer(root: CatalystConverter)
extends RecordMaterializer[Row] {
- def this(parquetSchema: MessageType) =
- this(new CatalystGroupConverter(ParquetTypesConverter.convertToAttributes(parquetSchema)))
+ def this(parquetSchema: MessageType, attributes: Seq[Attribute]) =
+ this(CatalystConverter.createRootConverter(parquetSchema, attributes))
override def getCurrentRecord: Row = root.getCurrentRecord
- override def getRootConverter: GroupConverter = root
+ override def getRootConverter: GroupConverter = root.asInstanceOf[GroupConverter]
}
/**
@@ -56,68 +58,94 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
stringMap: java.util.Map[String, String],
fileSchema: MessageType,
readContext: ReadContext): RecordMaterializer[Row] = {
- log.debug(s"preparing for read with file schema $fileSchema")
- new RowRecordMaterializer(readContext.getRequestedSchema)
+ log.debug(s"preparing for read with Parquet file schema $fileSchema")
+ // Note: this very much imitates AvroParquet
+ val parquetSchema = readContext.getRequestedSchema
+ var schema: Seq[Attribute] = null
+
+ if (readContext.getReadSupportMetadata != null) {
+ // first try to find the read schema inside the metadata (can result from projections)
+ if (
+ readContext
+ .getReadSupportMetadata
+ .get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA) != null) {
+ schema = ParquetTypesConverter.convertFromString(
+ readContext.getReadSupportMetadata.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
+ } else {
+ // if unavailable, try the schema that was read originally from the file or provided
+ // during the creation of the Parquet relation
+ if (readContext.getReadSupportMetadata.get(RowReadSupport.SPARK_METADATA_KEY) != null) {
+ schema = ParquetTypesConverter.convertFromString(
+ readContext.getReadSupportMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
+ }
+ }
+ }
+ // if both are unavailable, fall back to deducing the schema from the given Parquet schema
+ if (schema == null) {
+ log.debug("falling back to Parquet read schema")
+ schema = ParquetTypesConverter.convertToAttributes(parquetSchema)
+ }
+ log.debug(s"list of attributes that will be read: $schema")
+ new RowRecordMaterializer(parquetSchema, schema)
}
override def init(
configuration: Configuration,
keyValueMetaData: java.util.Map[String, String],
fileSchema: MessageType): ReadContext = {
- val requested_schema_string =
- configuration.get(RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA, fileSchema.toString)
- val requested_schema =
- MessageTypeParser.parseMessageType(requested_schema_string)
- log.debug(s"read support initialized for requested schema $requested_schema")
- ParquetRelation.enableLogForwarding()
- new ReadContext(requested_schema, keyValueMetaData)
+ var parquetSchema: MessageType = fileSchema
+ var metadata: java.util.Map[String, String] = new java.util.HashMap[String, String]()
+ val requestedAttributes = RowReadSupport.getRequestedSchema(configuration)
+
+ if (requestedAttributes != null) {
+ parquetSchema = ParquetTypesConverter.convertFromAttributes(requestedAttributes)
+ metadata.put(
+ RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+ ParquetTypesConverter.convertToString(requestedAttributes))
+ }
+
+ val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
+ if (origAttributesStr != null) {
+ metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr)
+ }
+
+ return new ReadSupport.ReadContext(parquetSchema, metadata)
}
}
private[parquet] object RowReadSupport {
- val PARQUET_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+ val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+ val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+ private def getRequestedSchema(configuration: Configuration): Seq[Attribute] = {
+ val schemaString = configuration.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+ if (schemaString == null) null else ParquetTypesConverter.convertFromString(schemaString)
+ }
}
/**
* A `parquet.hadoop.api.WriteSupport` for Row objects.
*/
private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
- def setSchema(schema: MessageType, configuration: Configuration) {
- // for testing
- this.schema = schema
- // TODO: could use Attributes themselves instead of Parquet schema?
- configuration.set(
- RowWriteSupport.PARQUET_ROW_SCHEMA,
- schema.toString)
- configuration.set(
- ParquetOutputFormat.WRITER_VERSION,
- ParquetProperties.WriterVersion.PARQUET_1_0.toString)
- }
-
- def getSchema(configuration: Configuration): MessageType = {
- MessageTypeParser.parseMessageType(configuration.get(RowWriteSupport.PARQUET_ROW_SCHEMA))
- }
- private var schema: MessageType = null
- private var writer: RecordConsumer = null
- private var attributes: Seq[Attribute] = null
+ private[parquet] var writer: RecordConsumer = null
+ private[parquet] var attributes: Seq[Attribute] = null
override def init(configuration: Configuration): WriteSupport.WriteContext = {
- schema = if (schema == null) getSchema(configuration) else schema
- attributes = ParquetTypesConverter.convertToAttributes(schema)
- log.debug(s"write support initialized for requested schema $schema")
+ attributes = if (attributes == null) RowWriteSupport.getSchema(configuration) else attributes
+
+ log.debug(s"write support initialized for requested schema $attributes")
ParquetRelation.enableLogForwarding()
new WriteSupport.WriteContext(
- schema,
+ ParquetTypesConverter.convertFromAttributes(attributes),
new java.util.HashMap[java.lang.String, java.lang.String]())
}
override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
writer = recordConsumer
- log.debug(s"preparing for write with schema $schema")
+ log.debug(s"preparing for write with schema $attributes")
}
- // TODO: add groups (nested fields)
override def write(record: Row): Unit = {
if (attributes.size > record.size) {
throw new IndexOutOfBoundsException(
@@ -130,98 +158,176 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
// null values indicate optional fields, but we do not currently check for them
if (record(index) != null && record(index) != Nil) {
writer.startField(attributes(index).name, index)
- ParquetTypesConverter.consumeType(writer, attributes(index).dataType, record, index)
+ writeValue(attributes(index).dataType, record(index))
writer.endField(attributes(index).name, index)
}
index = index + 1
}
writer.endMessage()
}
-}
-private[parquet] object RowWriteSupport {
- val PARQUET_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.schema"
-}
-
-/**
- * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record to a `Row` object.
- *
- * @param schema The corresponding Catalyst schema in the form of a list of attributes.
- */
-private[parquet] class CatalystGroupConverter(
- schema: Seq[Attribute],
- protected[parquet] val current: ParquetRelation.RowType) extends GroupConverter {
-
- def this(schema: Seq[Attribute]) = this(schema, new ParquetRelation.RowType(schema.length))
-
- val converters: Array[Converter] = schema.map {
- a => a.dataType match {
- case ctype: NativeType =>
- // note: for some reason matching for StringType fails so use this ugly if instead
- if (ctype == StringType) {
- new CatalystPrimitiveStringConverter(this, schema.indexOf(a))
- } else {
- new CatalystPrimitiveConverter(this, schema.indexOf(a))
- }
- case _ => throw new RuntimeException(
- s"unable to convert datatype ${a.dataType.toString} in CatalystGroupConverter")
+ private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
+ if (value != null && value != Nil) {
+ schema match {
+ case t @ ArrayType(_) => writeArray(
+ t,
+ value.asInstanceOf[CatalystConverter.ArrayScalaType[_]])
+ case t @ MapType(_, _) => writeMap(
+ t,
+ value.asInstanceOf[CatalystConverter.MapScalaType[_, _]])
+ case t @ StructType(_) => writeStruct(
+ t,
+ value.asInstanceOf[CatalystConverter.StructScalaType[_]])
+ case _ => writePrimitive(schema.asInstanceOf[PrimitiveType], value)
+ }
}
- }.toArray
+ }
- override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
+ private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = {
+ if (value != null && value != Nil) {
+ schema match {
+ case StringType => writer.addBinary(
+ Binary.fromByteArray(
+ value.asInstanceOf[String].getBytes("utf-8")
+ )
+ )
+ case IntegerType => writer.addInteger(value.asInstanceOf[Int])
+ case ShortType => writer.addInteger(value.asInstanceOf[Int])
+ case LongType => writer.addLong(value.asInstanceOf[Long])
+ case ByteType => writer.addInteger(value.asInstanceOf[Int])
+ case DoubleType => writer.addDouble(value.asInstanceOf[Double])
+ case FloatType => writer.addFloat(value.asInstanceOf[Float])
+ case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
+ case _ => sys.error(s"Do not know how to write $schema to consumer")
+ }
+ }
+ }
- private[parquet] def getCurrentRecord: ParquetRelation.RowType = current
+ private[parquet] def writeStruct(
+ schema: StructType,
+ struct: CatalystConverter.StructScalaType[_]): Unit = {
+ if (struct != null && struct != Nil) {
+ val fields = schema.fields.toArray
+ writer.startGroup()
+ var i = 0
+ while(i < fields.size) {
+ if (struct(i) != null && struct(i) != Nil) {
+ writer.startField(fields(i).name, i)
+ writeValue(fields(i).dataType, struct(i))
+ writer.endField(fields(i).name, i)
+ }
+ i = i + 1
+ }
+ writer.endGroup()
+ }
+ }
- override def start(): Unit = {
- var i = 0
- while (i < schema.length) {
- current.setNullAt(i)
- i = i + 1
+ // TODO: support null values, see
+ // https://issues.apache.org/jira/browse/SPARK-1649
+ private[parquet] def writeArray(
+ schema: ArrayType,
+ array: CatalystConverter.ArrayScalaType[_]): Unit = {
+ val elementType = schema.elementType
+ writer.startGroup()
+ if (array.size > 0) {
+ writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
+ var i = 0
+ while(i < array.size) {
+ writeValue(elementType, array(i))
+ i = i + 1
+ }
+ writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
}
+ writer.endGroup()
}
- override def end(): Unit = {}
+ // TODO: support null values, see
+ // https://issues.apache.org/jira/browse/SPARK-1649
+ private[parquet] def writeMap(
+ schema: MapType,
+ map: CatalystConverter.MapScalaType[_, _]): Unit = {
+ writer.startGroup()
+ if (map.size > 0) {
+ writer.startField(CatalystConverter.MAP_SCHEMA_NAME, 0)
+ writer.startGroup()
+ writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
+ for(key <- map.keys) {
+ writeValue(schema.keyType, key)
+ }
+ writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
+ writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
+ for(value <- map.values) {
+ writeValue(schema.valueType, value)
+ }
+ writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
+ writer.endGroup()
+ writer.endField(CatalystConverter.MAP_SCHEMA_NAME, 0)
+ }
+ writer.endGroup()
+ }
}
-/**
- * A `parquet.io.api.PrimitiveConverter` that converts Parquet types to Catalyst types.
- *
- * @param parent The parent group converter.
- * @param fieldIndex The index inside the record.
- */
-private[parquet] class CatalystPrimitiveConverter(
- parent: CatalystGroupConverter,
- fieldIndex: Int) extends PrimitiveConverter {
- // TODO: consider refactoring these together with ParquetTypesConverter
- override def addBinary(value: Binary): Unit =
- parent.getCurrentRecord.update(fieldIndex, value.getBytes)
+// Optimized for non-nested rows
+private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
+ override def write(record: Row): Unit = {
+ if (attributes.size > record.size) {
+ throw new IndexOutOfBoundsException(
+ s"Trying to write more fields than contained in row (${attributes.size}>${record.size})")
+ }
- override def addBoolean(value: Boolean): Unit =
- parent.getCurrentRecord.setBoolean(fieldIndex, value)
+ var index = 0
+ writer.startMessage()
+ while(index < attributes.size) {
+ // null values indicate optional fields, but we do not currently check for them
+ if (record(index) != null && record(index) != Nil) {
+ writer.startField(attributes(index).name, index)
+ consumeType(attributes(index).dataType, record, index)
+ writer.endField(attributes(index).name, index)
+ }
+ index = index + 1
+ }
+ writer.endMessage()
+ }
- override def addDouble(value: Double): Unit =
- parent.getCurrentRecord.setDouble(fieldIndex, value)
+ private def consumeType(
+ ctype: DataType,
+ record: Row,
+ index: Int): Unit = {
+ ctype match {
+ case StringType => writer.addBinary(
+ Binary.fromByteArray(
+ record(index).asInstanceOf[String].getBytes("utf-8")
+ )
+ )
+ case IntegerType => writer.addInteger(record.getInt(index))
+ case ShortType => writer.addInteger(record.getShort(index))
+ case LongType => writer.addLong(record.getLong(index))
+ case ByteType => writer.addInteger(record.getByte(index))
+ case DoubleType => writer.addDouble(record.getDouble(index))
+ case FloatType => writer.addFloat(record.getFloat(index))
+ case BooleanType => writer.addBoolean(record.getBoolean(index))
+ case _ => sys.error(s"Unsupported datatype $ctype, cannot write to consumer")
+ }
+ }
+}
- override def addFloat(value: Float): Unit =
- parent.getCurrentRecord.setFloat(fieldIndex, value)
+private[parquet] object RowWriteSupport {
+ val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
- override def addInt(value: Int): Unit =
- parent.getCurrentRecord.setInt(fieldIndex, value)
+ def getSchema(configuration: Configuration): Seq[Attribute] = {
+ val schemaString = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
+ if (schemaString == null) {
+ throw new RuntimeException("Missing schema!")
+ }
+ ParquetTypesConverter.convertFromString(schemaString)
+ }
- override def addLong(value: Long): Unit =
- parent.getCurrentRecord.setLong(fieldIndex, value)
+ def setSchema(schema: Seq[Attribute], configuration: Configuration) {
+ val encoded = ParquetTypesConverter.convertToString(schema)
+ configuration.set(SPARK_ROW_SCHEMA, encoded)
+ configuration.set(
+ ParquetOutputFormat.WRITER_VERSION,
+ ParquetProperties.WriterVersion.PARQUET_1_0.toString)
+ }
}
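
With this change the schema travels to the tasks as serialized Catalyst attributes in the Hadoop `Configuration` rather than as a Parquet schema string. A minimal sketch of that round trip (editor's illustration, not part of the patch; it assumes code living in the `org.apache.spark.sql.parquet` package, since these helpers are package-private):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.types.{IntegerType, StringType}

// Hypothetical two-column schema used only for illustration.
val attributes = Seq(
  AttributeReference("id", IntegerType, nullable = false)(),
  AttributeReference("name", StringType, nullable = true)())

val conf = new Configuration()
// Stores the attributes (as a StructType string) under SPARK_ROW_SCHEMA and
// pins the Parquet writer version, as InsertIntoParquetTable does above.
RowWriteSupport.setSchema(attributes, conf)

// On the write path, RowWriteSupport.init() later recovers the same attributes:
val recovered = RowWriteSupport.getSchema(conf)
// recovered.map(_.name) == Seq("id", "name")
```
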
-/**
- * A `parquet.io.api.PrimitiveConverter` that converts Parquet strings (fixed-length byte arrays)
- * into Catalyst Strings.
- *
- * @param parent The parent group converter.
- * @param fieldIndex The index inside the record.
- */
-private[parquet] class CatalystPrimitiveStringConverter(
- parent: CatalystGroupConverter,
- fieldIndex: Int) extends CatalystPrimitiveConverter(parent, fieldIndex) {
- override def addBinary(value: Binary): Unit =
- parent.getCurrentRecord.setString(fieldIndex, value.toStringUsingUTF8)
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
index 46c7172985..1dc58633a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
@@ -17,14 +17,19 @@
package org.apache.spark.sql.parquet
+import java.io.File
+
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.mapreduce.Job
import parquet.example.data.{GroupWriter, Group}
import parquet.example.data.simple.SimpleGroup
-import parquet.hadoop.ParquetWriter
+import parquet.hadoop.{ParquetReader, ParquetFileReader, ParquetWriter}
import parquet.hadoop.api.WriteSupport
import parquet.hadoop.api.WriteSupport.WriteContext
+import parquet.hadoop.example.GroupReadSupport
+import parquet.hadoop.util.ContextUtil
import parquet.io.api.RecordConsumer
import parquet.schema.{MessageType, MessageTypeParser}
@@ -51,13 +56,13 @@ private[sql] object ParquetTestData {
val testSchema =
"""message myrecord {
- |optional boolean myboolean;
- |optional int32 myint;
- |optional binary mystring;
- |optional int64 mylong;
- |optional float myfloat;
- |optional double mydouble;
- |}""".stripMargin
+ optional boolean myboolean;
+ optional int32 myint;
+ optional binary mystring;
+ optional int64 mylong;
+ optional float myfloat;
+ optional double mydouble;
+ }"""
// field names for test assertion error messages
val testSchemaFieldNames = Seq(
@@ -71,23 +76,23 @@ private[sql] object ParquetTestData {
val subTestSchema =
"""
- |message myrecord {
- |optional boolean myboolean;
- |optional int64 mylong;
- |}
- """.stripMargin
+ message myrecord {
+ optional boolean myboolean;
+ optional int64 mylong;
+ }
+ """
val testFilterSchema =
"""
- |message myrecord {
- |required boolean myboolean;
- |required int32 myint;
- |required binary mystring;
- |required int64 mylong;
- |required float myfloat;
- |required double mydouble;
- |}
- """.stripMargin
+ message myrecord {
+ required boolean myboolean;
+ required int32 myint;
+ required binary mystring;
+ required int64 mylong;
+ required float myfloat;
+ required double mydouble;
+ }
+ """
// field names for test assertion error messages
val subTestSchemaFieldNames = Seq(
@@ -100,9 +105,110 @@ private[sql] object ParquetTestData {
lazy val testData = new ParquetRelation(testDir.toURI.toString)
+ val testNestedSchema1 =
+ // based on blogpost example, source:
+ // https://blog.twitter.com/2013/dremel-made-simple-with-parquet
+ // note: instead of string we have to use binary (?) otherwise
+ // Parquet gives us:
+ // IllegalArgumentException: expected one of [INT64, INT32, BOOLEAN,
+ // BINARY, FLOAT, DOUBLE, INT96, FIXED_LEN_BYTE_ARRAY]
+ // Also repeated primitives seem tricky to convert (AvroParquet
+ // only uses them in arrays?) so only use at most one in each group
+ // and nothing else in that group (-> is mapped to array)!
+ // The "values" inside ownerPhoneNumbers is a keyword currently
+ // so that array types can be translated correctly.
+ """
+ message AddressBook {
+ required binary owner;
+ optional group ownerPhoneNumbers {
+ repeated binary array;
+ }
+ optional group contacts {
+ repeated group array {
+ required binary name;
+ optional binary phoneNumber;
+ }
+ }
+ }
+ """
+
+
+ val testNestedSchema2 =
+ """
+ message TestNested2 {
+ required int32 firstInt;
+ optional int32 secondInt;
+ optional group longs {
+ repeated int64 array;
+ }
+ required group entries {
+ repeated group array {
+ required double value;
+ optional boolean truth;
+ }
+ }
+ optional group outerouter {
+ repeated group array {
+ repeated group array {
+ repeated int32 array;
+ }
+ }
+ }
+ }
+ """
+
+ val testNestedSchema3 =
+ """
+ message TestNested3 {
+ required int32 x;
+ optional group booleanNumberPairs {
+ repeated group array {
+ required int32 key;
+ optional group value {
+ repeated group array {
+ required double nestedValue;
+ optional boolean truth;
+ }
+ }
+ }
+ }
+ }
+ """
+
+ val testNestedSchema4 =
+ """
+ message TestNested4 {
+ required int32 x;
+ optional group data1 {
+ repeated group map {
+ required binary key;
+ required int32 value;
+ }
+ }
+ required group data2 {
+ repeated group map {
+ required binary key;
+ required group value {
+ required int64 payload1;
+ optional binary payload2;
+ }
+ }
+ }
+ }
+ """
+
+ val testNestedDir1 = Utils.createTempDir()
+ val testNestedDir2 = Utils.createTempDir()
+ val testNestedDir3 = Utils.createTempDir()
+ val testNestedDir4 = Utils.createTempDir()
+
+ lazy val testNestedData1 = new ParquetRelation(testNestedDir1.toURI.toString)
+ lazy val testNestedData2 = new ParquetRelation(testNestedDir2.toURI.toString)
+
def writeFile() = {
- testDir.delete
+ testDir.delete()
val path: Path = new Path(new Path(testDir.toURI), new Path("part-r-0.parquet"))
+ val job = new Job()
val schema: MessageType = MessageTypeParser.parseMessageType(testSchema)
val writeSupport = new TestGroupWriteSupport(schema)
val writer = new ParquetWriter[Group](path, writeSupport)
@@ -150,5 +256,149 @@ private[sql] object ParquetTestData {
}
writer.close()
}
+
+ def writeNestedFile1() {
+ // example data from https://blog.twitter.com/2013/dremel-made-simple-with-parquet
+ testNestedDir1.delete()
+ val path: Path = new Path(new Path(testNestedDir1.toURI), new Path("part-r-0.parquet"))
+ val schema: MessageType = MessageTypeParser.parseMessageType(testNestedSchema1)
+
+ val r1 = new SimpleGroup(schema)
+ r1.add(0, "Julien Le Dem")
+ r1.addGroup(1)
+ .append(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, "555 123 4567")
+ .append(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, "555 666 1337")
+ .append(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, "XXX XXX XXXX")
+ val contacts = r1.addGroup(2)
+ contacts.addGroup(0)
+ .append("name", "Dmitriy Ryaboy")
+ .append("phoneNumber", "555 987 6543")
+ contacts.addGroup(0)
+ .append("name", "Chris Aniszczyk")
+
+ val r2 = new SimpleGroup(schema)
+ r2.add(0, "A. Nonymous")
+
+ val writeSupport = new TestGroupWriteSupport(schema)
+ val writer = new ParquetWriter[Group](path, writeSupport)
+ writer.write(r1)
+ writer.write(r2)
+ writer.close()
+ }
+
+ def writeNestedFile2() {
+ testNestedDir2.delete()
+ val path: Path = new Path(new Path(testNestedDir2.toURI), new Path("part-r-0.parquet"))
+ val schema: MessageType = MessageTypeParser.parseMessageType(testNestedSchema2)
+
+ val r1 = new SimpleGroup(schema)
+ r1.add(0, 1)
+ r1.add(1, 7)
+ val longs = r1.addGroup(2)
+ longs.add(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME , 1.toLong << 32)
+ longs.add(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 1.toLong << 33)
+ longs.add(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 1.toLong << 34)
+ val booleanNumberPair = r1.addGroup(3).addGroup(0)
+ booleanNumberPair.add("value", 2.5)
+ booleanNumberPair.add("truth", false)
+ val top_level = r1.addGroup(4)
+ val second_level_a = top_level.addGroup(0)
+ val second_level_b = top_level.addGroup(0)
+ val third_level_aa = second_level_a.addGroup(0)
+ val third_level_ab = second_level_a.addGroup(0)
+ val third_level_c = second_level_b.addGroup(0)
+ third_level_aa.add(
+ CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+ 7)
+ third_level_ab.add(
+ CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+ 8)
+ third_level_c.add(
+ CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+ 9)
+
+ val writeSupport = new TestGroupWriteSupport(schema)
+ val writer = new ParquetWriter[Group](path, writeSupport)
+ writer.write(r1)
+ writer.close()
+ }
+
+ def writeNestedFile3() {
+ testNestedDir3.delete()
+ val path: Path = new Path(new Path(testNestedDir3.toURI), new Path("part-r-0.parquet"))
+ val schema: MessageType = MessageTypeParser.parseMessageType(testNestedSchema3)
+
+ val r1 = new SimpleGroup(schema)
+ r1.add(0, 1)
+ val booleanNumberPairs = r1.addGroup(1)
+ val g1 = booleanNumberPairs.addGroup(0)
+ g1.add(0, 1)
+ val nested1 = g1.addGroup(1)
+ val ng1 = nested1.addGroup(0)
+ ng1.add(0, 1.5)
+ ng1.add(1, false)
+ val ng2 = nested1.addGroup(0)
+ ng2.add(0, 2.5)
+ ng2.add(1, true)
+ val g2 = booleanNumberPairs.addGroup(0)
+ g2.add(0, 2)
+ val ng3 = g2.addGroup(1)
+ .addGroup(0)
+ ng3.add(0, 3.5)
+ ng3.add(1, false)
+
+ val writeSupport = new TestGroupWriteSupport(schema)
+ val writer = new ParquetWriter[Group](path, writeSupport)
+ writer.write(r1)
+ writer.close()
+ }
+
+ def writeNestedFile4() {
+ testNestedDir4.delete()
+ val path: Path = new Path(new Path(testNestedDir4.toURI), new Path("part-r-0.parquet"))
+ val schema: MessageType = MessageTypeParser.parseMessageType(testNestedSchema4)
+
+ val r1 = new SimpleGroup(schema)
+ r1.add(0, 7)
+ val map1 = r1.addGroup(1)
+ val keyValue1 = map1.addGroup(0)
+ keyValue1.add(0, "key1")
+ keyValue1.add(1, 1)
+ val keyValue2 = map1.addGroup(0)
+ keyValue2.add(0, "key2")
+ keyValue2.add(1, 2)
+ val map2 = r1.addGroup(2)
+ val keyValue3 = map2.addGroup(0)
+ // TODO: currently only string key type supported
+ keyValue3.add(0, "seven")
+ val valueGroup1 = keyValue3.addGroup(1)
+ valueGroup1.add(0, 42.toLong)
+ valueGroup1.add(1, "the answer")
+ val keyValue4 = map2.addGroup(0)
+ // TODO: currently only string key type supported
+ keyValue4.add(0, "eight")
+ val valueGroup2 = keyValue4.addGroup(1)
+ valueGroup2.add(0, 49.toLong)
+
+ val writeSupport = new TestGroupWriteSupport(schema)
+ val writer = new ParquetWriter[Group](path, writeSupport)
+ writer.write(r1)
+ writer.close()
+ }
+
+ // TODO: this is not actually used anywhere but useful for debugging
+ /* def readNestedFile(file: File, schemaString: String): Unit = {
+ val configuration = new Configuration()
+ val path = new Path(new Path(file.toURI), new Path("part-r-0.parquet"))
+ val fs: FileSystem = path.getFileSystem(configuration)
+ val schema: MessageType = MessageTypeParser.parseMessageType(schemaString)
+ assert(schema != null)
+ val outputStatus: FileStatus = fs.getFileStatus(new Path(path.toString))
+ val footers = ParquetFileReader.readFooter(configuration, outputStatus)
+ assert(footers != null)
+ val reader = new ParquetReader(new Path(path.toString), new GroupReadSupport())
+ val first = reader.read()
+ assert(first != null)
+ } */
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
new file mode 100644
index 0000000000..f9046368e7
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.io.IOException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapreduce.Job
+
+import parquet.hadoop.{ParquetFileReader, Footer, ParquetFileWriter}
+import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData}
+import parquet.hadoop.util.ContextUtil
+import parquet.schema.{Type => ParquetType, PrimitiveType => ParquetPrimitiveType, MessageType}
+import parquet.schema.{GroupType => ParquetGroupType, OriginalType => ParquetOriginalType, ConversionPatterns}
+import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
+import parquet.schema.Type.Repetition
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+import org.apache.spark.sql.catalyst.types._
+
+// Implicits
+import scala.collection.JavaConversions._
+
+private[parquet] object ParquetTypesConverter extends Logging {
+ def isPrimitiveType(ctype: DataType): Boolean =
+ classOf[PrimitiveType] isAssignableFrom ctype.getClass
+
+ def toPrimitiveDataType(parquetType : ParquetPrimitiveTypeName): DataType = parquetType match {
+ case ParquetPrimitiveTypeName.BINARY => StringType
+ case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
+ case ParquetPrimitiveTypeName.DOUBLE => DoubleType
+ case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY => ArrayType(ByteType)
+ case ParquetPrimitiveTypeName.FLOAT => FloatType
+ case ParquetPrimitiveTypeName.INT32 => IntegerType
+ case ParquetPrimitiveTypeName.INT64 => LongType
+ case ParquetPrimitiveTypeName.INT96 =>
+ // TODO: add BigInteger type? TODO(andre) use DecimalType instead????
+ sys.error("Potential loss of precision: cannot convert INT96")
+ case _ => sys.error(
+ s"Unsupported parquet datatype $parquetType")
+ }
+
+ /**
+ * Converts a given Parquet `Type` into the corresponding
+ * [[org.apache.spark.sql.catalyst.types.DataType]].
+ *
+ * We apply the following conversion rules:
+ * <ul>
+ * <li> Primitive types are converted to the corresponding primitive type.</li>
+ * <li> Group types that have a single field that is itself a group, which has repetition
+ * level `REPEATED`, are treated as follows:<ul>
+ * <li> If the nested group has name `values`, the surrounding group is converted
+ * into an [[ArrayType]] with the corresponding field type (primitive or
+ * complex) as element type.</li>
+ * <li> If the nested group has name `map` and two fields (named `key` and `value`),
+ * the surrounding group is converted into a [[MapType]]
+ * with the corresponding key and value (value possibly complex) types.
+ * Note that we currently assume map values are not nullable.</li>
+ * <li> Other group types are converted into a [[StructType]] with the corresponding
+ * field types.</li></ul></li>
+ * </ul>
+ * Note that fields are determined to be `nullable` if and only if their Parquet repetition
+ * level is not `REQUIRED`.
+ *
+ * @param parquetType The type to convert.
+ * @return The corresponding Catalyst type.
+ */
+ def toDataType(parquetType: ParquetType): DataType = {
+ def correspondsToMap(groupType: ParquetGroupType): Boolean = {
+ if (groupType.getFieldCount != 1 || groupType.getFields.apply(0).isPrimitive) {
+ false
+ } else {
+ // This mostly follows the convention in ``parquet.schema.ConversionPatterns``
+ val keyValueGroup = groupType.getFields.apply(0).asGroupType()
+ keyValueGroup.getRepetition == Repetition.REPEATED &&
+ keyValueGroup.getName == CatalystConverter.MAP_SCHEMA_NAME &&
+ keyValueGroup.getFieldCount == 2 &&
+ keyValueGroup.getFields.apply(0).getName == CatalystConverter.MAP_KEY_SCHEMA_NAME &&
+ keyValueGroup.getFields.apply(1).getName == CatalystConverter.MAP_VALUE_SCHEMA_NAME
+ }
+ }
+
+ def correspondsToArray(groupType: ParquetGroupType): Boolean = {
+ groupType.getFieldCount == 1 &&
+ groupType.getFieldName(0) == CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME &&
+ groupType.getFields.apply(0).getRepetition == Repetition.REPEATED
+ }
+
+ if (parquetType.isPrimitive) {
+ toPrimitiveDataType(parquetType.asPrimitiveType.getPrimitiveTypeName)
+ } else {
+ val groupType = parquetType.asGroupType()
+ parquetType.getOriginalType match {
+ // if the schema was constructed programmatically there may be hints how to convert
+ // it inside the metadata via the OriginalType field
+ case ParquetOriginalType.LIST => { // TODO: check enums!
+ assert(groupType.getFieldCount == 1)
+ val field = groupType.getFields.apply(0)
+ new ArrayType(toDataType(field))
+ }
+ case ParquetOriginalType.MAP => {
+ assert(
+ !groupType.getFields.apply(0).isPrimitive,
+ "Parquet Map type malformatted: expected nested group for map!")
+ val keyValueGroup = groupType.getFields.apply(0).asGroupType()
+ assert(
+ keyValueGroup.getFieldCount == 2,
+ "Parquet Map type malformatted: nested group should have 2 (key, value) fields!")
+ val keyType = toDataType(keyValueGroup.getFields.apply(0))
+ assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
+ val valueType = toDataType(keyValueGroup.getFields.apply(1))
+ assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
+ new MapType(keyType, valueType)
+ }
+ case _ => {
+ // Note: the order of these checks is important!
+ if (correspondsToMap(groupType)) { // MapType
+ val keyValueGroup = groupType.getFields.apply(0).asGroupType()
+ val keyType = toDataType(keyValueGroup.getFields.apply(0))
+ assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
+ val valueType = toDataType(keyValueGroup.getFields.apply(1))
+ assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
+ new MapType(keyType, valueType)
+ } else if (correspondsToArray(groupType)) { // ArrayType
+ val elementType = toDataType(groupType.getFields.apply(0))
+ new ArrayType(elementType)
+ } else { // everything else: StructType
+ val fields = groupType
+ .getFields
+ .map(ptype => new StructField(
+ ptype.getName,
+ toDataType(ptype),
+ ptype.getRepetition != Repetition.REQUIRED))
+ new StructType(fields)
+ }
+ }
+ }
+ }
+ }
+
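
To make the rules above concrete, here is a small sketch (editor's illustration, not part of the patch) of what `toDataType` produces for a hypothetical schema; it assumes package-private access to `ParquetTypesConverter`:

```scala
import scala.collection.JavaConversions._
import parquet.schema.MessageTypeParser

// Hypothetical schema: a required primitive plus a group containing a single
// REPEATED field named "array", the element name used by the test schemas
// in ParquetTestData above.
val parquetSchema = MessageTypeParser.parseMessageType("""
  message example {
    required int32 id;
    optional group prices {
      repeated double array;
    }
  }
  """)

// "id" maps to IntegerType (nullability is derived from the repetition level
// in convertToAttributes further down); "prices" maps to ArrayType(DoubleType).
val idType = ParquetTypesConverter.toDataType(parquetSchema.getFields.apply(0))
val pricesType = ParquetTypesConverter.toDataType(parquetSchema.getFields.apply(1))
```
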
+ /**
+ * For a given Catalyst [[org.apache.spark.sql.catalyst.types.DataType]] return
+ * the name of the corresponding Parquet primitive type or None if the given type
+ * is not primitive.
+ *
+ * @param ctype The type to convert
+ * @return The name of the corresponding Parquet primitive type
+ */
+ def fromPrimitiveDataType(ctype: DataType):
+ Option[ParquetPrimitiveTypeName] = ctype match {
+ case StringType => Some(ParquetPrimitiveTypeName.BINARY)
+ case BooleanType => Some(ParquetPrimitiveTypeName.BOOLEAN)
+ case DoubleType => Some(ParquetPrimitiveTypeName.DOUBLE)
+ case ArrayType(ByteType) =>
+ Some(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ case FloatType => Some(ParquetPrimitiveTypeName.FLOAT)
+ case IntegerType => Some(ParquetPrimitiveTypeName.INT32)
+ // There is no type for Byte or Short so we promote them to INT32.
+ case ShortType => Some(ParquetPrimitiveTypeName.INT32)
+ case ByteType => Some(ParquetPrimitiveTypeName.INT32)
+ case LongType => Some(ParquetPrimitiveTypeName.INT64)
+ case _ => None
+ }
+
+ /**
+ * Converts a given Catalyst [[org.apache.spark.sql.catalyst.types.DataType]] into
+ * the corresponding Parquet `Type`.
+ *
+ * The conversion follows the rules below:
+ * <ul>
+ * <li> Primitive types are converted into Parquet's primitive types.</li>
+ * <li> [[org.apache.spark.sql.catalyst.types.StructType]]s are converted
+ * into Parquet's `GroupType` with the corresponding field types.</li>
+ * <li> [[org.apache.spark.sql.catalyst.types.ArrayType]]s are converted
+ * into a 2-level nested group, where the outer group has the inner
+ * group as sole field. The inner group has name `values` and
+ * repetition level `REPEATED` and has the element type of
+ * the array as schema. We use Parquet's `ConversionPatterns` for this
+ * purpose.</li>
+ * <li> [[org.apache.spark.sql.catalyst.types.MapType]]s are converted
+ * into a nested (2-level) Parquet `GroupType` with two fields: a key
+ * type and a value type. The nested group has repetition level
+ * `REPEATED` and name `map`. We use Parquet's `ConversionPatterns`
+ * for this purpose</li>
+ * </ul>
+ * Parquet's repetition level is generally set according to the following rule:
+ * <ul>
+ * <li> If the call to `fromDataType` is recursive inside an enclosing `ArrayType` or
+ * `MapType`, then the repetition level is set to `REPEATED`.</li>
+ * <li> Otherwise, if the attribute whose type is converted is `nullable`, the Parquet
+ * type gets repetition level `OPTIONAL` and otherwise `REQUIRED`.</li>
+ * </ul>
+ *
+ * @param ctype The type to convert
+ * @param name The name of the [[org.apache.spark.sql.catalyst.expressions.Attribute]]
+ * whose type is converted
+ * @param nullable When true indicates that the attribute is nullable
+ * @param inArray When true indicates that this is a nested attribute inside an array.
+ * @return The corresponding Parquet type.
+ */
+ def fromDataType(
+ ctype: DataType,
+ name: String,
+ nullable: Boolean = true,
+ inArray: Boolean = false): ParquetType = {
+ val repetition =
+ if (inArray) {
+ Repetition.REPEATED
+ } else {
+ if (nullable) Repetition.OPTIONAL else Repetition.REQUIRED
+ }
+ val primitiveType = fromPrimitiveDataType(ctype)
+ if (primitiveType.isDefined) {
+ new ParquetPrimitiveType(repetition, primitiveType.get, name)
+ } else {
+ ctype match {
+ case ArrayType(elementType) => {
+ val parquetElementType = fromDataType(
+ elementType,
+ CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+ nullable = false,
+ inArray = true)
+ ConversionPatterns.listType(repetition, name, parquetElementType)
+ }
+ case StructType(structFields) => {
+ val fields = structFields.map {
+ field => fromDataType(field.dataType, field.name, field.nullable, inArray = false)
+ }
+ new ParquetGroupType(repetition, name, fields)
+ }
+ case MapType(keyType, valueType) => {
+ val parquetKeyType =
+ fromDataType(
+ keyType,
+ CatalystConverter.MAP_KEY_SCHEMA_NAME,
+ nullable = false,
+ inArray = false)
+ val parquetValueType =
+ fromDataType(
+ valueType,
+ CatalystConverter.MAP_VALUE_SCHEMA_NAME,
+ nullable = false,
+ inArray = false)
+ ConversionPatterns.mapType(
+ repetition,
+ name,
+ parquetKeyType,
+ parquetValueType)
+ }
+ case _ => sys.error(s"Unsupported datatype $ctype")
+ }
+ }
+ }
+
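
And the reverse direction, following the rules documented for `fromDataType` (again an editor's sketch with hypothetical field names, not part of the patch):

```scala
import org.apache.spark.sql.catalyst.types._

// A nullable ArrayType becomes an OPTIONAL group whose single inner field is
// REPEATED (built via ConversionPatterns.listType):
val scores = ParquetTypesConverter.fromDataType(ArrayType(IntegerType), "scores")
// roughly: optional group scores (LIST) { repeated int32 array; }

// A non-nullable MapType becomes a REQUIRED group wrapping a repeated
// key/value group (built via ConversionPatterns.mapType):
val ages = ParquetTypesConverter.fromDataType(
  MapType(StringType, IntegerType), "ages", nullable = false)
// roughly: required group ages (MAP) { repeated group map {
//          required binary key; required int32 value; } }
```
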
+ def convertToAttributes(parquetSchema: ParquetType): Seq[Attribute] = {
+ parquetSchema
+ .asGroupType()
+ .getFields
+ .map(
+ field =>
+ new AttributeReference(
+ field.getName,
+ toDataType(field),
+ field.getRepetition != Repetition.REQUIRED)())
+ }
+
+ def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
+ val fields = attributes.map(
+ attribute =>
+ fromDataType(attribute.dataType, attribute.name, attribute.nullable))
+ new MessageType("root", fields)
+ }
+
+ def convertFromString(string: String): Seq[Attribute] = {
+ DataType(string) match {
+ case s: StructType => s.toAttributes
+ case other => sys.error(s"Cannot convert $string to row")
+ }
+ }
+
+ def convertToString(schema: Seq[Attribute]): String = {
+ StructType.fromAttributes(schema).toString
+ }
+
+ def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration): Unit = {
+ if (origPath == null) {
+ throw new IllegalArgumentException("Unable to write Parquet metadata: path is null")
+ }
+ val fs = origPath.getFileSystem(conf)
+ if (fs == null) {
+ throw new IllegalArgumentException(
+ s"Unable to write Parquet metadata: path $origPath is incorrectly formatted")
+ }
+ val path = origPath.makeQualified(fs)
+ if (fs.exists(path) && !fs.getFileStatus(path).isDir) {
+ throw new IllegalArgumentException(s"Expected to write to directory $path but found file")
+ }
+ val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
+ if (fs.exists(metadataPath)) {
+ try {
+ fs.delete(metadataPath, true)
+ } catch {
+ case e: IOException =>
+ throw new IOException(s"Unable to delete previous PARQUET_METADATA_FILE at $metadataPath")
+ }
+ }
+ val extraMetadata = new java.util.HashMap[String, String]()
+ extraMetadata.put(
+ RowReadSupport.SPARK_METADATA_KEY,
+ ParquetTypesConverter.convertToString(attributes))
+ // TODO: add extra data, e.g., table name, date, etc.?
+
+ val parquetSchema: MessageType =
+ ParquetTypesConverter.convertFromAttributes(attributes)
+ val metaData: FileMetaData = new FileMetaData(
+ parquetSchema,
+ extraMetadata,
+ "Spark")
+
+ ParquetRelation.enableLogForwarding()
+ ParquetFileWriter.writeMetadataFile(
+ conf,
+ path,
+ new Footer(path, new ParquetMetadata(metaData, Nil)) :: Nil)
+ }
+
+ /**
+ * Try to read Parquet metadata at the given Path. We first check whether there is a summary
+ * metadata file inside that directory. If so, it is used. Otherwise we read the footer(s) of the
+ * actual Parquet file(s) at that location.
+ * @param origPath The path at which we expect one (or more) Parquet files.
+ * @param configuration The Hadoop configuration to use.
+ * @return The `ParquetMetadata` containing among other things the schema.
+ */
+ def readMetaData(origPath: Path, configuration: Option[Configuration]): ParquetMetadata = {
+ if (origPath == null) {
+ throw new IllegalArgumentException("Unable to read Parquet metadata: path is null")
+ }
+ val job = new Job()
+ val conf = configuration.getOrElse(ContextUtil.getConfiguration(job))
+ val fs: FileSystem = origPath.getFileSystem(conf)
+ if (fs == null) {
+ throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
+ }
+ val path = origPath.makeQualified(fs)
+ if (!fs.getFileStatus(path).isDir) {
+ throw new IllegalArgumentException(
+ s"Expected $path to be a directory with Parquet files/metadata")
+ }
+ ParquetRelation.enableLogForwarding()
+ val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
+ // if this is a new table that was just created we will find only the metadata file
+ if (fs.exists(metadataPath) && fs.isFile(metadataPath)) {
+ ParquetFileReader.readFooter(conf, metadataPath)
+ } else {
+ // there may be one or more Parquet files in the given directory
+ val footers = ParquetFileReader.readFooters(conf, fs.getFileStatus(path))
+ // TODO: for now we assume that all footers (if there is more than one) have identical
+ // metadata; we may want to add a check here at some point
+ if (footers.size() == 0) {
+ throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path")
+ }
+ footers(0).getParquetMetadata
+ }
+ }
+
+ /**
+ * Reads in Parquet Metadata from the given path and tries to extract the schema
+ * (Catalyst attributes) from the application-specific key-value map. If this
+ * is empty it falls back to converting from the Parquet file schema which
+ * may lead to an upcast of types (e.g., {byte, short} to int).
+ *
+ * @param origPath The path at which we expect one (or more) Parquet files.
+ * @param conf The Hadoop configuration to use.
+ * @return A list of attributes that make up the schema.
+ */
+ def readSchemaFromFile(origPath: Path, conf: Option[Configuration]): Seq[Attribute] = {
+ val keyValueMetadata: java.util.Map[String, String] =
+ readMetaData(origPath, conf)
+ .getFileMetaData
+ .getKeyValueMetaData
+ if (keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY) != null) {
+ convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
+ } else {
+ val attributes = convertToAttributes(
+ readMetaData(origPath, conf).getFileMetaData.getSchema)
+ log.warn(s"Falling back to schema conversion from Parquet types; result: $attributes")
+ attributes
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 9810520bb9..0c239d00b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -26,15 +26,16 @@ import parquet.hadoop.ParquetFileWriter
import parquet.hadoop.util.ContextUtil
import parquet.schema.MessageTypeParser
+import org.apache.spark.SparkContext
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.util.getTempFilePath
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType}
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.TestData
import org.apache.spark.sql.SchemaRDD
-import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.catalyst.expressions.Equals
-import org.apache.spark.sql.catalyst.types.IntegerType
+import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, Star}
import org.apache.spark.util.Utils
// Implicits
@@ -56,15 +57,37 @@ case class OptionalReflectData(
doubleField: Option[Double],
booleanField: Option[Boolean])
+case class Nested(i: Int, s: String)
+
+case class Data(array: Seq[Int], nested: Nested)
+
+case class AllDataTypes(
+ stringField: String,
+ intField: Int,
+ longField: Long,
+ floatField: Float,
+ doubleField: Double,
+ shortField: Short,
+ byteField: Byte,
+ booleanField: Boolean)
+
class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
import TestData._
TestData // Load test data tables.
var testRDD: SchemaRDD = null
+ // TODO: remove this once SqlParser can parse nested select statements
+ var nestedParserSqlContext: NestedParserSQLContext = null
+
override def beforeAll() {
+ nestedParserSqlContext = new NestedParserSQLContext(TestSQLContext.sparkContext)
ParquetTestData.writeFile()
ParquetTestData.writeFilterFile()
+ ParquetTestData.writeNestedFile1()
+ ParquetTestData.writeNestedFile2()
+ ParquetTestData.writeNestedFile3()
+ ParquetTestData.writeNestedFile4()
testRDD = parquetFile(ParquetTestData.testDir.toString)
testRDD.registerAsTable("testsource")
parquetFile(ParquetTestData.testFilterDir.toString)
@@ -74,9 +97,33 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
override def afterAll() {
Utils.deleteRecursively(ParquetTestData.testDir)
Utils.deleteRecursively(ParquetTestData.testFilterDir)
+ Utils.deleteRecursively(ParquetTestData.testNestedDir1)
+ Utils.deleteRecursively(ParquetTestData.testNestedDir2)
+ Utils.deleteRecursively(ParquetTestData.testNestedDir3)
+ Utils.deleteRecursively(ParquetTestData.testNestedDir4)
// here we should also unregister the table??
}
+ test("Read/Write All Types") {
+ val tempDir = getTempFilePath("parquetTest").getCanonicalPath
+ val range = (0 to 255)
+ TestSQLContext.sparkContext.parallelize(range)
+ .map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0))
+ .saveAsParquetFile(tempDir)
+ val result = parquetFile(tempDir).collect()
+ range.foreach {
+ i =>
+ assert(result(i).getString(0) == s"$i", s"row $i String field did not match, got ${result(i).getString(0)}")
+ assert(result(i).getInt(1) === i)
+ assert(result(i).getLong(2) === i.toLong)
+ assert(result(i).getFloat(3) === i.toFloat)
+ assert(result(i).getDouble(4) === i.toDouble)
+ assert(result(i).getShort(5) === i.toShort)
+ assert(result(i).getByte(6) === i.toByte)
+ assert(result(i).getBoolean(7) === (i % 2 == 0))
+ }
+ }
+
test("self-join parquet files") {
val x = ParquetTestData.testData.as('x)
val y = ParquetTestData.testData.as('y)
@@ -154,7 +201,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
path,
TestSQLContext.sparkContext.hadoopConfiguration)
assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
- val metaData = ParquetTypesConverter.readMetaData(path)
+ val metaData = ParquetTypesConverter.readMetaData(path, Some(ContextUtil.getConfiguration(job)))
assert(metaData != null)
ParquetTestData
.testData
@@ -197,10 +244,37 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i")
}
Utils.deleteRecursively(file)
- assert(true)
}
- test("insert (appending) to same table via Scala API") {
+ test("Insert (overwrite) via Scala API") {
+ val dirname = Utils.createTempDir()
+ val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+ .map(i => TestRDDEntry(i, s"val_$i"))
+ source_rdd.registerAsTable("source")
+ val dest_rdd = createParquetFile[TestRDDEntry](dirname.toString)
+ dest_rdd.registerAsTable("dest")
+ sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
+ val rdd_copy1 = sql("SELECT * FROM dest").collect()
+ assert(rdd_copy1.size === 100)
+ assert(rdd_copy1(0).apply(0) === 1)
+ assert(rdd_copy1(0).apply(1) === "val_1")
+ // TODO: why does collecting break things? It seems InsertIntoParquet::execute() is
+ // executed twice otherwise?!
+ sql("INSERT INTO dest SELECT * FROM source")
+ val rdd_copy2 = sql("SELECT * FROM dest").collect()
+ assert(rdd_copy2.size === 200)
+ assert(rdd_copy2(0).apply(0) === 1)
+ assert(rdd_copy2(0).apply(1) === "val_1")
+ assert(rdd_copy2(99).apply(0) === 100)
+ assert(rdd_copy2(99).apply(1) === "val_100")
+ assert(rdd_copy2(100).apply(0) === 1)
+ assert(rdd_copy2(100).apply(1) === "val_1")
+ Utils.deleteRecursively(dirname)
+ }
+
+ test("Insert (appending) to same table via Scala API") {
+ // TODO: why does collecting break things? It seems InsertIntoParquet::execute() is
+ // executed twice otherwise?!
sql("INSERT INTO testsource SELECT * FROM testsource")
val double_rdd = sql("SELECT * FROM testsource").collect()
assert(double_rdd != null)
@@ -363,4 +437,272 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
val query = sql(s"SELECT mystring FROM testfiltersource WHERE myint < 10")
assert(query.collect().size === 10)
}
+
+ test("Importing nested Parquet file (Addressbook)") {
+ val result = TestSQLContext
+ .parquetFile(ParquetTestData.testNestedDir1.toString)
+ .toSchemaRDD
+ .collect()
+ assert(result != null)
+ assert(result.size === 2)
+ val first_record = result(0)
+ val second_record = result(1)
+ assert(first_record != null)
+ assert(second_record != null)
+ assert(first_record.size === 3)
+ assert(second_record(1) === null)
+ assert(second_record(2) === null)
+ assert(second_record(0) === "A. Nonymous")
+ assert(first_record(0) === "Julien Le Dem")
+ val first_owner_numbers = first_record(1)
+ .asInstanceOf[CatalystConverter.ArrayScalaType[_]]
+ val first_contacts = first_record(2)
+ .asInstanceOf[CatalystConverter.ArrayScalaType[_]]
+ assert(first_owner_numbers != null)
+ assert(first_owner_numbers(0) === "555 123 4567")
+ assert(first_owner_numbers(2) === "XXX XXX XXXX")
+ assert(first_contacts(0)
+ .asInstanceOf[CatalystConverter.StructScalaType[_]].size === 2)
+ val first_contacts_entry_one = first_contacts(0)
+ .asInstanceOf[CatalystConverter.StructScalaType[_]]
+ assert(first_contacts_entry_one(0) === "Dmitriy Ryaboy")
+ assert(first_contacts_entry_one(1) === "555 987 6543")
+ val first_contacts_entry_two = first_contacts(1)
+ .asInstanceOf[CatalystConverter.StructScalaType[_]]
+ assert(first_contacts_entry_two(0) === "Chris Aniszczyk")
+ }
+
+ test("Importing nested Parquet file (nested numbers)") {
+ val result = TestSQLContext
+ .parquetFile(ParquetTestData.testNestedDir2.toString)
+ .toSchemaRDD
+ .collect()
+ assert(result.size === 1, "number of top-level rows incorrect")
+ assert(result(0).size === 5, "number of fields in row incorrect")
+ assert(result(0)(0) === 1)
+ assert(result(0)(1) === 7)
+ val subresult1 = result(0)(2).asInstanceOf[CatalystConverter.ArrayScalaType[_]]
+ assert(subresult1.size === 3)
+ assert(subresult1(0) === (1.toLong << 32))
+ assert(subresult1(1) === (1.toLong << 33))
+ assert(subresult1(2) === (1.toLong << 34))
+ val subresult2 = result(0)(3)
+ .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
+ .asInstanceOf[CatalystConverter.StructScalaType[_]]
+ assert(subresult2.size === 2)
+ assert(subresult2(0) === 2.5)
+ assert(subresult2(1) === false)
+ val subresult3 = result(0)(4)
+ .asInstanceOf[CatalystConverter.ArrayScalaType[_]]
+ assert(subresult3.size === 2)
+ assert(subresult3(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]].size === 2)
+ val subresult4 = subresult3(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]]
+ assert(subresult4(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 7)
+ assert(subresult4(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 8)
+ assert(subresult3(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]].size === 1)
+ assert(subresult3(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
+ .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 9)
+ }
+
+ test("Simple query on addressbook") {
+ val data = TestSQLContext
+ .parquetFile(ParquetTestData.testNestedDir1.toString)
+ .toSchemaRDD
+ val tmp = data.where('owner === "Julien Le Dem").select('owner as 'a, 'contacts as 'c).collect()
+ assert(tmp.size === 1)
+ assert(tmp(0)(0) === "Julien Le Dem")
+ }
+
+ test("Projection in addressbook") {
+ val data = nestedParserSqlContext
+ .parquetFile(ParquetTestData.testNestedDir1.toString)
+ .toSchemaRDD
+ data.registerAsTable("data")
+ val query = nestedParserSqlContext.sql("SELECT owner, contacts[1].name FROM data")
+ val tmp = query.collect()
+ assert(tmp.size === 2)
+ assert(tmp(0).size === 2)
+ assert(tmp(0)(0) === "Julien Le Dem")
+ assert(tmp(0)(1) === "Chris Aniszczyk")
+ assert(tmp(1)(0) === "A. Nonymous")
+ assert(tmp(1)(1) === null)
+ }
+
+ test("Simple query on nested int data") {
+ val data = nestedParserSqlContext
+ .parquetFile(ParquetTestData.testNestedDir2.toString)
+ .toSchemaRDD
+ data.registerAsTable("data")
+ val result1 = nestedParserSqlContext.sql("SELECT entries[0].value FROM data").collect()
+ assert(result1.size === 1)
+ assert(result1(0).size === 1)
+ assert(result1(0)(0) === 2.5)
+ val result2 = nestedParserSqlContext.sql("SELECT entries[0] FROM data").collect()
+ assert(result2.size === 1)
+ val subresult1 = result2(0)(0).asInstanceOf[CatalystConverter.StructScalaType[_]]
+ assert(subresult1.size === 2)
+ assert(subresult1(0) === 2.5)
+ assert(subresult1(1) === false)
+ val result3 = nestedParserSqlContext.sql("SELECT outerouter FROM data").collect()
+ val subresult2 = result3(0)(0)
+ .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
+ .asInstanceOf[CatalystConverter.ArrayScalaType[_]]
+ assert(subresult2(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 7)
+ assert(subresult2(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 8)
+ assert(result3(0)(0)
+ .asInstanceOf[CatalystConverter.ArrayScalaType[_]](1)
+ .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
+ .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 9)
+ }
+
+ test("nested structs") {
+ val data = nestedParserSqlContext
+ .parquetFile(ParquetTestData.testNestedDir3.toString)
+ .toSchemaRDD
+ data.registerAsTable("data")
+ val result1 = nestedParserSqlContext.sql("SELECT booleanNumberPairs[0].value[0].truth FROM data").collect()
+ assert(result1.size === 1)
+ assert(result1(0).size === 1)
+ assert(result1(0)(0) === false)
+ val result2 = nestedParserSqlContext.sql("SELECT booleanNumberPairs[0].value[1].truth FROM data").collect()
+ assert(result2.size === 1)
+ assert(result2(0).size === 1)
+ assert(result2(0)(0) === true)
+ val result3 = nestedParserSqlContext.sql("SELECT booleanNumberPairs[1].value[0].truth FROM data").collect()
+ assert(result3.size === 1)
+ assert(result3(0).size === 1)
+ assert(result3(0)(0) === false)
+ }
+
+ test("simple map") {
+ val data = TestSQLContext
+ .parquetFile(ParquetTestData.testNestedDir4.toString)
+ .toSchemaRDD
+ data.registerAsTable("mapTable")
+ val result1 = sql("SELECT data1 FROM mapTable").collect()
+ assert(result1.size === 1)
+ assert(result1(0)(0)
+ .asInstanceOf[CatalystConverter.MapScalaType[String, _]]
+ .getOrElse("key1", 0) === 1)
+ assert(result1(0)(0)
+ .asInstanceOf[CatalystConverter.MapScalaType[String, _]]
+ .getOrElse("key2", 0) === 2)
+ val result2 = sql("""SELECT data1["key1"] FROM mapTable""").collect()
+ assert(result2(0)(0) === 1)
+ }
+
+ test("map with struct values") {
+ val data = nestedParserSqlContext
+ .parquetFile(ParquetTestData.testNestedDir4.toString)
+ .toSchemaRDD
+ data.registerAsTable("mapTable")
+ val result1 = nestedParserSqlContext.sql("SELECT data2 FROM mapTable").collect()
+ assert(result1.size === 1)
+ val entry1 = result1(0)(0)
+ .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
+ .getOrElse("seven", null)
+ assert(entry1 != null)
+ assert(entry1(0) === 42)
+ assert(entry1(1) === "the answer")
+ val entry2 = result1(0)(0)
+ .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
+ .getOrElse("eight", null)
+ assert(entry2 != null)
+ assert(entry2(0) === 49)
+ assert(entry2(1) === null)
+ val result2 = nestedParserSqlContext.sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM mapTable""").collect()
+ assert(result2.size === 1)
+ assert(result2(0)(0) === 42.toLong)
+ assert(result2(0)(1) === "the answer")
+ }
+
+ test("Writing out Addressbook and reading it back in") {
+ // TODO: find out why CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME
+ // has no effect in this test case
+ val tmpdir = Utils.createTempDir()
+ Utils.deleteRecursively(tmpdir)
+ val result = nestedParserSqlContext
+ .parquetFile(ParquetTestData.testNestedDir1.toString)
+ .toSchemaRDD
+ result.saveAsParquetFile(tmpdir.toString)
+ nestedParserSqlContext
+ .parquetFile(tmpdir.toString)
+ .toSchemaRDD
+ .registerAsTable("tmpcopy")
+ val tmpdata = nestedParserSqlContext.sql("SELECT owner, contacts[1].name FROM tmpcopy").collect()
+ assert(tmpdata.size === 2)
+ assert(tmpdata(0).size === 2)
+ assert(tmpdata(0)(0) === "Julien Le Dem")
+ assert(tmpdata(0)(1) === "Chris Aniszczyk")
+ assert(tmpdata(1)(0) === "A. Nonymous")
+ assert(tmpdata(1)(1) === null)
+ Utils.deleteRecursively(tmpdir)
+ }
+
+ test("Writing out Map and reading it back in") {
+ val data = nestedParserSqlContext
+ .parquetFile(ParquetTestData.testNestedDir4.toString)
+ .toSchemaRDD
+ val tmpdir = Utils.createTempDir()
+ Utils.deleteRecursively(tmpdir)
+ data.saveAsParquetFile(tmpdir.toString)
+ nestedParserSqlContext
+ .parquetFile(tmpdir.toString)
+ .toSchemaRDD
+ .registerAsTable("tmpmapcopy")
+ val result1 = nestedParserSqlContext.sql("""SELECT data1["key2"] FROM tmpmapcopy""").collect()
+ assert(result1.size === 1)
+ assert(result1(0)(0) === 2)
+ val result2 = nestedParserSqlContext.sql("SELECT data2 FROM tmpmapcopy").collect()
+ assert(result2.size === 1)
+ val entry1 = result2(0)(0)
+ .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
+ .getOrElse("seven", null)
+ assert(entry1 != null)
+ assert(entry1(0) === 42)
+ assert(entry1(1) === "the answer")
+ val entry2 = result2(0)(0)
+ .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
+ .getOrElse("eight", null)
+ assert(entry2 != null)
+ assert(entry2(0) === 49)
+ assert(entry2(1) === null)
+ val result3 = nestedParserSqlContext.sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM tmpmapcopy""").collect()
+ assert(result3.size === 1)
+ assert(result3(0)(0) === 42.toLong)
+ assert(result3(0)(1) === "the answer")
+ Utils.deleteRecursively(tmpdir)
+ }
+}
+
+// TODO: the code below is needed temporarily until the standard parser is able to parse
+// nested field expressions correctly
+class NestedParserSQLContext(@transient override val sparkContext: SparkContext) extends SQLContext(sparkContext) {
+ override protected[sql] val parser = new NestedSqlParser()
+}
+
+class NestedSqlLexical(override val keywords: Seq[String]) extends SqlLexical(keywords) {
+ override def identChar = letter | elem('_')
+ delimiters += (".")
+}
+
+class NestedSqlParser extends SqlParser {
+ override val lexical = new NestedSqlLexical(reservedWords)
+
+ override protected lazy val baseExpression: PackratParser[Expression] =
+ expression ~ "[" ~ expression <~ "]" ^^ {
+ case base ~ _ ~ ordinal => GetItem(base, ordinal)
+ } |
+ expression ~ "." ~ ident ^^ {
+ case base ~ _ ~ fieldName => GetField(base, fieldName)
+ } |
+ TRUE ^^^ Literal(true, BooleanType) |
+ FALSE ^^^ Literal(false, BooleanType) |
+ cast |
+ "(" ~> expression <~ ")" |
+ function |
+ "-" ~> literal ^^ UnaryMinus |
+ ident ^^ UnresolvedAttribute |
+ "*" ^^^ Star(None) |
+ literal
}
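
Until the standard SQL parser can handle nested field access, queries that project into arrays, maps or structs have to go through this extended parser. A minimal usage sketch (editor's illustration with made-up table and column names, not part of the patch):

```scala
// Assumes an existing SparkContext `sc` and a Parquet file whose schema
// contains a nested column such as an array of structs.
val sqlContext = new NestedParserSQLContext(sc)
val people = sqlContext.parquetFile("/tmp/people.parquet").toSchemaRDD
people.registerAsTable("people")

// "[0]" is parsed into GetItem and ".city" into GetField by the rules above.
sqlContext.sql("SELECT addresses[0].city FROM people").collect()
```
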
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 68284344af..f923d68932 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -208,7 +208,9 @@ object HiveMetastoreTypes extends RegexParsers {
}
protected lazy val structType: Parser[DataType] =
- "struct" ~> "<" ~> repsep(structField,",") <~ ">" ^^ StructType
+ "struct" ~> "<" ~> repsep(structField,",") <~ ">" ^^ {
+ case fields => new StructType(fields)
+ }
protected lazy val dataType: Parser[DataType] =
arrayType |