diff options
author | Andre Schumacher <andre.schumacher@iki.fi> | 2014-06-19 23:47:45 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-06-19 23:47:45 -0700 |
commit | f479cf3743e416ee08e62806e1b34aff5998ac22 (patch) | |
tree | 2d3189d95f0bb552dda1a45f4a2bb51f8d5f1e3f /sql/catalyst | |
parent | f397e92eb2986f4436fb9e66777fc652f91d8494 (diff) | |
download | spark-f479cf3743e416ee08e62806e1b34aff5998ac22.tar.gz spark-f479cf3743e416ee08e62806e1b34aff5998ac22.tar.bz2 spark-f479cf3743e416ee08e62806e1b34aff5998ac22.zip |
SPARK-1293 [SQL] Parquet support for nested types
It should be possible to import and export data stored in Parquet's columnar format that contains nested types. For example:
```java
message AddressBook {
required binary owner;
optional group ownerPhoneNumbers {
repeated binary array;
}
optional group contacts {
repeated group array {
required binary name;
optional binary phoneNumber;
}
}
optional group nameToApartmentNumber {
repeated group map {
required binary key;
required int32 value;
}
}
}
```
The example could model a type (AddressBook) that contains records made of strings (owner), lists (ownerPhoneNumbers) and a table of contacts (e.g., a list of pairs or a map that can contain null values but keys must not be null). The list of tasks are as follows:
<h6>Implement support for converting nested Parquet types to Spark/Catalyst types:</h6>
- [x] Structs
- [x] Lists
- [x] Maps (note: currently keys need to be Strings)
<h6>Implement import (via ``parquetFile``) of nested Parquet types (first version in this PR)</h6>
- [x] Initial version
<h6>Implement export (via ``saveAsParquetFile``)</h6>
- [x] Initial version
<h6>Test support for AvroParquet, etc.</h6>
- [x] Initial testing of import of avro-generated Parquet data (simple + nested)
Example:
```scala
val data = TestSQLContext
.parquetFile("input.dir")
.toSchemaRDD
data.registerAsTable("data")
sql("SELECT owner, contacts[1].name, nameToApartmentNumber['John'] FROM data").collect()
```
Author: Andre Schumacher <andre.schumacher@iki.fi>
Author: Michael Armbrust <michael@databricks.com>
Closes #360 from AndreSchumacher/nested_parquet and squashes the following commits:
30708c8 [Andre Schumacher] Taking out AvroParquet test for now to remove Avro dependency
95c1367 [Andre Schumacher] Changes to ParquetRelation and its metadata
7eceb67 [Andre Schumacher] Review feedback
94eea3a [Andre Schumacher] Scalastyle
403061f [Andre Schumacher] Fixing some issues with tests and schema metadata
b8a8b9a [Andre Schumacher] More fixes to short and byte conversion
63d1b57 [Andre Schumacher] Cleaning up and Scalastyle
88e6bdb [Andre Schumacher] Attempting to fix loss of schema
37e0a0a [Andre Schumacher] Cleaning up
14c3fd8 [Andre Schumacher] Attempting to fix Spark-Parquet schema conversion
3e1456c [Michael Armbrust] WIP: Directly serialize catalyst attributes.
f7aeba3 [Michael Armbrust] [SPARK-1982] Support for ByteType and ShortType.
3104886 [Michael Armbrust] Nested Rows should be Rows, not Seqs.
3c6b25f [Andre Schumacher] Trying to reduce no-op changes wrt master
31465d6 [Andre Schumacher] Scalastyle: fixing commented out bottom
de02538 [Andre Schumacher] Cleaning up ParquetTestData
2f5a805 [Andre Schumacher] Removing stripMargin from test schemas
191bc0d [Andre Schumacher] Changing to Seq for ArrayType, refactoring SQLParser for nested field extension
cbb5793 [Andre Schumacher] Code review feedback
32229c7 [Andre Schumacher] Removing Row nested values and placing by generic types
0ae9376 [Andre Schumacher] Doc strings and simplifying ParquetConverter.scala
a6b4f05 [Andre Schumacher] Cleaning up ArrayConverter, moving classTag to NativeType, adding NativeRow
431f00f [Andre Schumacher] Fixing problems introduced during rebase
c52ff2c [Andre Schumacher] Adding native-array converter
619c397 [Andre Schumacher] Completing Map testcase
79d81d5 [Andre Schumacher] Replacing field names for array and map in WriteSupport
f466ff0 [Andre Schumacher] Added ParquetAvro tests and revised Array conversion
adc1258 [Andre Schumacher] Optimizing imports
e99cc51 [Andre Schumacher] Fixing nested WriteSupport and adding tests
1dc5ac9 [Andre Schumacher] First version of WriteSupport for nested types
d1911dc [Andre Schumacher] Simplifying ArrayType conversion
f777b4b [Andre Schumacher] Scalastyle
824500c [Andre Schumacher] Adding attribute resolution for MapType
b539fde [Andre Schumacher] First commit for MapType
a594aed [Andre Schumacher] Scalastyle
4e25fcb [Andre Schumacher] Adding resolution of complex ArrayTypes
f8f8911 [Andre Schumacher] For primitive rows fall back to more efficient converter, code reorg
6dbc9b7 [Andre Schumacher] Fixing some problems intruduced during rebase
b7fcc35 [Andre Schumacher] Documenting conversions, bugfix, wrappers of Rows
ee70125 [Andre Schumacher] fixing one problem with arrayconverter
98219cf [Andre Schumacher] added struct converter
5d80461 [Andre Schumacher] fixing one problem with nested structs and breaking up files
1b1b3d6 [Andre Schumacher] Fixing one problem with nested arrays
ddb40d2 [Andre Schumacher] Extending tests for nested Parquet data
745a42b [Andre Schumacher] Completing testcase for nested data (Addressbook(
6125c75 [Andre Schumacher] First working nested Parquet record input
4d4892a [Andre Schumacher] First commit nested Parquet read converters
aa688fe [Andre Schumacher] Adding conversion of nested Parquet schemas
Diffstat (limited to 'sql/catalyst')
3 files changed, 149 insertions, 62 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 46fcfbb9e2..2ad2d04af5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -66,43 +66,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected case class Keyword(str: String) protected implicit def asParser(k: Keyword): Parser[String] = - allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _) - - protected class SqlLexical extends StdLexical { - case class FloatLit(chars: String) extends Token { - override def toString = chars - } - override lazy val token: Parser[Token] = ( - identChar ~ rep( identChar | digit ) ^^ - { case first ~ rest => processIdent(first :: rest mkString "") } - | rep1(digit) ~ opt('.' ~> rep(digit)) ^^ { - case i ~ None => NumericLit(i mkString "") - case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString("")) - } - | '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^ - { case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") } - | '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^ - { case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") } - | EofCh ^^^ EOF - | '\'' ~> failure("unclosed string literal") - | '\"' ~> failure("unclosed string literal") - | delim - | failure("illegal character") - ) - - override def identChar = letter | elem('.') | elem('_') - - override def whitespace: Parser[Any] = rep( - whitespaceChar - | '/' ~ '*' ~ comment - | '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') ) - | '#' ~ rep( chrExcept(EofCh, '\n') ) - | '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') ) - | '/' ~ '*' ~ failure("unclosed comment") - ) - } - - override val lexical = new SqlLexical + lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _) protected val ALL = Keyword("ALL") protected val AND = Keyword("AND") @@ -161,24 +125,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers { this.getClass .getMethods .filter(_.getReturnType == classOf[Keyword]) - .map(_.invoke(this).asInstanceOf[Keyword]) - - /** Generate all variations of upper and lower case of a given string */ - private def allCaseVersions(s: String, prefix: String = ""): Stream[String] = { - if (s == "") { - Stream(prefix) - } else { - allCaseVersions(s.tail, prefix + s.head.toLower) ++ - allCaseVersions(s.tail, prefix + s.head.toUpper) - } - } + .map(_.invoke(this).asInstanceOf[Keyword].str) - lexical.reserved ++= reservedWords.flatMap(w => allCaseVersions(w.str)) - - lexical.delimiters += ( - "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")", - ",", ";", "%", "{", "}", ":", "[", "]" - ) + override val lexical = new SqlLexical(reservedWords) protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = { exprs.zipWithIndex.map { @@ -383,7 +332,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars) protected lazy val baseExpression: PackratParser[Expression] = - expression ~ "[" ~ expression <~ "]" ^^ { + expression ~ "[" ~ expression <~ "]" ^^ { case base ~ _ ~ ordinal => GetItem(base, ordinal) } | TRUE ^^^ Literal(true, BooleanType) | @@ -399,3 +348,55 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val dataType: Parser[DataType] = STRING ^^^ StringType } + +class SqlLexical(val keywords: Seq[String]) extends StdLexical { + case class FloatLit(chars: String) extends Token { + override def toString = chars + } + + reserved ++= keywords.flatMap(w => allCaseVersions(w)) + + delimiters += ( + "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")", + ",", ";", "%", "{", "}", ":", "[", "]" + ) + + override lazy val token: Parser[Token] = ( + identChar ~ rep( identChar | digit ) ^^ + { case first ~ rest => processIdent(first :: rest mkString "") } + | rep1(digit) ~ opt('.' ~> rep(digit)) ^^ { + case i ~ None => NumericLit(i mkString "") + case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString("")) + } + | '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^ + { case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") } + | '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^ + { case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") } + | EofCh ^^^ EOF + | '\'' ~> failure("unclosed string literal") + | '\"' ~> failure("unclosed string literal") + | delim + | failure("illegal character") + ) + + override def identChar = letter | elem('_') | elem('.') + + override def whitespace: Parser[Any] = rep( + whitespaceChar + | '/' ~ '*' ~ comment + | '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') ) + | '#' ~ rep( chrExcept(EofCh, '\n') ) + | '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') ) + | '/' ~ '*' ~ failure("unclosed comment") + ) + + /** Generate all variations of upper and lower case of a given string */ + def allCaseVersions(s: String, prefix: String = ""): Stream[String] = { + if (s == "") { + Stream(prefix) + } else { + allCaseVersions(s.tail, prefix + s.head.toLower) ++ + allCaseVersions(s.tail, prefix + s.head.toUpper) + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala index b6aeae92f8..5d3bb25ad5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala @@ -50,6 +50,8 @@ case class GetItem(child: Expression, ordinal: Expression) extends Expression { null } else { if (child.dataType.isInstanceOf[ArrayType]) { + // TODO: consider using Array[_] for ArrayType child to avoid + // boxing of primitives val baseValue = value.asInstanceOf[Seq[_]] val o = key.asInstanceOf[Int] if (o >= baseValue.size || o < 0) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index da34bd3a21..bb77bccf86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -19,9 +19,71 @@ package org.apache.spark.sql.catalyst.types import java.sql.Timestamp -import scala.reflect.runtime.universe.{typeTag, TypeTag} +import scala.util.parsing.combinator.RegexParsers -import org.apache.spark.sql.catalyst.expressions.Expression +import scala.reflect.ClassTag +import scala.reflect.runtime.universe.{typeTag, TypeTag, runtimeMirror} + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.util.Utils + +/** + * + */ +object DataType extends RegexParsers { + protected lazy val primitiveType: Parser[DataType] = + "StringType" ^^^ StringType | + "FloatType" ^^^ FloatType | + "IntegerType" ^^^ IntegerType | + "ByteType" ^^^ ByteType | + "ShortType" ^^^ ShortType | + "DoubleType" ^^^ DoubleType | + "LongType" ^^^ LongType | + "BinaryType" ^^^ BinaryType | + "BooleanType" ^^^ BooleanType | + "DecimalType" ^^^ DecimalType | + "TimestampType" ^^^ TimestampType + + protected lazy val arrayType: Parser[DataType] = + "ArrayType" ~> "(" ~> dataType <~ ")" ^^ ArrayType + + protected lazy val mapType: Parser[DataType] = + "MapType" ~> "(" ~> dataType ~ "," ~ dataType <~ ")" ^^ { + case t1 ~ _ ~ t2 => MapType(t1, t2) + } + + protected lazy val structField: Parser[StructField] = + ("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ { + case name ~ tpe ~ nullable => + StructField(name, tpe, nullable = nullable) + } + + protected lazy val boolVal: Parser[Boolean] = + "true" ^^^ true | + "false" ^^^ false + + + protected lazy val structType: Parser[DataType] = + "StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ { + case fields => new StructType(fields) + } + + protected lazy val dataType: Parser[DataType] = + arrayType | + mapType | + structType | + primitiveType + + /** + * Parses a string representation of a DataType. + * + * TODO: Generate parser as pickler... + */ + def apply(asString: String): DataType = parseAll(dataType, asString) match { + case Success(result, _) => result + case failure: NoSuccess => sys.error(s"Unsupported dataType: $asString, $failure") + } +} abstract class DataType { /** Matches any expression that evaluates to this DataType */ @@ -29,25 +91,36 @@ abstract class DataType { case e: Expression if e.dataType == this => true case _ => false } + + def isPrimitive: Boolean = false } case object NullType extends DataType +trait PrimitiveType extends DataType { + override def isPrimitive = true +} + abstract class NativeType extends DataType { type JvmType @transient val tag: TypeTag[JvmType] val ordering: Ordering[JvmType] + + @transient val classTag = { + val mirror = runtimeMirror(Utils.getSparkClassLoader) + ClassTag[JvmType](mirror.runtimeClass(tag.tpe)) + } } -case object StringType extends NativeType { +case object StringType extends NativeType with PrimitiveType { type JvmType = String @transient lazy val tag = typeTag[JvmType] val ordering = implicitly[Ordering[JvmType]] } -case object BinaryType extends DataType { +case object BinaryType extends DataType with PrimitiveType { type JvmType = Array[Byte] } -case object BooleanType extends NativeType { +case object BooleanType extends NativeType with PrimitiveType { type JvmType = Boolean @transient lazy val tag = typeTag[JvmType] val ordering = implicitly[Ordering[JvmType]] @@ -63,7 +136,7 @@ case object TimestampType extends NativeType { } } -abstract class NumericType extends NativeType { +abstract class NumericType extends NativeType with PrimitiveType { // Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for // implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a // type parameter and and add a numeric annotation (i.e., [JvmType : Numeric]). This gets @@ -154,6 +227,17 @@ case object FloatType extends FractionalType { case class ArrayType(elementType: DataType) extends DataType case class StructField(name: String, dataType: DataType, nullable: Boolean) -case class StructType(fields: Seq[StructField]) extends DataType + +object StructType { + def fromAttributes(attributes: Seq[Attribute]): StructType = { + StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable))) + } + + // def apply(fields: Seq[StructField]) = new StructType(fields.toIndexedSeq) +} + +case class StructType(fields: Seq[StructField]) extends DataType { + def toAttributes = fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)()) +} case class MapType(keyType: DataType, valueType: DataType) extends DataType |