path: root/sql/catalyst
author     Andre Schumacher <andre.schumacher@iki.fi>    2014-06-19 23:47:45 -0700
committer  Reynold Xin <rxin@apache.org>                 2014-06-19 23:47:45 -0700
commit     f479cf3743e416ee08e62806e1b34aff5998ac22 (patch)
tree       2d3189d95f0bb552dda1a45f4a2bb51f8d5f1e3f /sql/catalyst
parent     f397e92eb2986f4436fb9e66777fc652f91d8494 (diff)
SPARK-1293 [SQL] Parquet support for nested types
It should be possible to import and export data stored in Parquet's columnar format that contains nested types. For example:

```java
message AddressBook {
  required binary owner;
  optional group ownerPhoneNumbers {
    repeated binary array;
  }
  optional group contacts {
    repeated group array {
      required binary name;
      optional binary phoneNumber;
    }
  }
  optional group nameToApartmentNumber {
    repeated group map {
      required binary key;
      required int32 value;
    }
  }
}
```

The example could model a type (AddressBook) that contains records made of strings (owner), lists (ownerPhoneNumbers), and a table of contacts (e.g., a list of pairs, or a map whose values may be null but whose keys must not be). The list of tasks is as follows:

<h6>Implement support for converting nested Parquet types to Spark/Catalyst types:</h6>
- [x] Structs
- [x] Lists
- [x] Maps (note: currently keys need to be Strings)

<h6>Implement import (via ``parquetFile``) of nested Parquet types (first version in this PR)</h6>
- [x] Initial version

<h6>Implement export (via ``saveAsParquetFile``)</h6>
- [x] Initial version

<h6>Test support for AvroParquet, etc.</h6>
- [x] Initial testing of import of avro-generated Parquet data (simple + nested)

Example:

```scala
val data = TestSQLContext
  .parquetFile("input.dir")
  .toSchemaRDD
data.registerAsTable("data")
sql("SELECT owner, contacts[1].name, nameToApartmentNumber['John'] FROM data").collect()
```

Author: Andre Schumacher <andre.schumacher@iki.fi>
Author: Michael Armbrust <michael@databricks.com>

Closes #360 from AndreSchumacher/nested_parquet and squashes the following commits:

30708c8 [Andre Schumacher] Taking out AvroParquet test for now to remove Avro dependency
95c1367 [Andre Schumacher] Changes to ParquetRelation and its metadata
7eceb67 [Andre Schumacher] Review feedback
94eea3a [Andre Schumacher] Scalastyle
403061f [Andre Schumacher] Fixing some issues with tests and schema metadata
b8a8b9a [Andre Schumacher] More fixes to short and byte conversion
63d1b57 [Andre Schumacher] Cleaning up and Scalastyle
88e6bdb [Andre Schumacher] Attempting to fix loss of schema
37e0a0a [Andre Schumacher] Cleaning up
14c3fd8 [Andre Schumacher] Attempting to fix Spark-Parquet schema conversion
3e1456c [Michael Armbrust] WIP: Directly serialize catalyst attributes.
f7aeba3 [Michael Armbrust] [SPARK-1982] Support for ByteType and ShortType.
3104886 [Michael Armbrust] Nested Rows should be Rows, not Seqs.
3c6b25f [Andre Schumacher] Trying to reduce no-op changes wrt master
31465d6 [Andre Schumacher] Scalastyle: fixing commented out bottom
de02538 [Andre Schumacher] Cleaning up ParquetTestData
2f5a805 [Andre Schumacher] Removing stripMargin from test schemas
191bc0d [Andre Schumacher] Changing to Seq for ArrayType, refactoring SQLParser for nested field extension
cbb5793 [Andre Schumacher] Code review feedback
32229c7 [Andre Schumacher] Removing Row nested values and placing by generic types
0ae9376 [Andre Schumacher] Doc strings and simplifying ParquetConverter.scala
a6b4f05 [Andre Schumacher] Cleaning up ArrayConverter, moving classTag to NativeType, adding NativeRow
431f00f [Andre Schumacher] Fixing problems introduced during rebase
c52ff2c [Andre Schumacher] Adding native-array converter
619c397 [Andre Schumacher] Completing Map testcase
79d81d5 [Andre Schumacher] Replacing field names for array and map in WriteSupport
f466ff0 [Andre Schumacher] Added ParquetAvro tests and revised Array conversion
adc1258 [Andre Schumacher] Optimizing imports
e99cc51 [Andre Schumacher] Fixing nested WriteSupport and adding tests
1dc5ac9 [Andre Schumacher] First version of WriteSupport for nested types
d1911dc [Andre Schumacher] Simplifying ArrayType conversion
f777b4b [Andre Schumacher] Scalastyle
824500c [Andre Schumacher] Adding attribute resolution for MapType
b539fde [Andre Schumacher] First commit for MapType
a594aed [Andre Schumacher] Scalastyle
4e25fcb [Andre Schumacher] Adding resolution of complex ArrayTypes
f8f8911 [Andre Schumacher] For primitive rows fall back to more efficient converter, code reorg
6dbc9b7 [Andre Schumacher] Fixing some problems introduced during rebase
b7fcc35 [Andre Schumacher] Documenting conversions, bugfix, wrappers of Rows
ee70125 [Andre Schumacher] fixing one problem with arrayconverter
98219cf [Andre Schumacher] added struct converter
5d80461 [Andre Schumacher] fixing one problem with nested structs and breaking up files
1b1b3d6 [Andre Schumacher] Fixing one problem with nested arrays
ddb40d2 [Andre Schumacher] Extending tests for nested Parquet data
745a42b [Andre Schumacher] Completing testcase for nested data (Addressbook)
6125c75 [Andre Schumacher] First working nested Parquet record input
4d4892a [Andre Schumacher] First commit nested Parquet read converters
aa688fe [Andre Schumacher] Adding conversion of nested Parquet schemas
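For orientation, here is a minimal sketch of how the AddressBook message above could be described with the Catalyst types touched by this patch (StructType, ArrayType, MapType). The field names follow the example; the binary-to-StringType mapping and the exact nesting the importer produces are assumptions for illustration only:

```scala
import org.apache.spark.sql.catalyst.types._

// Hypothetical Catalyst schema for the AddressBook example:
// strings map to StringType, repeated groups to ArrayType, and the
// key/value group to MapType (keys currently have to be Strings).
val addressBookSchema = StructType(Seq(
  StructField("owner", StringType, nullable = false),
  StructField("ownerPhoneNumbers", ArrayType(StringType), nullable = true),
  StructField("contacts", ArrayType(
    StructType(Seq(
      StructField("name", StringType, nullable = false),
      StructField("phoneNumber", StringType, nullable = true)))), nullable = true),
  StructField("nameToApartmentNumber", MapType(StringType, IntegerType), nullable = true)))
```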
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala                 111
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala    2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala             98
3 files changed, 149 insertions(+), 62 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 46fcfbb9e2..2ad2d04af5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -66,43 +66,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected case class Keyword(str: String)
protected implicit def asParser(k: Keyword): Parser[String] =
- allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
-
- protected class SqlLexical extends StdLexical {
- case class FloatLit(chars: String) extends Token {
- override def toString = chars
- }
- override lazy val token: Parser[Token] = (
- identChar ~ rep( identChar | digit ) ^^
- { case first ~ rest => processIdent(first :: rest mkString "") }
- | rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {
- case i ~ None => NumericLit(i mkString "")
- case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
- }
- | '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^
- { case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") }
- | '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^
- { case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") }
- | EofCh ^^^ EOF
- | '\'' ~> failure("unclosed string literal")
- | '\"' ~> failure("unclosed string literal")
- | delim
- | failure("illegal character")
- )
-
- override def identChar = letter | elem('.') | elem('_')
-
- override def whitespace: Parser[Any] = rep(
- whitespaceChar
- | '/' ~ '*' ~ comment
- | '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') )
- | '#' ~ rep( chrExcept(EofCh, '\n') )
- | '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') )
- | '/' ~ '*' ~ failure("unclosed comment")
- )
- }
-
- override val lexical = new SqlLexical
+ lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
@@ -161,24 +125,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
this.getClass
.getMethods
.filter(_.getReturnType == classOf[Keyword])
- .map(_.invoke(this).asInstanceOf[Keyword])
-
- /** Generate all variations of upper and lower case of a given string */
- private def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
- if (s == "") {
- Stream(prefix)
- } else {
- allCaseVersions(s.tail, prefix + s.head.toLower) ++
- allCaseVersions(s.tail, prefix + s.head.toUpper)
- }
- }
+ .map(_.invoke(this).asInstanceOf[Keyword].str)
- lexical.reserved ++= reservedWords.flatMap(w => allCaseVersions(w.str))
-
- lexical.delimiters += (
- "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
- ",", ";", "%", "{", "}", ":", "[", "]"
- )
+ override val lexical = new SqlLexical(reservedWords)
protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = {
exprs.zipWithIndex.map {
@@ -383,7 +332,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
protected lazy val baseExpression: PackratParser[Expression] =
- expression ~ "[" ~ expression <~ "]" ^^ {
+ expression ~ "[" ~ expression <~ "]" ^^ {
case base ~ _ ~ ordinal => GetItem(base, ordinal)
} |
TRUE ^^^ Literal(true, BooleanType) |
@@ -399,3 +348,55 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected lazy val dataType: Parser[DataType] =
STRING ^^^ StringType
}
+
+class SqlLexical(val keywords: Seq[String]) extends StdLexical {
+ case class FloatLit(chars: String) extends Token {
+ override def toString = chars
+ }
+
+ reserved ++= keywords.flatMap(w => allCaseVersions(w))
+
+ delimiters += (
+ "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
+ ",", ";", "%", "{", "}", ":", "[", "]"
+ )
+
+ override lazy val token: Parser[Token] = (
+ identChar ~ rep( identChar | digit ) ^^
+ { case first ~ rest => processIdent(first :: rest mkString "") }
+ | rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {
+ case i ~ None => NumericLit(i mkString "")
+ case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
+ }
+ | '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^
+ { case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") }
+ | '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^
+ { case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") }
+ | EofCh ^^^ EOF
+ | '\'' ~> failure("unclosed string literal")
+ | '\"' ~> failure("unclosed string literal")
+ | delim
+ | failure("illegal character")
+ )
+
+ override def identChar = letter | elem('_') | elem('.')
+
+ override def whitespace: Parser[Any] = rep(
+ whitespaceChar
+ | '/' ~ '*' ~ comment
+ | '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') )
+ | '#' ~ rep( chrExcept(EofCh, '\n') )
+ | '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') )
+ | '/' ~ '*' ~ failure("unclosed comment")
+ )
+
+ /** Generate all variations of upper and lower case of a given string */
+ def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
+ if (s == "") {
+ Stream(prefix)
+ } else {
+ allCaseVersions(s.tail, prefix + s.head.toLower) ++
+ allCaseVersions(s.tail, prefix + s.head.toUpper)
+ }
+ }
+}
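The refactor above moves keyword registration into the lexer's constructor: `allCaseVersions` expands each reserved word into every upper/lower-case spelling so keyword matching stays case-insensitive, mirroring the new `override val lexical = new SqlLexical(reservedWords)` line. A minimal sketch of that behaviour (the two-keyword list is made up for illustration):

```scala
// Construct the lexer with an illustrative keyword list; the constructor
// registers every case variant of each keyword as a reserved word.
val lexical = new SqlLexical(Seq("SELECT", "AS"))

// "as" expands to all four case combinations.
lexical.allCaseVersions("as").toList
// => List("as", "aS", "As", "AS")
```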
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
index b6aeae92f8..5d3bb25ad5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
@@ -50,6 +50,8 @@ case class GetItem(child: Expression, ordinal: Expression) extends Expression {
null
} else {
if (child.dataType.isInstanceOf[ArrayType]) {
+ // TODO: consider using Array[_] for ArrayType child to avoid
+ // boxing of primitives
val baseValue = value.asInstanceOf[Seq[_]]
val o = key.asInstanceOf[Int]
if (o >= baseValue.size || o < 0) {
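The `GetItem` expression this TODO refers to is what the extended `SqlParser` produces for the bracket syntax in the commit-message example (`contacts[1]`, `nameToApartmentNumber['John']`). A rough sketch of the resulting expression trees, assuming the `UnresolvedAttribute` and `Literal(value, dataType)` constructors available in Catalyst at this point (attribute names are taken from the example and purely illustrative):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{GetItem, Literal}
import org.apache.spark.sql.catalyst.types.{IntegerType, StringType}

// A map lookup such as nameToApartmentNumber['John'] becomes GetItem(base, key).
val apartment = GetItem(
  UnresolvedAttribute("nameToApartmentNumber"),
  Literal("John", StringType))

// An array index such as ownerPhoneNumbers[1]; for ArrayType children the
// evaluated value is a Seq[_], and ordinals outside [0, size) yield null.
val phone = GetItem(
  UnresolvedAttribute("ownerPhoneNumbers"),
  Literal(1, IntegerType))
```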
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index da34bd3a21..bb77bccf86 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -19,9 +19,71 @@ package org.apache.spark.sql.catalyst.types
import java.sql.Timestamp
-import scala.reflect.runtime.universe.{typeTag, TypeTag}
+import scala.util.parsing.combinator.RegexParsers
-import org.apache.spark.sql.catalyst.expressions.Expression
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.{typeTag, TypeTag, runtimeMirror}
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
+import org.apache.spark.util.Utils
+
+/**
+ *
+ */
+object DataType extends RegexParsers {
+ protected lazy val primitiveType: Parser[DataType] =
+ "StringType" ^^^ StringType |
+ "FloatType" ^^^ FloatType |
+ "IntegerType" ^^^ IntegerType |
+ "ByteType" ^^^ ByteType |
+ "ShortType" ^^^ ShortType |
+ "DoubleType" ^^^ DoubleType |
+ "LongType" ^^^ LongType |
+ "BinaryType" ^^^ BinaryType |
+ "BooleanType" ^^^ BooleanType |
+ "DecimalType" ^^^ DecimalType |
+ "TimestampType" ^^^ TimestampType
+
+ protected lazy val arrayType: Parser[DataType] =
+ "ArrayType" ~> "(" ~> dataType <~ ")" ^^ ArrayType
+
+ protected lazy val mapType: Parser[DataType] =
+ "MapType" ~> "(" ~> dataType ~ "," ~ dataType <~ ")" ^^ {
+ case t1 ~ _ ~ t2 => MapType(t1, t2)
+ }
+
+ protected lazy val structField: Parser[StructField] =
+ ("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ {
+ case name ~ tpe ~ nullable =>
+ StructField(name, tpe, nullable = nullable)
+ }
+
+ protected lazy val boolVal: Parser[Boolean] =
+ "true" ^^^ true |
+ "false" ^^^ false
+
+
+ protected lazy val structType: Parser[DataType] =
+ "StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ {
+ case fields => new StructType(fields)
+ }
+
+ protected lazy val dataType: Parser[DataType] =
+ arrayType |
+ mapType |
+ structType |
+ primitiveType
+
+ /**
+ * Parses a string representation of a DataType.
+ *
+ * TODO: Generate parser as pickler...
+ */
+ def apply(asString: String): DataType = parseAll(dataType, asString) match {
+ case Success(result, _) => result
+ case failure: NoSuccess => sys.error(s"Unsupported dataType: $asString, $failure")
+ }
+}
abstract class DataType {
/** Matches any expression that evaluates to this DataType */
@@ -29,25 +91,36 @@ abstract class DataType {
case e: Expression if e.dataType == this => true
case _ => false
}
+
+ def isPrimitive: Boolean = false
}
case object NullType extends DataType
+trait PrimitiveType extends DataType {
+ override def isPrimitive = true
+}
+
abstract class NativeType extends DataType {
type JvmType
@transient val tag: TypeTag[JvmType]
val ordering: Ordering[JvmType]
+
+ @transient val classTag = {
+ val mirror = runtimeMirror(Utils.getSparkClassLoader)
+ ClassTag[JvmType](mirror.runtimeClass(tag.tpe))
+ }
}
-case object StringType extends NativeType {
+case object StringType extends NativeType with PrimitiveType {
type JvmType = String
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
}
-case object BinaryType extends DataType {
+case object BinaryType extends DataType with PrimitiveType {
type JvmType = Array[Byte]
}
-case object BooleanType extends NativeType {
+case object BooleanType extends NativeType with PrimitiveType {
type JvmType = Boolean
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
@@ -63,7 +136,7 @@ case object TimestampType extends NativeType {
}
}
-abstract class NumericType extends NativeType {
+abstract class NumericType extends NativeType with PrimitiveType {
// Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for
// implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
// type parameter and and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
@@ -154,6 +227,17 @@ case object FloatType extends FractionalType {
case class ArrayType(elementType: DataType) extends DataType
case class StructField(name: String, dataType: DataType, nullable: Boolean)
-case class StructType(fields: Seq[StructField]) extends DataType
+
+object StructType {
+ def fromAttributes(attributes: Seq[Attribute]): StructType = {
+ StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))
+ }
+
+ // def apply(fields: Seq[StructField]) = new StructType(fields.toIndexedSeq)
+}
+
+case class StructType(fields: Seq[StructField]) extends DataType {
+ def toAttributes = fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+}
case class MapType(keyType: DataType, valueType: DataType) extends DataType
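Taken together, the new `DataType` string parser and the `StructType`/attribute helpers let a Catalyst schema be rendered as a string and read back, which lines up with the schema-metadata changes mentioned in the commit message. A hedged sketch of how they could be exercised (the attribute names are made up):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.types._

// Primitive, array and map types can be parsed back from their string form.
assert(DataType("ArrayType(IntegerType)") == ArrayType(IntegerType))
assert(DataType("MapType(StringType,IntegerType)") == MapType(StringType, IntegerType))

// A struct schema derived from output attributes...
val attrs = Seq(
  AttributeReference("owner", StringType, nullable = false)(),
  AttributeReference("age", IntegerType, nullable = true)())
val schema = StructType.fromAttributes(attrs)

// ...can be turned back into attributes (with fresh expression ids),
// and its toString form is what DataType.apply is intended to parse back.
val roundTrippedAttrs = schema.toAttributes
val reparsed = DataType(schema.toString)
```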