path: root/sql/catalyst
author     Yin Huai <huai@cse.ohio-state.edu>            2014-07-30 00:15:31 -0700
committer  Michael Armbrust <michael@databricks.com>     2014-07-30 00:15:31 -0700
commit     7003c163dbb46bb7313aab130a33486a356435a8 (patch)
tree       d125fa76f05683c209bd7f60a64da3f73d3c82ca /sql/catalyst
parent     4ce92ccaf761e48a10fc4fe4927dbfca858ca22b (diff)
download   spark-7003c163dbb46bb7313aab130a33486a356435a8.tar.gz
           spark-7003c163dbb46bb7313aab130a33486a356435a8.tar.bz2
           spark-7003c163dbb46bb7313aab130a33486a356435a8.zip
[SPARK-2179][SQL] Public API for DataTypes and Schema
The current PR contains the following changes:

* Expose `DataType`s in the sql package (internal details are private to sql).
* Users can create Rows.
* Introduce `applySchema` to create a `SchemaRDD` by applying a `schema: StructType` to an `RDD[Row]`.
* Add a function `simpleString` to every `DataType`. Also, the schema represented by a `StructType` can be visualized by `printSchema`.
* `ScalaReflection.typeOfObject` provides a way to infer the Catalyst data type based on an object. Also, we can compose `typeOfObject` with custom logic to form a new function to infer the data type (for different use cases).
* `JsonRDD` has been refactored to use the changes introduced by this PR.
* Add a field `containsNull` to `ArrayType`, so we can explicitly mark whether an `ArrayType` can contain null values. The default value of `containsNull` is `false`.

New APIs are introduced in the sql package object and SQLContext. You can find the scaladoc at [sql package object](http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.package) and [SQLContext](http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext).

An example of using `applySchema` is shown below.

```scala
import org.apache.spark.sql._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val schema =
  StructType(
    StructField("name", StringType, false) ::
    StructField("age", IntegerType, true) :: Nil)

val people =
  sc.textFile("examples/src/main/resources/people.txt")
    .map(_.split(","))
    .map(p => Row(p(0), p(1).trim.toInt))

val peopleSchemaRDD = sqlContext.applySchema(people, schema)

peopleSchemaRDD.printSchema
// root
// |-- name: string (nullable = false)
// |-- age: integer (nullable = true)

peopleSchemaRDD.registerAsTable("people")

sqlContext.sql("select name from people").collect.foreach(println)
```

I will add new content to the SQL programming guide later.

JIRA: https://issues.apache.org/jira/browse/SPARK-2179

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1346 from yhuai/dataTypeAndSchema and squashes the following commits:

1d45977 [Yin Huai] Clean up.
a6e08b4 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
c712fbf [Yin Huai] Converts types of values based on defined schema.
4ceeb66 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
e5f8df5 [Yin Huai] Scaladoc.
122d1e7 [Yin Huai] Address comments.
03bfd95 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
2476ed0 [Yin Huai] Minor updates.
ab71f21 [Yin Huai] Format.
fc2bed1 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
bd40a33 [Yin Huai] Address comments.
991f860 [Yin Huai] Move "asJavaDataType" and "asScalaDataType" to DataTypeConversions.scala.
1cb35fe [Yin Huai] Add "valueContainsNull" to MapType.
3edb3ae [Yin Huai] Python doc.
692c0b9 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
1d93395 [Yin Huai] Python APIs.
246da96 [Yin Huai] Add java data type APIs to javadoc index.
1db9531 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
d48fc7b [Yin Huai] Minor updates.
33c4fec [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
b9f3071 [Yin Huai] Java API for applySchema.
1c9f33c [Yin Huai] Java APIs for DataTypes and Row.
624765c [Yin Huai] Tests for applySchema.
aa92e84 [Yin Huai] Update data type tests.
8da1a17 [Yin Huai] Add Row.fromSeq.
9c99bc0 [Yin Huai] Several minor updates.
1d9c13a [Yin Huai] Update applySchema API.
85e9b51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
e495e4e [Yin Huai] More comments.
42d47a3 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
c3f4a02 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
2e58dbd [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
b8b7db4 [Yin Huai] 1. Move sql package object and package-info to sql-core. 2. Minor updates on APIs. 3. Update scala doc.
68525a2 [Yin Huai] Update JSON unit test.
3209108 [Yin Huai] Add unit tests.
dcaf22f [Yin Huai] Add a field containsNull to ArrayType to indicate if an array can contain null values or not. If an ArrayType is constructed by "ArrayType(elementType)" (the existing constructor), the value of containsNull is false.
9168b83 [Yin Huai] Update comments.
fc649d7 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
eca7d04 [Yin Huai] Add two apply methods which will be used to extract StructField(s) from a StructType.
949d6bb [Yin Huai] When creating a SchemaRDD for a JSON dataset, users can apply an existing schema.
7a6a7e5 [Yin Huai] Fix bug introduced by the change made on SQLContext.inferSchema.
43a45e1 [Yin Huai] Remove sql.util.package introduced in a previous commit.
0266761 [Yin Huai] Format
03eec4c [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
90460ac [Yin Huai] Infer the Catalyst data type from an object and cast a data value to the expected type.
3fa0df5 [Yin Huai] Provide easier ways to construct a StructType.
16be3e5 [Yin Huai] This commit contains three changes: * Expose `DataType`s in the sql package (internal details are private to sql). * Introduce `createSchemaRDD` to create a `SchemaRDD` from an `RDD` with a provided schema (represented by a `StructType`) and a provided function to construct `Row`. * Add a function `simpleString` to every `DataType`. Also, the schema represented by a `StructType` can be visualized by `printSchema`.
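As the commit message notes, `typeOfObject` can be composed with user-defined rules to cover objects it does not know how to map. The following is a minimal sketch of such a composition, not part of this commit; the name `inferType` and the extra cases are illustrative assumptions.

```scala
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.types._

// Hypothetical composition: fall back to user-defined rules when
// ScalaReflection.typeOfObject has no mapping for the given object.
val inferType: PartialFunction[Any, DataType] =
  ScalaReflection.typeOfObject orElse {
    case _: java.math.BigInteger => DecimalType   // illustrative extra rule
    case _: java.util.Date       => TimestampType // illustrative extra rule
  }

inferType(1)                               // IntegerType (handled by typeOfObject)
inferType(new java.math.BigInteger("10"))  // DecimalType (handled by the extra rule)
```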
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala | 20
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala | 5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala | 10
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala | 15
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala | 4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala | 8
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala | 3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 45
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala | 5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala | 5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala | 268
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/package-info.java | 21
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala | 36
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala | 66
18 files changed, 333 insertions(+), 186 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 5a55be1e51..0d26b52a84 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -85,6 +85,26 @@ object ScalaReflection {
case t if t <:< definitions.BooleanTpe => Schema(BooleanType, nullable = false)
}
+ def typeOfObject: PartialFunction[Any, DataType] = {
+ // The data type can be determined without ambiguity.
+ case obj: BooleanType.JvmType => BooleanType
+ case obj: BinaryType.JvmType => BinaryType
+ case obj: StringType.JvmType => StringType
+ case obj: ByteType.JvmType => ByteType
+ case obj: ShortType.JvmType => ShortType
+ case obj: IntegerType.JvmType => IntegerType
+ case obj: LongType.JvmType => LongType
+ case obj: FloatType.JvmType => FloatType
+ case obj: DoubleType.JvmType => DoubleType
+ case obj: DecimalType.JvmType => DecimalType
+ case obj: TimestampType.JvmType => TimestampType
+ case null => NullType
+ // For other cases, there is no obvious mapping from the type of the given object to a
+ // Catalyst data type. A user should provide his/her specific rules
+ // (in a user-defined PartialFunction) to infer the Catalyst data type for other types of
+ // objects and then compose the user-defined PartialFunction with this one.
+ }
+
implicit class CaseClassRelation[A <: Product : TypeTag](data: Seq[A]) {
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index a3ebec8082..f38f99569f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -17,14 +17,11 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.Logging
import org.apache.spark.sql.catalyst.errors.attachTree
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.trees
-import org.apache.spark.sql.Logging
-
/**
* A bound reference points to a specific slot in the input tuple, allowing the actual value
* to be retrieved more efficiently. However, since operations like column pruning can change
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
index 7470cb861b..c9a63e201e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
@@ -32,6 +32,16 @@ object Row {
* }}}
*/
def unapplySeq(row: Row): Some[Seq[Any]] = Some(row)
+
+ /**
+ * This method can be used to construct a [[Row]] with the given values.
+ */
+ def apply(values: Any*): Row = new GenericRow(values.toArray)
+
+ /**
+ * This method can be used to construct a [[Row]] from a [[Seq]] of values.
+ */
+ def fromSeq(values: Seq[Any]): Row = new GenericRow(values.toArray)
}
/**
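A brief usage sketch of the two constructors added above; it is not part of the diff and the values are illustrative.

```scala
import org.apache.spark.sql.catalyst.expressions.Row

// Construct rows positionally or from an existing sequence of values.
val r1 = Row("Alice", 30)              // uses Row.apply(values: Any*)
val r2 = Row.fromSeq(Seq("Bob", 25))   // uses Row.fromSeq(values: Seq[Any])

// Rows built either way can still be deconstructed via Row.unapplySeq.
val Row(name: String, age: Int) = r1
```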
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala
index e787c59e75..eb8898900d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala
@@ -21,8 +21,16 @@ import scala.language.dynamics
import org.apache.spark.sql.catalyst.types.DataType
-case object DynamicType extends DataType
+/**
+ * The data type representing [[DynamicRow]] values.
+ */
+case object DynamicType extends DataType {
+ def simpleString: String = "dynamic"
+}
+/**
+ * Wrap a [[Row]] as a [[DynamicRow]].
+ */
case class WrapDynamic(children: Seq[Attribute]) extends Expression {
type EvaluatedType = DynamicRow
@@ -37,6 +45,11 @@ case class WrapDynamic(children: Seq[Attribute]) extends Expression {
}
}
+/**
+ * DynamicRows use Scala's Dynamic trait to emulate an ORM in a dynamically typed language.
+ * Since the type of the column is not known at compile time, all attributes are converted to
+ * strings before being passed to the function.
+ */
class DynamicRow(val schema: Seq[Attribute], values: Array[Any])
extends GenericRow(values) with Dynamic {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
index 0acb29012f..72add5e20e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
@@ -31,8 +31,8 @@ case class GetItem(child: Expression, ordinal: Expression) extends Expression {
override def foldable = child.foldable && ordinal.foldable
override def references = children.flatMap(_.references).toSet
def dataType = child.dataType match {
- case ArrayType(dt) => dt
- case MapType(_, vt) => vt
+ case ArrayType(dt, _) => dt
+ case MapType(_, vt, _) => vt
}
override lazy val resolved =
childrenResolved &&
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index dd78614754..422839dab7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -84,8 +84,8 @@ case class Explode(attributeNames: Seq[String], child: Expression)
(child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType])
private lazy val elementTypes = child.dataType match {
- case ArrayType(et) => et :: Nil
- case MapType(kt,vt) => kt :: vt :: Nil
+ case ArrayType(et, _) => et :: Nil
+ case MapType(kt,vt, _) => kt :: vt :: Nil
}
// TODO: Move this pattern into Generator.
@@ -102,10 +102,10 @@ case class Explode(attributeNames: Seq[String], child: Expression)
override def eval(input: Row): TraversableOnce[Row] = {
child.dataType match {
- case ArrayType(_) =>
+ case ArrayType(_, _) =>
val inputArray = child.eval(input).asInstanceOf[Seq[Any]]
if (inputArray == null) Nil else inputArray.map(v => new GenericRow(Array(v)))
- case MapType(_, _) =>
+ case MapType(_, _, _) =>
val inputMap = child.eval(input).asInstanceOf[Map[Any,Any]]
if (inputMap == null) Nil else inputMap.map { case (k,v) => new GenericRow(Array(k,v)) }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala
index 3b3e206055..ca9642954e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala
@@ -24,4 +24,6 @@ package object catalyst {
* 2.10.* builds. See SI-6240 for more details.
*/
protected[catalyst] object ScalaReflectionLock
+
+ protected[catalyst] type Logging = com.typesafe.scalalogging.slf4j.Logging
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 67833664b3..781ba489b4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.planning
-import org.apache.spark.sql.Logging
+import org.apache.spark.sql.catalyst.Logging
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNode
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 418f8686bf..bc763a4e06 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -19,9 +19,8 @@ package org.apache.spark.sql.catalyst.planning
import scala.annotation.tailrec
-import org.apache.spark.sql.Logging
-
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.Logging
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 7b82e19b2e..0988b0c6d9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -125,51 +125,10 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
}.toSeq
}
- protected def generateSchemaString(schema: Seq[Attribute]): String = {
- val builder = new StringBuilder
- builder.append("root\n")
- val prefix = " |"
- schema.foreach { attribute =>
- val name = attribute.name
- val dataType = attribute.dataType
- dataType match {
- case fields: StructType =>
- builder.append(s"$prefix-- $name: $StructType\n")
- generateSchemaString(fields, s"$prefix |", builder)
- case ArrayType(fields: StructType) =>
- builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
- generateSchemaString(fields, s"$prefix |", builder)
- case ArrayType(elementType: DataType) =>
- builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
- case _ => builder.append(s"$prefix-- $name: $dataType\n")
- }
- }
-
- builder.toString()
- }
-
- protected def generateSchemaString(
- schema: StructType,
- prefix: String,
- builder: StringBuilder): StringBuilder = {
- schema.fields.foreach {
- case StructField(name, fields: StructType, _) =>
- builder.append(s"$prefix-- $name: $StructType\n")
- generateSchemaString(fields, s"$prefix |", builder)
- case StructField(name, ArrayType(fields: StructType), _) =>
- builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
- generateSchemaString(fields, s"$prefix |", builder)
- case StructField(name, ArrayType(elementType: DataType), _) =>
- builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
- case StructField(name, fieldType: DataType, _) =>
- builder.append(s"$prefix-- $name: $fieldType\n")
- }
-
- builder
- }
+ def schema: StructType = StructType.fromAttributes(output)
/** Returns the output schema in the tree format. */
- def schemaString: String = generateSchemaString(output)
+ def schemaString: String = schema.treeString
/** Prints out the schema in the tree format */
def printSchema(): Unit = println(schemaString)
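With this change, `schemaString` simply delegates to `StructType.treeString`. A minimal sketch of producing the same tree output directly from a schema, not part of the diff; the fields are illustrative.

```scala
import org.apache.spark.sql.catalyst.types._

val schema = StructType(
  StructField("name", StringType, nullable = false) ::
  StructField("age", IntegerType, nullable = true) :: Nil)

println(schema.treeString)
// root
//  |-- name: string (nullable = false)
//  |-- age: integer (nullable = true)
```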
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 1537de259c..3cb407217c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -177,7 +177,7 @@ case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode {
case StructType(fields) =>
StructType(fields.map(f =>
StructField(f.name.toLowerCase(), lowerCaseSchema(f.dataType), f.nullable)))
- case ArrayType(elemType) => ArrayType(lowerCaseSchema(elemType))
+ case ArrayType(elemType, containsNull) => ArrayType(lowerCaseSchema(elemType), containsNull)
case otherType => otherType
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala
index 1076537bc7..f8960b3fe7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.rules
-import org.apache.spark.sql.Logging
+import org.apache.spark.sql.catalyst.Logging
import org.apache.spark.sql.catalyst.trees.TreeNode
abstract class Rule[TreeType <: TreeNode[_]] extends Logging {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index e300bdbece..6aa407c836 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -15,10 +15,9 @@
* limitations under the License.
*/
-package org.apache.spark.sql
-package catalyst
-package rules
+package org.apache.spark.sql.catalyst.rules
+import org.apache.spark.sql.catalyst.Logging
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.sideBySide
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala
index d159ecdd5d..9a28d035a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.catalyst
-import org.apache.spark.sql.Logger
-
/**
* A library for easily manipulating trees of operators. Operators that extend TreeNode are
* granted the following interface:
@@ -35,5 +33,6 @@ import org.apache.spark.sql.Logger
*/
package object trees {
// Since we want tree nodes to be lightweight, we create one logger for all treenode instances.
- protected val logger = Logger("catalyst.trees")
+ protected val logger =
+ com.typesafe.scalalogging.slf4j.Logger(org.slf4j.LoggerFactory.getLogger("catalyst.trees"))
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 71808f76d6..b52ee6d337 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -45,11 +45,13 @@ object DataType extends RegexParsers {
"TimestampType" ^^^ TimestampType
protected lazy val arrayType: Parser[DataType] =
- "ArrayType" ~> "(" ~> dataType <~ ")" ^^ ArrayType
+ "ArrayType" ~> "(" ~> dataType ~ "," ~ boolVal <~ ")" ^^ {
+ case tpe ~ _ ~ containsNull => ArrayType(tpe, containsNull)
+ }
protected lazy val mapType: Parser[DataType] =
- "MapType" ~> "(" ~> dataType ~ "," ~ dataType <~ ")" ^^ {
- case t1 ~ _ ~ t2 => MapType(t1, t2)
+ "MapType" ~> "(" ~> dataType ~ "," ~ dataType ~ "," ~ boolVal <~ ")" ^^ {
+ case t1 ~ _ ~ t2 ~ _ ~ valueContainsNull => MapType(t1, t2, valueContainsNull)
}
protected lazy val structField: Parser[StructField] =
@@ -82,6 +84,21 @@ object DataType extends RegexParsers {
case Success(result, _) => result
case failure: NoSuccess => sys.error(s"Unsupported dataType: $asString, $failure")
}
+
+ protected[types] def buildFormattedString(
+ dataType: DataType,
+ prefix: String,
+ builder: StringBuilder): Unit = {
+ dataType match {
+ case array: ArrayType =>
+ array.buildFormattedString(prefix, builder)
+ case struct: StructType =>
+ struct.buildFormattedString(prefix, builder)
+ case map: MapType =>
+ map.buildFormattedString(prefix, builder)
+ case _ =>
+ }
+ }
}
abstract class DataType {
@@ -92,9 +109,13 @@ abstract class DataType {
}
def isPrimitive: Boolean = false
+
+ def simpleString: String
}
-case object NullType extends DataType
+case object NullType extends DataType {
+ def simpleString: String = "null"
+}
object NativeType {
def all = Seq(
@@ -108,40 +129,45 @@ trait PrimitiveType extends DataType {
}
abstract class NativeType extends DataType {
- type JvmType
- @transient val tag: TypeTag[JvmType]
- val ordering: Ordering[JvmType]
+ private[sql] type JvmType
+ @transient private[sql] val tag: TypeTag[JvmType]
+ private[sql] val ordering: Ordering[JvmType]
- @transient val classTag = ScalaReflectionLock.synchronized {
+ @transient private[sql] val classTag = ScalaReflectionLock.synchronized {
val mirror = runtimeMirror(Utils.getSparkClassLoader)
ClassTag[JvmType](mirror.runtimeClass(tag.tpe))
}
}
case object StringType extends NativeType with PrimitiveType {
- type JvmType = String
- @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
- val ordering = implicitly[Ordering[JvmType]]
+ private[sql] type JvmType = String
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+ private[sql] val ordering = implicitly[Ordering[JvmType]]
+ def simpleString: String = "string"
}
case object BinaryType extends DataType with PrimitiveType {
- type JvmType = Array[Byte]
+ private[sql] type JvmType = Array[Byte]
+ def simpleString: String = "binary"
}
case object BooleanType extends NativeType with PrimitiveType {
- type JvmType = Boolean
- @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
- val ordering = implicitly[Ordering[JvmType]]
+ private[sql] type JvmType = Boolean
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+ private[sql] val ordering = implicitly[Ordering[JvmType]]
+ def simpleString: String = "boolean"
}
case object TimestampType extends NativeType {
- type JvmType = Timestamp
+ private[sql] type JvmType = Timestamp
- @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
- val ordering = new Ordering[JvmType] {
+ private[sql] val ordering = new Ordering[JvmType] {
def compare(x: Timestamp, y: Timestamp) = x.compareTo(y)
}
+
+ def simpleString: String = "timestamp"
}
abstract class NumericType extends NativeType with PrimitiveType {
@@ -150,7 +176,7 @@ abstract class NumericType extends NativeType with PrimitiveType {
// type parameter and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
// desugared by the compiler into an argument to the object's constructor. This means there is no
// longer a no-argument constructor and thus the JVM cannot serialize the object anymore.
- val numeric: Numeric[JvmType]
+ private[sql] val numeric: Numeric[JvmType]
}
object NumericType {
@@ -166,39 +192,43 @@ object IntegralType {
}
abstract class IntegralType extends NumericType {
- val integral: Integral[JvmType]
+ private[sql] val integral: Integral[JvmType]
}
case object LongType extends IntegralType {
- type JvmType = Long
- @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
- val numeric = implicitly[Numeric[Long]]
- val integral = implicitly[Integral[Long]]
- val ordering = implicitly[Ordering[JvmType]]
+ private[sql] type JvmType = Long
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+ private[sql] val numeric = implicitly[Numeric[Long]]
+ private[sql] val integral = implicitly[Integral[Long]]
+ private[sql] val ordering = implicitly[Ordering[JvmType]]
+ def simpleString: String = "long"
}
case object IntegerType extends IntegralType {
- type JvmType = Int
- @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
- val numeric = implicitly[Numeric[Int]]
- val integral = implicitly[Integral[Int]]
- val ordering = implicitly[Ordering[JvmType]]
+ private[sql] type JvmType = Int
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+ private[sql] val numeric = implicitly[Numeric[Int]]
+ private[sql] val integral = implicitly[Integral[Int]]
+ private[sql] val ordering = implicitly[Ordering[JvmType]]
+ def simpleString: String = "integer"
}
case object ShortType extends IntegralType {
- type JvmType = Short
- @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
- val numeric = implicitly[Numeric[Short]]
- val integral = implicitly[Integral[Short]]
- val ordering = implicitly[Ordering[JvmType]]
+ private[sql] type JvmType = Short
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+ private[sql] val numeric = implicitly[Numeric[Short]]
+ private[sql] val integral = implicitly[Integral[Short]]
+ private[sql] val ordering = implicitly[Ordering[JvmType]]
+ def simpleString: String = "short"
}
case object ByteType extends IntegralType {
- type JvmType = Byte
- @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
- val numeric = implicitly[Numeric[Byte]]
- val integral = implicitly[Integral[Byte]]
- val ordering = implicitly[Ordering[JvmType]]
+ private[sql] type JvmType = Byte
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+ private[sql] val numeric = implicitly[Numeric[Byte]]
+ private[sql] val integral = implicitly[Integral[Byte]]
+ private[sql] val ordering = implicitly[Ordering[JvmType]]
+ def simpleString: String = "byte"
}
/** Matcher for any expressions that evaluate to [[FractionalType]]s */
@@ -209,47 +239,159 @@ object FractionalType {
}
}
abstract class FractionalType extends NumericType {
- val fractional: Fractional[JvmType]
+ private[sql] val fractional: Fractional[JvmType]
}
case object DecimalType extends FractionalType {
- type JvmType = BigDecimal
- @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
- val numeric = implicitly[Numeric[BigDecimal]]
- val fractional = implicitly[Fractional[BigDecimal]]
- val ordering = implicitly[Ordering[JvmType]]
+ private[sql] type JvmType = BigDecimal
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+ private[sql] val numeric = implicitly[Numeric[BigDecimal]]
+ private[sql] val fractional = implicitly[Fractional[BigDecimal]]
+ private[sql] val ordering = implicitly[Ordering[JvmType]]
+ def simpleString: String = "decimal"
}
case object DoubleType extends FractionalType {
- type JvmType = Double
- @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
- val numeric = implicitly[Numeric[Double]]
- val fractional = implicitly[Fractional[Double]]
- val ordering = implicitly[Ordering[JvmType]]
+ private[sql] type JvmType = Double
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+ private[sql] val numeric = implicitly[Numeric[Double]]
+ private[sql] val fractional = implicitly[Fractional[Double]]
+ private[sql] val ordering = implicitly[Ordering[JvmType]]
+ def simpleString: String = "double"
}
case object FloatType extends FractionalType {
- type JvmType = Float
- @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
- val numeric = implicitly[Numeric[Float]]
- val fractional = implicitly[Fractional[Float]]
- val ordering = implicitly[Ordering[JvmType]]
+ private[sql] type JvmType = Float
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+ private[sql] val numeric = implicitly[Numeric[Float]]
+ private[sql] val fractional = implicitly[Fractional[Float]]
+ private[sql] val ordering = implicitly[Ordering[JvmType]]
+ def simpleString: String = "float"
}
-case class ArrayType(elementType: DataType) extends DataType
+object ArrayType {
+ /** Construct an [[ArrayType]] object with the given element type. The `containsNull` is false. */
+ def apply(elementType: DataType): ArrayType = ArrayType(elementType, false)
+}
-case class StructField(name: String, dataType: DataType, nullable: Boolean)
+/**
+ * The data type for collections of multiple values.
+ * Internally these are represented as columns that contain a ``scala.collection.Seq``.
+ *
+ * @param elementType The data type of values.
+ * @param containsNull Indicates if values have `null` values
+ */
+case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
+ private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+ builder.append(
+ s"${prefix}-- element: ${elementType.simpleString} (containsNull = ${containsNull})\n")
+ DataType.buildFormattedString(elementType, s"$prefix |", builder)
+ }
+
+ def simpleString: String = "array"
+}
+
+/**
+ * A field inside a StructType.
+ * @param name The name of this field.
+ * @param dataType The data type of this field.
+ * @param nullable Indicates if values of this field can be `null` values.
+ */
+case class StructField(name: String, dataType: DataType, nullable: Boolean) {
+
+ private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+ builder.append(s"${prefix}-- ${name}: ${dataType.simpleString} (nullable = ${nullable})\n")
+ DataType.buildFormattedString(dataType, s"$prefix |", builder)
+ }
+}
object StructType {
- def fromAttributes(attributes: Seq[Attribute]): StructType = {
+ protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))
- }
- // def apply(fields: Seq[StructField]) = new StructType(fields.toIndexedSeq)
+ private def validateFields(fields: Seq[StructField]): Boolean =
+ fields.map(field => field.name).distinct.size == fields.size
}
case class StructType(fields: Seq[StructField]) extends DataType {
- def toAttributes = fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+ require(StructType.validateFields(fields), "Found fields with the same name.")
+
+ /**
+ * Returns all field names in a [[Seq]].
+ */
+ lazy val fieldNames: Seq[String] = fields.map(_.name)
+ private lazy val fieldNamesSet: Set[String] = fieldNames.toSet
+ private lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap
+ /**
+ * Extracts a [[StructField]] of the given name. If the [[StructType]] object does not
+ * have a field of the given name, an `IllegalArgumentException` will be thrown.
+ */
+ def apply(name: String): StructField = {
+ nameToField.get(name).getOrElse(
+ throw new IllegalArgumentException(s"Field ${name} does not exist."))
+ }
+
+ /**
+ * Returns a [[StructType]] containing [[StructField]]s of the given names.
+ * Those names which do not have matching fields will be ignored.
+ */
+ def apply(names: Set[String]): StructType = {
+ val nonExistFields = names -- fieldNamesSet
+ if (!nonExistFields.isEmpty) {
+ throw new IllegalArgumentException(
+ s"Field ${nonExistFields.mkString(",")} does not exist.")
+ }
+ // Preserve the original order of fields.
+ StructType(fields.filter(f => names.contains(f.name)))
+ }
+
+ protected[sql] def toAttributes =
+ fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+
+ def treeString: String = {
+ val builder = new StringBuilder
+ builder.append("root\n")
+ val prefix = " |"
+ fields.foreach(field => field.buildFormattedString(prefix, builder))
+
+ builder.toString()
+ }
+
+ def printTreeString(): Unit = println(treeString)
+
+ private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+ fields.foreach(field => field.buildFormattedString(prefix, builder))
+ }
+
+ def simpleString: String = "struct"
+}
+
+object MapType {
+ /**
+ * Construct a [[MapType]] object with the given key type and value type.
+ * The `valueContainsNull` is true.
+ */
+ def apply(keyType: DataType, valueType: DataType): MapType =
+ MapType(keyType: DataType, valueType: DataType, true)
}
-case class MapType(keyType: DataType, valueType: DataType) extends DataType
+/**
+ * The data type for Maps. Keys in a map are not allowed to have `null` values.
+ * @param keyType The data type of map keys.
+ * @param valueType The data type of map values.
+ * @param valueContainsNull Indicates if map values have `null` values.
+ */
+case class MapType(
+ keyType: DataType,
+ valueType: DataType,
+ valueContainsNull: Boolean) extends DataType {
+ private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+ builder.append(s"${prefix}-- key: ${keyType.simpleString}\n")
+ builder.append(s"${prefix}-- value: ${valueType.simpleString} " +
+ s"(valueContainsNull = ${valueContainsNull})\n")
+ DataType.buildFormattedString(keyType, s"$prefix |", builder)
+ DataType.buildFormattedString(valueType, s"$prefix |", builder)
+ }
+
+ def simpleString: String = "map"
+}
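A brief sketch exercising the new `StructType` accessors introduced above; it is not part of the diff and the field names are illustrative.

```scala
import org.apache.spark.sql.catalyst.types._

val schema = StructType(
  StructField("a", IntegerType, nullable = true) ::
  StructField("b", StringType, nullable = false) ::
  StructField("c", DoubleType, nullable = true) :: Nil)

schema("b")               // StructField("b", StringType, false)
schema(Set("a", "c"))     // StructType with fields a and c, original order preserved
schema.fieldNames         // Seq("a", "b", "c")
schema.printTreeString()  // prints the same tree format used by printSchema
```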
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/package-info.java b/sql/catalyst/src/main/scala/org/apache/spark/sql/package-info.java
deleted file mode 100644
index 5360361451..0000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Allows the execution of relational queries, including those expressed in SQL using Spark.
- */
-package org.apache.spark.sql;
\ No newline at end of file
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
deleted file mode 100644
index 4589129cd1..0000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-/**
- * Allows the execution of relational queries, including those expressed in SQL using Spark.
- *
- * Note that this package is located in catalyst instead of in core so that all subprojects can
- * inherit the settings from this package object.
- */
-package object sql {
-
- protected[sql] def Logger(name: String) =
- com.typesafe.scalalogging.slf4j.Logger(org.slf4j.LoggerFactory.getLogger(name))
-
- protected[sql] type Logging = com.typesafe.scalalogging.slf4j.Logging
-
- type Row = catalyst.expressions.Row
-
- val Row = catalyst.expressions.Row
-}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index c0438dbe52..e030d6e13d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -17,11 +17,11 @@
package org.apache.spark.sql.catalyst
+import java.math.BigInteger
import java.sql.Timestamp
import org.scalatest.FunSuite
-import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._
case class PrimitiveData(
@@ -148,4 +148,68 @@ class ScalaReflectionSuite extends FunSuite {
StructField("_2", StringType, nullable = true))),
nullable = true))
}
+
+ test("get data type of a value") {
+ // BooleanType
+ assert(BooleanType === typeOfObject(true))
+ assert(BooleanType === typeOfObject(false))
+
+ // BinaryType
+ assert(BinaryType === typeOfObject("string".getBytes))
+
+ // StringType
+ assert(StringType === typeOfObject("string"))
+
+ // ByteType
+ assert(ByteType === typeOfObject(127.toByte))
+
+ // ShortType
+ assert(ShortType === typeOfObject(32767.toShort))
+
+ // IntegerType
+ assert(IntegerType === typeOfObject(2147483647))
+
+ // LongType
+ assert(LongType === typeOfObject(9223372036854775807L))
+
+ // FloatType
+ assert(FloatType === typeOfObject(3.4028235E38.toFloat))
+
+ // DoubleType
+ assert(DoubleType === typeOfObject(1.7976931348623157E308))
+
+ // DecimalType
+ assert(DecimalType === typeOfObject(BigDecimal("1.7976931348623157E318")))
+
+ // TimestampType
+ assert(TimestampType === typeOfObject(java.sql.Timestamp.valueOf("2014-7-25 10:26:00")))
+
+ // NullType
+ assert(NullType === typeOfObject(null))
+
+ def typeOfObject1: PartialFunction[Any, DataType] = typeOfObject orElse {
+ case value: java.math.BigInteger => DecimalType
+ case value: java.math.BigDecimal => DecimalType
+ case _ => StringType
+ }
+
+ assert(DecimalType === typeOfObject1(
+ new BigInteger("92233720368547758070")))
+ assert(DecimalType === typeOfObject1(
+ new java.math.BigDecimal("1.7976931348623157E318")))
+ assert(StringType === typeOfObject1(BigInt("92233720368547758070")))
+
+ def typeOfObject2: PartialFunction[Any, DataType] = typeOfObject orElse {
+ case value: java.math.BigInteger => DecimalType
+ }
+
+ intercept[MatchError](typeOfObject2(BigInt("92233720368547758070")))
+
+ def typeOfObject3: PartialFunction[Any, DataType] = typeOfObject orElse {
+ case c: Seq[_] => ArrayType(typeOfObject3(c.head))
+ }
+
+ assert(ArrayType(IntegerType) === typeOfObject3(Seq(1, 2, 3)))
+ assert(ArrayType(ArrayType(IntegerType)) === typeOfObject3(Seq(Seq(1,2,3))))
+ }
}