author    Reynold Xin <rxin@databricks.com>  2015-04-23 14:48:19 -0700
committer Reynold Xin <rxin@databricks.com>  2015-04-23 14:48:19 -0700
commit    6220d933e5ce4ba890f5d6a50a69b95d319dafb4 (patch)
tree      fa192aa574f6f025c03fe6add682f90d21f910fb /sql
parent    1ed46a60adacb352b385d2331401822a5a2c55c0 (diff)
[SQL] Break dataTypes.scala into multiple files.
It was over 1000 lines of code, making it harder to find all the types. Only moved code around; didn't change any of it.

Author: Reynold Xin <rxin@databricks.com>

Closes #5670 from rxin/break-types and squashes the following commits:

8c59023 [Reynold Xin] Check in missing files.
dcd5193 [Reynold Xin] [SQL] Break dataTypes.scala into multiple files.
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala       |   74
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala      |   63
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala     |   51
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/ByteType.scala        |   54
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala        |  353
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala        |   54
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala     |  110
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala      |   53
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala       |   53
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntegerType.scala     |   54
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/LongType.scala        |   54
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala         |   79
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala        |   39
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/ShortType.scala       |   53
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala      |   50
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala     |   54
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala      |  263
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala   |   57
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala |   81
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala       | 1224
20 files changed, 1649 insertions, 1224 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala
new file mode 100644
index 0000000000..b116163fac
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.json4s.JsonDSL._
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+object ArrayType {
+ /** Construct an [[ArrayType]] object with the given element type. `containsNull` is set to true. */
+ def apply(elementType: DataType): ArrayType = ArrayType(elementType, containsNull = true)
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type for collections of multiple values.
+ * Internally these are represented as columns that contain a ``scala.collection.Seq``.
+ *
+ * Please use [[DataTypes.createArrayType()]] to create a specific instance.
+ *
+ * An [[ArrayType]] object comprises two fields, `elementType: [[DataType]]` and
+ * `containsNull: Boolean`. The `elementType` field specifies the type of the array's
+ * elements. The `containsNull` field specifies whether the array can contain `null` values.
+ *
+ * @param elementType The data type of the array's elements.
+ * @param containsNull Indicates whether the array can contain `null` values.
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
+
+ /** No-arg constructor for kryo. */
+ protected def this() = this(null, false)
+
+ private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+ builder.append(
+ s"$prefix-- element: ${elementType.typeName} (containsNull = $containsNull)\n")
+ DataType.buildFormattedString(elementType, s"$prefix |", builder)
+ }
+
+ override private[sql] def jsonValue =
+ ("type" -> typeName) ~
+ ("elementType" -> elementType.jsonValue) ~
+ ("containsNull" -> containsNull)
+
+ /**
+ * The default size of a value of the ArrayType is 100 * the default size of the element type.
+ * (We assume that there are 100 elements).
+ */
+ override def defaultSize: Int = 100 * elementType.defaultSize
+
+ override def simpleString: String = s"array<${elementType.simpleString}>"
+
+ private[spark] override def asNullable: ArrayType =
+ ArrayType(elementType.asNullable, containsNull = true)
+}
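Not part of the commit, but as a quick illustration of the relocated class: the companion `apply` above defaults `containsNull` to true, and `defaultSize` is 100 times the element type's default size.
{{{
import org.apache.spark.sql.types._

// The one-argument factory defaults containsNull to true.
val loose = ArrayType(IntegerType)
val strict = ArrayType(StringType, containsNull = false)

// defaultSize = 100 * elementType.defaultSize, and IntegerType.defaultSize is 4.
assert(loose.defaultSize == 400)

println(loose.simpleString)  // array<int>
println(strict.json)         // the JSON produced by jsonValue above
}}}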
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala
new file mode 100644
index 0000000000..a581a9e946
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Array[Byte]` values.
+ * Please use the singleton [[DataTypes.BinaryType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class BinaryType private() extends AtomicType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "BinaryType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+
+ private[sql] type InternalType = Array[Byte]
+
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+
+ private[sql] val ordering = new Ordering[InternalType] {
+ def compare(x: Array[Byte], y: Array[Byte]): Int = {
+ for (i <- 0 until x.length; if i < y.length) {
+ val res = x(i).compareTo(y(i))
+ if (res != 0) return res
+ }
+ x.length - y.length
+ }
+ }
+
+ /**
+ * The default size of a value of the BinaryType is 4096 bytes.
+ */
+ override def defaultSize: Int = 4096
+
+ private[spark] override def asNullable: BinaryType = this
+}
+
+
+case object BinaryType extends BinaryType
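The byte-wise ordering above is `private[sql]`, so it is not callable from user code; the following standalone sketch (with a hypothetical helper name) simply restates the rule it implements: compare element by element, then break ties by length.
{{{
// Standalone restatement of BinaryType's lexicographic comparison.
def compareBinary(x: Array[Byte], y: Array[Byte]): Int = {
  var i = 0
  while (i < x.length && i < y.length) {
    val res = x(i).compareTo(y(i))
    if (res != 0) return res
    i += 1
  }
  // When one array is a prefix of the other, the shorter one sorts first.
  x.length - y.length
}

assert(compareBinary(Array[Byte](1, 2), Array[Byte](1, 3)) < 0)
assert(compareBinary(Array[Byte](1, 2), Array[Byte](1, 2, 0)) < 0)
}}}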
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala
new file mode 100644
index 0000000000..a7f228cefa
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Boolean` values. Please use the singleton [[DataTypes.BooleanType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class BooleanType private() extends AtomicType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "BooleanType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Boolean
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+ /**
+ * The default size of a value of the BooleanType is 1 byte.
+ */
+ override def defaultSize: Int = 1
+
+ private[spark] override def asNullable: BooleanType = this
+}
+
+
+case object BooleanType extends BooleanType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ByteType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ByteType.scala
new file mode 100644
index 0000000000..4d8685796e
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ByteType.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.{Ordering, Integral, Numeric}
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Byte` values. Please use the singleton [[DataTypes.ByteType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class ByteType private() extends IntegralType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "ByteType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Byte
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val numeric = implicitly[Numeric[Byte]]
+ private[sql] val integral = implicitly[Integral[Byte]]
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+ /**
+ * The default size of a value of the ByteType is 1 byte.
+ */
+ override def defaultSize: Int = 1
+
+ override def simpleString: String = "tinyint"
+
+ private[spark] override def asNullable: ByteType = this
+}
+
+case object ByteType extends ByteType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
new file mode 100644
index 0000000000..e6bfcd9adf
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.{TypeTag, runtimeMirror}
+import scala.util.parsing.combinator.RegexParsers
+
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.util.Utils
+
+
+/**
+ * :: DeveloperApi ::
+ * The base type of all Spark SQL data types.
+ *
+ * @group dataType
+ */
+@DeveloperApi
+abstract class DataType {
+ /** Matches any expression that evaluates to this DataType */
+ def unapply(a: Expression): Boolean = a match {
+ case e: Expression if e.dataType == this => true
+ case _ => false
+ }
+
+ /** The default size of a value of this data type. */
+ def defaultSize: Int
+
+ def typeName: String = this.getClass.getSimpleName.stripSuffix("$").dropRight(4).toLowerCase
+
+ private[sql] def jsonValue: JValue = typeName
+
+ def json: String = compact(render(jsonValue))
+
+ def prettyJson: String = pretty(render(jsonValue))
+
+ def simpleString: String = typeName
+
+ /** Check if `this` and `other` are the same data type when ignoring nullability
+ * (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
+ */
+ private[spark] def sameType(other: DataType): Boolean =
+ DataType.equalsIgnoreNullability(this, other)
+
+ /** Returns the same data type but with all nullability fields set to true
+ * (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
+ */
+ private[spark] def asNullable: DataType
+}
+
+
+/**
+ * An internal type used to represent everything that is not null, a UDT, an array, a struct, or a map.
+ */
+protected[sql] abstract class AtomicType extends DataType {
+ private[sql] type InternalType
+ @transient private[sql] val tag: TypeTag[InternalType]
+ private[sql] val ordering: Ordering[InternalType]
+
+ @transient private[sql] val classTag = ScalaReflectionLock.synchronized {
+ val mirror = runtimeMirror(Utils.getSparkClassLoader)
+ ClassTag[InternalType](mirror.runtimeClass(tag.tpe))
+ }
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * Numeric data types.
+ *
+ * @group dataType
+ */
+abstract class NumericType extends AtomicType {
+ // Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for
+ // implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
+ // type parameter and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
+ // desugared by the compiler into an argument to the object's constructor. This means there is
+ // no longer a no-argument constructor and thus the JVM cannot serialize the object anymore.
+ private[sql] val numeric: Numeric[InternalType]
+}
+
+
+private[sql] object NumericType {
+ def unapply(e: Expression): Boolean = e.dataType.isInstanceOf[NumericType]
+}
+
+
+/** Matcher for any expressions that evaluate to [[IntegralType]]s */
+private[sql] object IntegralType {
+ def unapply(a: Expression): Boolean = a match {
+ case e: Expression if e.dataType.isInstanceOf[IntegralType] => true
+ case _ => false
+ }
+}
+
+
+private[sql] abstract class IntegralType extends NumericType {
+ private[sql] val integral: Integral[InternalType]
+}
+
+
+
+/** Matcher for any expressions that evaluate to [[FractionalType]]s */
+private[sql] object FractionalType {
+ def unapply(a: Expression): Boolean = a match {
+ case e: Expression if e.dataType.isInstanceOf[FractionalType] => true
+ case _ => false
+ }
+}
+
+
+private[sql] abstract class FractionalType extends NumericType {
+ private[sql] val fractional: Fractional[InternalType]
+ private[sql] val asIntegral: Integral[InternalType]
+}
+
+
+object DataType {
+
+ def fromJson(json: String): DataType = parseDataType(parse(json))
+
+ @deprecated("Use DataType.fromJson instead", "1.2.0")
+ def fromCaseClassString(string: String): DataType = CaseClassStringParser(string)
+
+ private val nonDecimalNameToType = {
+ Seq(NullType, DateType, TimestampType, BinaryType,
+ IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
+ .map(t => t.typeName -> t).toMap
+ }
+
+ /** Given the string representation of a type, return its DataType */
+ private def nameToType(name: String): DataType = {
+ val FIXED_DECIMAL = """decimal\(\s*(\d+)\s*,\s*(\d+)\s*\)""".r
+ name match {
+ case "decimal" => DecimalType.Unlimited
+ case FIXED_DECIMAL(precision, scale) => DecimalType(precision.toInt, scale.toInt)
+ case other => nonDecimalNameToType(other)
+ }
+ }
+
+ private object JSortedObject {
+ def unapplySeq(value: JValue): Option[List[(String, JValue)]] = value match {
+ case JObject(seq) => Some(seq.toList.sortBy(_._1))
+ case _ => None
+ }
+ }
+
+ // NOTE: Map fields must be sorted in alphabetical order to keep consistent with the Python side.
+ private def parseDataType(json: JValue): DataType = json match {
+ case JString(name) =>
+ nameToType(name)
+
+ case JSortedObject(
+ ("containsNull", JBool(n)),
+ ("elementType", t: JValue),
+ ("type", JString("array"))) =>
+ ArrayType(parseDataType(t), n)
+
+ case JSortedObject(
+ ("keyType", k: JValue),
+ ("type", JString("map")),
+ ("valueContainsNull", JBool(n)),
+ ("valueType", v: JValue)) =>
+ MapType(parseDataType(k), parseDataType(v), n)
+
+ case JSortedObject(
+ ("fields", JArray(fields)),
+ ("type", JString("struct"))) =>
+ StructType(fields.map(parseStructField))
+
+ case JSortedObject(
+ ("class", JString(udtClass)),
+ ("pyClass", _),
+ ("sqlType", _),
+ ("type", JString("udt"))) =>
+ Class.forName(udtClass).newInstance().asInstanceOf[UserDefinedType[_]]
+ }
+
+ private def parseStructField(json: JValue): StructField = json match {
+ case JSortedObject(
+ ("metadata", metadata: JObject),
+ ("name", JString(name)),
+ ("nullable", JBool(nullable)),
+ ("type", dataType: JValue)) =>
+ StructField(name, parseDataType(dataType), nullable, Metadata.fromJObject(metadata))
+ // Support reading schema when 'metadata' is missing.
+ case JSortedObject(
+ ("name", JString(name)),
+ ("nullable", JBool(nullable)),
+ ("type", dataType: JValue)) =>
+ StructField(name, parseDataType(dataType), nullable)
+ }
+
+ private object CaseClassStringParser extends RegexParsers {
+ protected lazy val primitiveType: Parser[DataType] =
+ ( "StringType" ^^^ StringType
+ | "FloatType" ^^^ FloatType
+ | "IntegerType" ^^^ IntegerType
+ | "ByteType" ^^^ ByteType
+ | "ShortType" ^^^ ShortType
+ | "DoubleType" ^^^ DoubleType
+ | "LongType" ^^^ LongType
+ | "BinaryType" ^^^ BinaryType
+ | "BooleanType" ^^^ BooleanType
+ | "DateType" ^^^ DateType
+ | "DecimalType()" ^^^ DecimalType.Unlimited
+ | fixedDecimalType
+ | "TimestampType" ^^^ TimestampType
+ )
+
+ protected lazy val fixedDecimalType: Parser[DataType] =
+ ("DecimalType(" ~> "[0-9]+".r) ~ ("," ~> "[0-9]+".r <~ ")") ^^ {
+ case precision ~ scale => DecimalType(precision.toInt, scale.toInt)
+ }
+
+ protected lazy val arrayType: Parser[DataType] =
+ "ArrayType" ~> "(" ~> dataType ~ "," ~ boolVal <~ ")" ^^ {
+ case tpe ~ _ ~ containsNull => ArrayType(tpe, containsNull)
+ }
+
+ protected lazy val mapType: Parser[DataType] =
+ "MapType" ~> "(" ~> dataType ~ "," ~ dataType ~ "," ~ boolVal <~ ")" ^^ {
+ case t1 ~ _ ~ t2 ~ _ ~ valueContainsNull => MapType(t1, t2, valueContainsNull)
+ }
+
+ protected lazy val structField: Parser[StructField] =
+ ("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ {
+ case name ~ tpe ~ nullable =>
+ StructField(name, tpe, nullable = nullable)
+ }
+
+ protected lazy val boolVal: Parser[Boolean] =
+ ( "true" ^^^ true
+ | "false" ^^^ false
+ )
+
+ protected lazy val structType: Parser[DataType] =
+ "StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ {
+ case fields => StructType(fields)
+ }
+
+ protected lazy val dataType: Parser[DataType] =
+ ( arrayType
+ | mapType
+ | structType
+ | primitiveType
+ )
+
+ /**
+ * Parses a string representation of a DataType.
+ *
+ * TODO: Generate parser as pickler...
+ */
+ def apply(asString: String): DataType = parseAll(dataType, asString) match {
+ case Success(result, _) => result
+ case failure: NoSuccess =>
+ throw new IllegalArgumentException(s"Unsupported dataType: $asString, $failure")
+ }
+ }
+
+ protected[types] def buildFormattedString(
+ dataType: DataType,
+ prefix: String,
+ builder: StringBuilder): Unit = {
+ dataType match {
+ case array: ArrayType =>
+ array.buildFormattedString(prefix, builder)
+ case struct: StructType =>
+ struct.buildFormattedString(prefix, builder)
+ case map: MapType =>
+ map.buildFormattedString(prefix, builder)
+ case _ =>
+ }
+ }
+
+ /**
+ * Compares two types, ignoring nullability of ArrayType, MapType, StructType.
+ */
+ private[types] def equalsIgnoreNullability(left: DataType, right: DataType): Boolean = {
+ (left, right) match {
+ case (ArrayType(leftElementType, _), ArrayType(rightElementType, _)) =>
+ equalsIgnoreNullability(leftElementType, rightElementType)
+ case (MapType(leftKeyType, leftValueType, _), MapType(rightKeyType, rightValueType, _)) =>
+ equalsIgnoreNullability(leftKeyType, rightKeyType) &&
+ equalsIgnoreNullability(leftValueType, rightValueType)
+ case (StructType(leftFields), StructType(rightFields)) =>
+ leftFields.length == rightFields.length &&
+ leftFields.zip(rightFields).forall { case (l, r) =>
+ l.name == r.name && equalsIgnoreNullability(l.dataType, r.dataType)
+ }
+ case (l, r) => l == r
+ }
+ }
+
+ /**
+ * Compares two types, ignoring compatible nullability of ArrayType, MapType, StructType.
+ *
+ * Compatible nullability is defined as follows:
+ * - If `from` and `to` are ArrayTypes, `from` has a compatible nullability with `to`
+ * if and only if `to.containsNull` is true, or both of `from.containsNull` and
+ * `to.containsNull` are false.
+ * - If `from` and `to` are MapTypes, `from` has a compatible nullability with `to`
+ * if and only if `to.valueContainsNull` is true, or both of `from.valueContainsNull` and
+ * `to.valueContainsNull` are false.
+ * - If `from` and `to` are StructTypes, `from` has a compatible nullability with `to`
+ * if and only if, for every pair of corresponding fields, `toField.nullable` is true, or both
+ * of `fromField.nullable` and `toField.nullable` are false.
+ */
+ private[sql] def equalsIgnoreCompatibleNullability(from: DataType, to: DataType): Boolean = {
+ (from, to) match {
+ case (ArrayType(fromElement, fn), ArrayType(toElement, tn)) =>
+ (tn || !fn) && equalsIgnoreCompatibleNullability(fromElement, toElement)
+
+ case (MapType(fromKey, fromValue, fn), MapType(toKey, toValue, tn)) =>
+ (tn || !fn) &&
+ equalsIgnoreCompatibleNullability(fromKey, toKey) &&
+ equalsIgnoreCompatibleNullability(fromValue, toValue)
+
+ case (StructType(fromFields), StructType(toFields)) =>
+ fromFields.length == toFields.length &&
+ fromFields.zip(toFields).forall { case (fromField, toField) =>
+ fromField.name == toField.name &&
+ (toField.nullable || !fromField.nullable) &&
+ equalsIgnoreCompatibleNullability(fromField.dataType, toField.dataType)
+ }
+
+ case (fromDataType, toDataType) => fromDataType == toDataType
+ }
+ }
+}
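As an aside (not part of the patch), the JSON plumbing above is what schema persistence relies on: a data type survives a round trip through `json` and `DataType.fromJson`. A minimal sketch, assuming the usual `StructType(Seq[StructField])` constructor defined in StructType's companion object:
{{{
import org.apache.spark.sql.types._

// A nested schema exercising parseDataType / parseStructField above.
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("tags", ArrayType(StringType)),
  StructField("scores", MapType(StringType, DoubleType))))

// Serialize to JSON and parse it back; the result is equal to the original.
assert(DataType.fromJson(schema.json) == schema)
}}}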
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala
new file mode 100644
index 0000000000..03f0644bc7
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `java.sql.Date` values.
+ * Please use the singleton [[DataTypes.DateType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class DateType private() extends AtomicType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "DateType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Int
+
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+ /**
+ * The default size of a value of the DateType is 4 bytes.
+ */
+ override def defaultSize: Int = 4
+
+ private[spark] override def asNullable: DateType = this
+}
+
+
+case object DateType extends DateType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
new file mode 100644
index 0000000000..0f8cecd28f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+
+/** Precision parameters for a Decimal */
+case class PrecisionInfo(precision: Int, scale: Int)
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `java.math.BigDecimal` values.
+ * A Decimal that might have fixed precision and scale, or unlimited values for these.
+ *
+ * Please use [[DataTypes.createDecimalType()]] to create a specific instance.
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalType {
+
+ /** No-arg constructor for kryo. */
+ protected def this() = this(null)
+
+ private[sql] type InternalType = Decimal
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val numeric = Decimal.DecimalIsFractional
+ private[sql] val fractional = Decimal.DecimalIsFractional
+ private[sql] val ordering = Decimal.DecimalIsFractional
+ private[sql] val asIntegral = Decimal.DecimalAsIfIntegral
+
+ def precision: Int = precisionInfo.map(_.precision).getOrElse(-1)
+
+ def scale: Int = precisionInfo.map(_.scale).getOrElse(-1)
+
+ override def typeName: String = precisionInfo match {
+ case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)"
+ case None => "decimal"
+ }
+
+ override def toString: String = precisionInfo match {
+ case Some(PrecisionInfo(precision, scale)) => s"DecimalType($precision,$scale)"
+ case None => "DecimalType()"
+ }
+
+ /**
+ * The default size of a value of the DecimalType is 4096 bytes.
+ */
+ override def defaultSize: Int = 4096
+
+ override def simpleString: String = precisionInfo match {
+ case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)"
+ case None => "decimal(10,0)"
+ }
+
+ private[spark] override def asNullable: DecimalType = this
+}
+
+
+/** Extra factory methods and pattern matchers for Decimals */
+object DecimalType {
+ val Unlimited: DecimalType = DecimalType(None)
+
+ object Fixed {
+ def unapply(t: DecimalType): Option[(Int, Int)] =
+ t.precisionInfo.map(p => (p.precision, p.scale))
+ }
+
+ object Expression {
+ def unapply(e: Expression): Option[(Int, Int)] = e.dataType match {
+ case t: DecimalType => t.precisionInfo.map(p => (p.precision, p.scale))
+ case _ => None
+ }
+ }
+
+ def apply(): DecimalType = Unlimited
+
+ def apply(precision: Int, scale: Int): DecimalType =
+ DecimalType(Some(PrecisionInfo(precision, scale)))
+
+ def unapply(t: DataType): Boolean = t.isInstanceOf[DecimalType]
+
+ def unapply(e: Expression): Boolean = e.dataType.isInstanceOf[DecimalType]
+
+ def isFixed(dataType: DataType): Boolean = dataType match {
+ case DecimalType.Fixed(_, _) => true
+ case _ => false
+ }
+}
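A small illustration (not part of the commit) of the factory methods and extractors defined above:
{{{
import org.apache.spark.sql.types._

val fixed = DecimalType(10, 2)         // precisionInfo = Some(PrecisionInfo(10, 2))
val unlimited = DecimalType.Unlimited  // precisionInfo = None

assert(DecimalType.isFixed(fixed) && !DecimalType.isFixed(unlimited))

// The Fixed extractor recovers precision and scale when they are set.
fixed match {
  case DecimalType.Fixed(precision, scale) => println(s"decimal($precision,$scale)")
  case _ => println("unlimited decimal")
}
}}}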
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala
new file mode 100644
index 0000000000..6676662321
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.{Ordering, Fractional, Numeric}
+import scala.math.Numeric.DoubleAsIfIntegral
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Double` values. Please use the singleton [[DataTypes.DoubleType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class DoubleType private() extends FractionalType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "DoubleType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Double
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val numeric = implicitly[Numeric[Double]]
+ private[sql] val fractional = implicitly[Fractional[Double]]
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+ private[sql] val asIntegral = DoubleAsIfIntegral
+
+ /**
+ * The default size of a value of the DoubleType is 8 bytes.
+ */
+ override def defaultSize: Int = 8
+
+ private[spark] override def asNullable: DoubleType = this
+}
+
+case object DoubleType extends DoubleType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala
new file mode 100644
index 0000000000..1d5a2f4f6f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.Numeric.FloatAsIfIntegral
+import scala.math.{Ordering, Fractional, Numeric}
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Float` values. Please use the singleton [[DataTypes.FloatType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class FloatType private() extends FractionalType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "FloatType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Float
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val numeric = implicitly[Numeric[Float]]
+ private[sql] val fractional = implicitly[Fractional[Float]]
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+ private[sql] val asIntegral = FloatAsIfIntegral
+
+ /**
+ * The default size of a value of the FloatType is 4 bytes.
+ */
+ override def defaultSize: Int = 4
+
+ private[spark] override def asNullable: FloatType = this
+}
+
+case object FloatType extends FloatType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntegerType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntegerType.scala
new file mode 100644
index 0000000000..74e464c082
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntegerType.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.{Ordering, Integral, Numeric}
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Int` values. Please use the singleton [[DataTypes.IntegerType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class IntegerType private() extends IntegralType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "IntegerType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Int
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val numeric = implicitly[Numeric[Int]]
+ private[sql] val integral = implicitly[Integral[Int]]
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+ /**
+ * The default size of a value of the IntegerType is 4 bytes.
+ */
+ override def defaultSize: Int = 4
+
+ override def simpleString: String = "int"
+
+ private[spark] override def asNullable: IntegerType = this
+}
+
+case object IntegerType extends IntegerType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/LongType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/LongType.scala
new file mode 100644
index 0000000000..390675782e
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/LongType.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.{Ordering, Integral, Numeric}
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Long` values. Please use the singleton [[DataTypes.LongType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class LongType private() extends IntegralType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "LongType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Long
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val numeric = implicitly[Numeric[Long]]
+ private[sql] val integral = implicitly[Integral[Long]]
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+ /**
+ * The default size of a value of the LongType is 8 bytes.
+ */
+ override def defaultSize: Int = 8
+
+ override def simpleString: String = "bigint"
+
+ private[spark] override def asNullable: LongType = this
+}
+
+
+case object LongType extends LongType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
new file mode 100644
index 0000000000..cfdf493074
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type for Maps. Keys in a map are not allowed to have `null` values.
+ *
+ * Please use [[DataTypes.createMapType()]] to create a specific instance.
+ *
+ * @param keyType The data type of map keys.
+ * @param valueType The data type of map values.
+ * @param valueContainsNull Indicates if map values have `null` values.
+ *
+ * @group dataType
+ */
+case class MapType(
+ keyType: DataType,
+ valueType: DataType,
+ valueContainsNull: Boolean) extends DataType {
+
+ /** No-arg constructor for kryo. */
+ def this() = this(null, null, false)
+
+ private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+ builder.append(s"$prefix-- key: ${keyType.typeName}\n")
+ builder.append(s"$prefix-- value: ${valueType.typeName} " +
+ s"(valueContainsNull = $valueContainsNull)\n")
+ DataType.buildFormattedString(keyType, s"$prefix |", builder)
+ DataType.buildFormattedString(valueType, s"$prefix |", builder)
+ }
+
+ override private[sql] def jsonValue: JValue =
+ ("type" -> typeName) ~
+ ("keyType" -> keyType.jsonValue) ~
+ ("valueType" -> valueType.jsonValue) ~
+ ("valueContainsNull" -> valueContainsNull)
+
+ /**
+ * The default size of a value of the MapType is
+ * 100 * (the default size of the key type + the default size of the value type).
+ * (We assume that there are 100 elements).
+ */
+ override def defaultSize: Int = 100 * (keyType.defaultSize + valueType.defaultSize)
+
+ override def simpleString: String = s"map<${keyType.simpleString},${valueType.simpleString}>"
+
+ private[spark] override def asNullable: MapType =
+ MapType(keyType.asNullable, valueType.asNullable, valueContainsNull = true)
+}
+
+
+object MapType {
+ /**
+ * Construct a [[MapType]] object with the given key type and value type.
+ * `valueContainsNull` is set to true.
+ */
+ def apply(keyType: DataType, valueType: DataType): MapType =
+ MapType(keyType: DataType, valueType: DataType, valueContainsNull = true)
+}
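An illustrative sketch (not part of the patch) of the MapType factories and size estimate above:
{{{
import org.apache.spark.sql.types._

// The two-argument factory defaults valueContainsNull to true; map keys may never be null.
val loose = MapType(StringType, IntegerType)
val strict = MapType(StringType, IntegerType, valueContainsNull = false)

assert(loose.valueContainsNull && !strict.valueContainsNull)

// defaultSize = 100 * (key defaultSize + value defaultSize) = 100 * (4096 + 4).
assert(loose.defaultSize == 100 * (StringType.defaultSize + IntegerType.defaultSize))
println(loose.simpleString)  // map<string,int>
}}}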
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
new file mode 100644
index 0000000000..b64b07431f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `NULL` values. Please use the singleton [[DataTypes.NullType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class NullType private() extends DataType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "NullType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ override def defaultSize: Int = 1
+
+ private[spark] override def asNullable: NullType = this
+}
+
+case object NullType extends NullType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ShortType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ShortType.scala
new file mode 100644
index 0000000000..73e9ec780b
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ShortType.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.{Ordering, Integral, Numeric}
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Short` values. Please use the singleton [[DataTypes.ShortType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class ShortType private() extends IntegralType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "ShortType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Short
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val numeric = implicitly[Numeric[Short]]
+ private[sql] val integral = implicitly[Integral[Short]]
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+ /**
+ * The default size of a value of the ShortType is 2 bytes.
+ */
+ override def defaultSize: Int = 2
+
+ override def simpleString: String = "smallint"
+
+ private[spark] override def asNullable: ShortType = this
+}
+
+case object ShortType extends ShortType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala
new file mode 100644
index 0000000000..134ab0af4e
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `String` values. Please use the singleton [[DataTypes.StringType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class StringType private() extends AtomicType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "StringType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = UTF8String
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+ /**
+ * The default size of a value of the StringType is 4096 bytes.
+ */
+ override def defaultSize: Int = 4096
+
+ private[spark] override def asNullable: StringType = this
+}
+
+case object StringType extends StringType
+
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
new file mode 100644
index 0000000000..83570a5eae
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+
+/**
+ * A field inside a StructType.
+ * @param name The name of this field.
+ * @param dataType The data type of this field.
+ * @param nullable Indicates if values of this field can be `null` values.
+ * @param metadata The metadata of this field. The metadata should be preserved during
+ * transformation if the content of the column is not modified, e.g., in selection.
+ */
+case class StructField(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean = true,
+ metadata: Metadata = Metadata.empty) {
+
+ /** No-arg constructor for kryo. */
+ protected def this() = this(null, null)
+
+ private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+ builder.append(s"$prefix-- $name: ${dataType.typeName} (nullable = $nullable)\n")
+ DataType.buildFormattedString(dataType, s"$prefix |", builder)
+ }
+
+ // override the default toString to be compatible with legacy parquet files.
+ override def toString: String = s"StructField($name,$dataType,$nullable)"
+
+ private[sql] def jsonValue: JValue = {
+ ("name" -> name) ~
+ ("type" -> dataType.jsonValue) ~
+ ("nullable" -> nullable) ~
+ ("metadata" -> metadata.jsonValue)
+ }
+}
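For illustration only (assuming the `MetadataBuilder` that lives alongside `Metadata` in this package, which is not part of this diff), a sketch of how a field with metadata is built and printed:
{{{
import org.apache.spark.sql.types._

// nullable defaults to true and metadata to Metadata.empty.
val price = StructField("price", DecimalType(10, 2))

// Attach column-level metadata; it is carried by jsonValue but,
// as noted above, intentionally left out of toString.
val documented = price.copy(metadata =
  new MetadataBuilder().putString("comment", "gross price in USD").build())

println(documented)  // StructField(price,DecimalType(10,2),true)
}}}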
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
new file mode 100644
index 0000000000..d80ffca18e
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.collection.mutable.ArrayBuffer
+import scala.math.max
+
+import org.json4s.JsonDSL._
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+
+
+/**
+ * :: DeveloperApi ::
+ * A [[StructType]] object can be constructed by
+ * {{{
+ * StructType(fields: Seq[StructField])
+ * }}}
+ * For a [[StructType]] object, one or multiple [[StructField]]s can be extracted by names.
+ * If multiple [[StructField]]s are extracted, a [[StructType]] object will be returned.
+ * If a provided name does not have a matching field, it will be ignored. For the case
+ * of extracting a single StructField, a `null` will be returned.
+ * Example:
+ * {{{
+ * import org.apache.spark.sql._
+ *
+ * val struct =
+ * StructType(
+ * StructField("a", IntegerType, true) ::
+ * StructField("b", LongType, false) ::
+ * StructField("c", BooleanType, false) :: Nil)
+ *
+ * // Extract a single StructField.
+ * val singleField = struct("b")
+ * // singleField: StructField = StructField(b,LongType,false)
+ *
+ * // This struct does not have a field called "d". null will be returned.
+ * val nonExisting = struct("d")
+ * // nonExisting: StructField = null
+ *
+ * // Extract multiple StructFields. Field names are provided in a set.
+ * // A StructType object will be returned.
+ * val twoFields = struct(Set("b", "c"))
+ * // twoFields: StructType =
+ * // StructType(List(StructField(b,LongType,false), StructField(c,BooleanType,false)))
+ *
+ * // Any names without matching fields will trigger an IllegalArgumentException.
+ * // For the case shown below, "d" has no matching field, so an exception is thrown.
+ * struct(Set("b", "c", "d"))
+ * // java.lang.IllegalArgumentException: Field d does not exist.
+ * }}}
+ *
+ * A [[org.apache.spark.sql.Row]] object is used as a value of the StructType.
+ * Example:
+ * {{{
+ * import org.apache.spark.sql._
+ *
+ * val innerStruct =
+ * StructType(
+ * StructField("f1", IntegerType, true) ::
+ * StructField("f2", LongType, false) ::
+ * StructField("f3", BooleanType, false) :: Nil)
+ *
+ * val struct = StructType(
+ * StructField("a", innerStruct, true) :: Nil)
+ *
+ * // Create a Row with the schema defined by struct
+ * val row = Row(Row(1, 2, true))
+ * // row: Row = [[1,2,true]]
+ * }}}
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
+
+ /** No-arg constructor for kryo. */
+ protected def this() = this(null)
+
+ /** Returns all field names in an array. */
+ def fieldNames: Array[String] = fields.map(_.name)
+
+ private lazy val fieldNamesSet: Set[String] = fieldNames.toSet
+ private lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap
+ private lazy val nameToIndex: Map[String, Int] = fieldNames.zipWithIndex.toMap
+
+ /**
+ * Extracts the [[StructField]] with the given name. If the [[StructType]] object does not
+ * have a field matching the given name, an `IllegalArgumentException` will be thrown.
+ */
+ def apply(name: String): StructField = {
+ nameToField.getOrElse(name,
+ throw new IllegalArgumentException(s"""Field "$name" does not exist."""))
+ }
+
+ /**
+ * Returns a [[StructType]] containing [[StructField]]s of the given names, preserving the
+ * original order of fields. If any of the given names does not have a matching field, an
+ * `IllegalArgumentException` will be thrown.
+ */
+ def apply(names: Set[String]): StructType = {
+ val nonExistFields = names -- fieldNamesSet
+ if (nonExistFields.nonEmpty) {
+ throw new IllegalArgumentException(
+ s"Field ${nonExistFields.mkString(",")} does not exist.")
+ }
+ // Preserve the original order of fields.
+ StructType(fields.filter(f => names.contains(f.name)))
+ }
+
+ /**
+ * Returns the index of the field with the given name.
+ */
+ def fieldIndex(name: String): Int = {
+ nameToIndex.getOrElse(name,
+ throw new IllegalArgumentException(s"""Field "$name" does not exist."""))
+ }
+
+ protected[sql] def toAttributes: Seq[AttributeReference] =
+ map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+
+ def treeString: String = {
+ val builder = new StringBuilder
+ builder.append("root\n")
+ val prefix = " |"
+ fields.foreach(field => field.buildFormattedString(prefix, builder))
+
+ builder.toString()
+ }
+
+ def printTreeString(): Unit = println(treeString)
+
+ private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+ fields.foreach(field => field.buildFormattedString(prefix, builder))
+ }
+
+ override private[sql] def jsonValue =
+ ("type" -> typeName) ~
+ ("fields" -> map(_.jsonValue))
+
+ override def apply(fieldIndex: Int): StructField = fields(fieldIndex)
+
+ override def length: Int = fields.length
+
+ override def iterator: Iterator[StructField] = fields.iterator
+
+ /**
+ * The default size of a value of the StructType is the total default sizes of all field types.
+ */
+ override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum
+
+ override def simpleString: String = {
+ val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
+ s"struct<${fieldTypes.mkString(",")}>"
+ }
+
+ /**
+ * Merges with another schema (`StructType`). For a struct field A from `this` and a struct field
+ * B from `that`,
+ *
+ * 1. If A and B have the same name and data type, they are merged into a field C with the same
+ * name and data type. C is nullable if and only if either A or B is nullable.
+ * 2. If A doesn't exist in `that`, it's included in the result schema.
+ * 3. If B doesn't exist in `this`, it's also included in the result schema.
+ * 4. Otherwise, `this` and `that` are considered as conflicting schemas and an exception will be
+ * thrown.
+ */
+ private[sql] def merge(that: StructType): StructType =
+ StructType.merge(this, that).asInstanceOf[StructType]
+
+ private[spark] override def asNullable: StructType = {
+ val newFields = fields.map {
+ case StructField(name, dataType, nullable, metadata) =>
+ StructField(name, dataType.asNullable, nullable = true, metadata)
+ }
+
+ StructType(newFields)
+ }
+}
+
+
+object StructType {
+
+ def apply(fields: Seq[StructField]): StructType = StructType(fields.toArray)
+
+ def apply(fields: java.util.List[StructField]): StructType = {
+ StructType(fields.toArray.asInstanceOf[Array[StructField]])
+ }
+
+ protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
+ StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
+
+ private[sql] def merge(left: DataType, right: DataType): DataType =
+ (left, right) match {
+ case (ArrayType(leftElementType, leftContainsNull),
+ ArrayType(rightElementType, rightContainsNull)) =>
+ ArrayType(
+ merge(leftElementType, rightElementType),
+ leftContainsNull || rightContainsNull)
+
+ case (MapType(leftKeyType, leftValueType, leftContainsNull),
+ MapType(rightKeyType, rightValueType, rightContainsNull)) =>
+ MapType(
+ merge(leftKeyType, rightKeyType),
+ merge(leftValueType, rightValueType),
+ leftContainsNull || rightContainsNull)
+
+ case (StructType(leftFields), StructType(rightFields)) =>
+ val newFields = ArrayBuffer.empty[StructField]
+
+ leftFields.foreach {
+ case leftField @ StructField(leftName, leftType, leftNullable, _) =>
+ rightFields
+ .find(_.name == leftName)
+ .map { case rightField @ StructField(_, rightType, rightNullable, _) =>
+ leftField.copy(
+ dataType = merge(leftType, rightType),
+ nullable = leftNullable || rightNullable)
+ }
+ .orElse(Some(leftField))
+ .foreach(newFields += _)
+ }
+
+ rightFields
+ .filterNot(f => leftFields.map(_.name).contains(f.name))
+ .foreach(newFields += _)
+
+ StructType(newFields)
+
+ case (DecimalType.Fixed(leftPrecision, leftScale),
+ DecimalType.Fixed(rightPrecision, rightScale)) =>
+ DecimalType(
+ max(leftScale, rightScale) + max(leftPrecision - leftScale, rightPrecision - rightScale),
+ max(leftScale, rightScale))
+
+ case (leftUdt: UserDefinedType[_], rightUdt: UserDefinedType[_])
+ if leftUdt.userClass == rightUdt.userClass => leftUdt
+
+ case (leftType, rightType) if leftType == rightType =>
+ leftType
+
+ case _ =>
+ throw new SparkException(s"Failed to merge incompatible data types $left and $right")
+ }
+}
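
For readers skimming the relocated StructType, the sketch below (object name invented) exercises the public lookups and renderings defined above; it mirrors the scaladoc example and is purely illustrative.

import org.apache.spark.sql.types._

object StructTypeSketch {
  def main(args: Array[String]): Unit = {
    val struct = StructType(
      StructField("a", IntegerType, true) ::
      StructField("b", LongType, false) ::
      StructField("c", BooleanType, false) :: Nil)

    // Look up a single field by name; an unknown name raises IllegalArgumentException.
    println(struct("b"))             // StructField(b,LongType,false)
    println(struct.fieldIndex("c"))  // 2

    // Project a subset of fields, preserving the original field order.
    val projected = struct(Set("b", "c"))
    println(projected.fieldNames.mkString(", "))  // b, c

    // Human-readable renderings.
    println(struct.simpleString)  // struct<a:int,b:bigint,c:boolean>
    struct.printTreeString()      // root, then one "|-- name: type (nullable = ...)" line per field
  }
}
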
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
new file mode 100644
index 0000000000..aebabfc475
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import java.sql.Timestamp
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `java.sql.Timestamp` values.
+ * Please use the singleton [[DataTypes.TimestampType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class TimestampType private() extends AtomicType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "TimestampType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Timestamp
+
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+
+ private[sql] val ordering = new Ordering[InternalType] {
+ def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
+ }
+
+ /**
+ * The default size of a value of the TimestampType is 12 bytes.
+ */
+ override def defaultSize: Int = 12
+
+ private[spark] override def asNullable: TimestampType = this
+}
+
+case object TimestampType extends TimestampType
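
The behaviour of the split-out TimestampType is unchanged; as a small illustration (the object name and sample values are invented), it is typically used inside a schema together with java.sql.Timestamp values:

import java.sql.Timestamp

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

object TimestampSketch {
  def main(args: Array[String]): Unit = {
    // A schema with one TimestampType column; the type is backed by java.sql.Timestamp.
    val schema = StructType(StructField("event_time", TimestampType, nullable = false) :: Nil)

    // A row conforming to that schema.
    val row = Row(Timestamp.valueOf("2015-04-23 14:48:19"))

    println(schema.simpleString)        // struct<event_time:timestamp>
    println(TimestampType.defaultSize)  // 12
    println(row.getAs[Timestamp](0))    // 2015-04-23 14:48:19.0
  }
}
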
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
new file mode 100644
index 0000000000..6b20505c60
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * ::DeveloperApi::
+ * The data type for User Defined Types (UDTs).
+ *
+ * This interface allows a user to make their own classes more interoperable with SparkSQL;
+ * e.g., by creating a [[UserDefinedType]] for a class X, it becomes possible to create
+ * a `DataFrame` which has class X in the schema.
+ *
+ * For SparkSQL to recognize UDTs, the UDT must be annotated with
+ * [[SQLUserDefinedType]].
+ *
+ * The conversion via `serialize` occurs when instantiating a `DataFrame` from another RDD.
+ * The conversion via `deserialize` occurs when reading from a `DataFrame`.
+ */
+@DeveloperApi
+abstract class UserDefinedType[UserType] extends DataType with Serializable {
+
+ /** Underlying storage type for this UDT */
+ def sqlType: DataType
+
+ /** Paired Python UDT class, if one exists. */
+ def pyUDT: String = null
+
+ /**
+ * Convert the user type to a SQL datum
+ *
+ * TODO: Can we make this take obj: UserType? The issue is in
+ * CatalystTypeConverters.convertToCatalyst, where we need to convert Any to UserType.
+ */
+ def serialize(obj: Any): Any
+
+ /** Convert a SQL datum to the user type */
+ def deserialize(datum: Any): UserType
+
+ override private[sql] def jsonValue: JValue = {
+ ("type" -> "udt") ~
+ ("class" -> this.getClass.getName) ~
+ ("pyClass" -> pyUDT) ~
+ ("sqlType" -> sqlType.jsonValue)
+ }
+
+ /**
+ * Class object for the UserType
+ */
+ def userClass: java.lang.Class[UserType]
+
+ /**
+ * The default size of a value of the UserDefinedType is 4096 bytes.
+ */
+ override def defaultSize: Int = 4096
+
+ /**
+ * For UDT, asNullable will not change the nullability of its internal sqlType and just returns
+ * itself.
+ */
+ private[spark] override def asNullable: UserDefinedType[UserType] = this
+}
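
To ground the UserDefinedType contract above (sqlType, serialize, deserialize, userClass), here is a minimal, hypothetical UDT for a 2-D point. The Point and PointUDT names are invented, and serializing to a Seq that matches an ArrayType sqlType is just one reasonable choice, not the only one.

import org.apache.spark.sql.types._

// The SQLUserDefinedType annotation is how Spark SQL associates the UDT with the user class.
@SQLUserDefinedType(udt = classOf[PointUDT])
case class Point(x: Double, y: Double)

class PointUDT extends UserDefinedType[Point] {

  // Underlying storage type: a pair of non-null doubles.
  override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)

  // User object -> SQL datum (a Seq shaped like the ArrayType above).
  override def serialize(obj: Any): Any = obj match {
    case Point(x, y) => Seq(x, y)
  }

  // SQL datum -> user object.
  override def deserialize(datum: Any): Point = datum match {
    case Seq(x: Double, y: Double) => Point(x, y)
  }

  override def userClass: Class[Point] = classOf[Point]
}
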
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
deleted file mode 100644
index 87c7b75993..0000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
+++ /dev/null
@@ -1,1224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.types
-
-import java.sql.Timestamp
-
-import scala.collection.mutable.ArrayBuffer
-import scala.math._
-import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral}
-import scala.reflect.ClassTag
-import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag}
-import scala.util.parsing.combinator.RegexParsers
-
-import org.json4s._
-import org.json4s.JsonAST.JValue
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.SparkException
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.ScalaReflectionLock
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
-import org.apache.spark.util.Utils
-
-
-object DataType {
- def fromJson(json: String): DataType = parseDataType(parse(json))
-
- private val nonDecimalNameToType = {
- Seq(NullType, DateType, TimestampType, BinaryType,
- IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
- .map(t => t.typeName -> t).toMap
- }
-
- /** Given the string representation of a type, return its DataType */
- private def nameToType(name: String): DataType = {
- val FIXED_DECIMAL = """decimal\(\s*(\d+)\s*,\s*(\d+)\s*\)""".r
- name match {
- case "decimal" => DecimalType.Unlimited
- case FIXED_DECIMAL(precision, scale) => DecimalType(precision.toInt, scale.toInt)
- case other => nonDecimalNameToType(other)
- }
- }
-
- private object JSortedObject {
- def unapplySeq(value: JValue): Option[List[(String, JValue)]] = value match {
- case JObject(seq) => Some(seq.toList.sortBy(_._1))
- case _ => None
- }
- }
-
- // NOTE: Map fields must be sorted in alphabetical order to keep consistent with the Python side.
- private def parseDataType(json: JValue): DataType = json match {
- case JString(name) =>
- nameToType(name)
-
- case JSortedObject(
- ("containsNull", JBool(n)),
- ("elementType", t: JValue),
- ("type", JString("array"))) =>
- ArrayType(parseDataType(t), n)
-
- case JSortedObject(
- ("keyType", k: JValue),
- ("type", JString("map")),
- ("valueContainsNull", JBool(n)),
- ("valueType", v: JValue)) =>
- MapType(parseDataType(k), parseDataType(v), n)
-
- case JSortedObject(
- ("fields", JArray(fields)),
- ("type", JString("struct"))) =>
- StructType(fields.map(parseStructField))
-
- case JSortedObject(
- ("class", JString(udtClass)),
- ("pyClass", _),
- ("sqlType", _),
- ("type", JString("udt"))) =>
- Class.forName(udtClass).newInstance().asInstanceOf[UserDefinedType[_]]
- }
-
- private def parseStructField(json: JValue): StructField = json match {
- case JSortedObject(
- ("metadata", metadata: JObject),
- ("name", JString(name)),
- ("nullable", JBool(nullable)),
- ("type", dataType: JValue)) =>
- StructField(name, parseDataType(dataType), nullable, Metadata.fromJObject(metadata))
- // Support reading schema when 'metadata' is missing.
- case JSortedObject(
- ("name", JString(name)),
- ("nullable", JBool(nullable)),
- ("type", dataType: JValue)) =>
- StructField(name, parseDataType(dataType), nullable)
- }
-
- @deprecated("Use DataType.fromJson instead", "1.2.0")
- def fromCaseClassString(string: String): DataType = CaseClassStringParser(string)
-
- private object CaseClassStringParser extends RegexParsers {
- protected lazy val primitiveType: Parser[DataType] =
- ( "StringType" ^^^ StringType
- | "FloatType" ^^^ FloatType
- | "IntegerType" ^^^ IntegerType
- | "ByteType" ^^^ ByteType
- | "ShortType" ^^^ ShortType
- | "DoubleType" ^^^ DoubleType
- | "LongType" ^^^ LongType
- | "BinaryType" ^^^ BinaryType
- | "BooleanType" ^^^ BooleanType
- | "DateType" ^^^ DateType
- | "DecimalType()" ^^^ DecimalType.Unlimited
- | fixedDecimalType
- | "TimestampType" ^^^ TimestampType
- )
-
- protected lazy val fixedDecimalType: Parser[DataType] =
- ("DecimalType(" ~> "[0-9]+".r) ~ ("," ~> "[0-9]+".r <~ ")") ^^ {
- case precision ~ scale => DecimalType(precision.toInt, scale.toInt)
- }
-
- protected lazy val arrayType: Parser[DataType] =
- "ArrayType" ~> "(" ~> dataType ~ "," ~ boolVal <~ ")" ^^ {
- case tpe ~ _ ~ containsNull => ArrayType(tpe, containsNull)
- }
-
- protected lazy val mapType: Parser[DataType] =
- "MapType" ~> "(" ~> dataType ~ "," ~ dataType ~ "," ~ boolVal <~ ")" ^^ {
- case t1 ~ _ ~ t2 ~ _ ~ valueContainsNull => MapType(t1, t2, valueContainsNull)
- }
-
- protected lazy val structField: Parser[StructField] =
- ("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ {
- case name ~ tpe ~ nullable =>
- StructField(name, tpe, nullable = nullable)
- }
-
- protected lazy val boolVal: Parser[Boolean] =
- ( "true" ^^^ true
- | "false" ^^^ false
- )
-
- protected lazy val structType: Parser[DataType] =
- "StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ {
- case fields => StructType(fields)
- }
-
- protected lazy val dataType: Parser[DataType] =
- ( arrayType
- | mapType
- | structType
- | primitiveType
- )
-
- /**
- * Parses a string representation of a DataType.
- *
- * TODO: Generate parser as pickler...
- */
- def apply(asString: String): DataType = parseAll(dataType, asString) match {
- case Success(result, _) => result
- case failure: NoSuccess =>
- throw new IllegalArgumentException(s"Unsupported dataType: $asString, $failure")
- }
- }
-
- protected[types] def buildFormattedString(
- dataType: DataType,
- prefix: String,
- builder: StringBuilder): Unit = {
- dataType match {
- case array: ArrayType =>
- array.buildFormattedString(prefix, builder)
- case struct: StructType =>
- struct.buildFormattedString(prefix, builder)
- case map: MapType =>
- map.buildFormattedString(prefix, builder)
- case _ =>
- }
- }
-
- /**
- * Compares two types, ignoring nullability of ArrayType, MapType, StructType.
- */
- private[types] def equalsIgnoreNullability(left: DataType, right: DataType): Boolean = {
- (left, right) match {
- case (ArrayType(leftElementType, _), ArrayType(rightElementType, _)) =>
- equalsIgnoreNullability(leftElementType, rightElementType)
- case (MapType(leftKeyType, leftValueType, _), MapType(rightKeyType, rightValueType, _)) =>
- equalsIgnoreNullability(leftKeyType, rightKeyType) &&
- equalsIgnoreNullability(leftValueType, rightValueType)
- case (StructType(leftFields), StructType(rightFields)) =>
- leftFields.length == rightFields.length &&
- leftFields.zip(rightFields).forall { case (l, r) =>
- l.name == r.name && equalsIgnoreNullability(l.dataType, r.dataType)
- }
- case (l, r) => l == r
- }
- }
-
- /**
- * Compares two types, ignoring compatible nullability of ArrayType, MapType, StructType.
- *
- * Compatible nullability is defined as follows:
- * - If `from` and `to` are ArrayTypes, `from` has a compatible nullability with `to`
- * if and only if `to.containsNull` is true, or both of `from.containsNull` and
- * `to.containsNull` are false.
- * - If `from` and `to` are MapTypes, `from` has a compatible nullability with `to`
- * if and only if `to.valueContainsNull` is true, or both of `from.valueContainsNull` and
- * `to.valueContainsNull` are false.
- * - If `from` and `to` are StructTypes, `from` has a compatible nullability with `to`
- * if and only if for all every pair of fields, `to.nullable` is true, or both
- * of `fromField.nullable` and `toField.nullable` are false.
- */
- private[sql] def equalsIgnoreCompatibleNullability(from: DataType, to: DataType): Boolean = {
- (from, to) match {
- case (ArrayType(fromElement, fn), ArrayType(toElement, tn)) =>
- (tn || !fn) && equalsIgnoreCompatibleNullability(fromElement, toElement)
-
- case (MapType(fromKey, fromValue, fn), MapType(toKey, toValue, tn)) =>
- (tn || !fn) &&
- equalsIgnoreCompatibleNullability(fromKey, toKey) &&
- equalsIgnoreCompatibleNullability(fromValue, toValue)
-
- case (StructType(fromFields), StructType(toFields)) =>
- fromFields.length == toFields.length &&
- fromFields.zip(toFields).forall { case (fromField, toField) =>
- fromField.name == toField.name &&
- (toField.nullable || !fromField.nullable) &&
- equalsIgnoreCompatibleNullability(fromField.dataType, toField.dataType)
- }
-
- case (fromDataType, toDataType) => fromDataType == toDataType
- }
- }
-}
-
-
-/**
- * :: DeveloperApi ::
- * The base type of all Spark SQL data types.
- *
- * @group dataType
- */
-@DeveloperApi
-abstract class DataType {
- /** Matches any expression that evaluates to this DataType */
- def unapply(a: Expression): Boolean = a match {
- case e: Expression if e.dataType == this => true
- case _ => false
- }
-
- /** The default size of a value of this data type. */
- def defaultSize: Int
-
- def typeName: String = this.getClass.getSimpleName.stripSuffix("$").dropRight(4).toLowerCase
-
- private[sql] def jsonValue: JValue = typeName
-
- def json: String = compact(render(jsonValue))
-
- def prettyJson: String = pretty(render(jsonValue))
-
- def simpleString: String = typeName
-
- /** Check if `this` and `other` are the same data type when ignoring nullability
- * (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
- */
- private[spark] def sameType(other: DataType): Boolean =
- DataType.equalsIgnoreNullability(this, other)
-
- /** Returns the same data type but set all nullability fields are true
- * (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
- */
- private[spark] def asNullable: DataType
-}
-
-/**
- * :: DeveloperApi ::
- * The data type representing `NULL` values. Please use the singleton [[DataTypes.NullType]].
- *
- * @group dataType
- */
-@DeveloperApi
-class NullType private() extends DataType {
- // The companion object and this class is separated so the companion object also subclasses
- // this type. Otherwise, the companion object would be of type "NullType$" in byte code.
- // Defined with a private constructor so the companion object is the only possible instantiation.
- override def defaultSize: Int = 1
-
- private[spark] override def asNullable: NullType = this
-}
-
-case object NullType extends NullType
-
-
-/**
- * An internal type used to represent everything that is not null, UDTs, arrays, structs, and maps.
- */
-protected[sql] abstract class AtomicType extends DataType {
- private[sql] type InternalType
- @transient private[sql] val tag: TypeTag[InternalType]
- private[sql] val ordering: Ordering[InternalType]
-
- @transient private[sql] val classTag = ScalaReflectionLock.synchronized {
- val mirror = runtimeMirror(Utils.getSparkClassLoader)
- ClassTag[InternalType](mirror.runtimeClass(tag.tpe))
- }
-}
-
-
-/**
- * :: DeveloperApi ::
- * The data type representing `String` values. Please use the singleton [[DataTypes.StringType]].
- *
- * @group dataType
- */
-@DeveloperApi
-class StringType private() extends AtomicType {
- // The companion object and this class is separated so the companion object also subclasses
- // this type. Otherwise, the companion object would be of type "StringType$" in byte code.
- // Defined with a private constructor so the companion object is the only possible instantiation.
- private[sql] type InternalType = UTF8String
- @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
- private[sql] val ordering = implicitly[Ordering[InternalType]]
-
- /**
- * The default size of a value of the StringType is 4096 bytes.
- */
- override def defaultSize: Int = 4096
-
- private[spark] override def asNullable: StringType = this
-}
-
-case object StringType extends StringType
-
-
-/**
- * :: DeveloperApi ::
- * The data type representing `Array[Byte]` values.
- * Please use the singleton [[DataTypes.BinaryType]].
- *
- * @group dataType
- */
-@DeveloperApi
-class BinaryType private() extends AtomicType {
- // The companion object and this class is separated so the companion object also subclasses
- // this type. Otherwise, the companion object would be of type "BinaryType$" in byte code.
- // Defined with a private constructor so the companion object is the only possible instantiation.
- private[sql] type InternalType = Array[Byte]
- @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
- private[sql] val ordering = new Ordering[InternalType] {
- def compare(x: Array[Byte], y: Array[Byte]): Int = {
- for (i <- 0 until x.length; if i < y.length) {
- val res = x(i).compareTo(y(i))
- if (res != 0) return res
- }
- x.length - y.length
- }
- }
-
- /**
- * The default size of a value of the BinaryType is 4096 bytes.
- */
- override def defaultSize: Int = 4096
-
- private[spark] override def asNullable: BinaryType = this
-}
-
-case object BinaryType extends BinaryType
-
-
-/**
- * :: DeveloperApi ::
- * The data type representing `Boolean` values. Please use the singleton [[DataTypes.BooleanType]].
- *
- *@group dataType
- */
-@DeveloperApi
-class BooleanType private() extends AtomicType {
- // The companion object and this class is separated so the companion object also subclasses
- // this type. Otherwise, the companion object would be of type "BooleanType$" in byte code.
- // Defined with a private constructor so the companion object is the only possible instantiation.
- private[sql] type InternalType = Boolean
- @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
- private[sql] val ordering = implicitly[Ordering[InternalType]]
-
- /**
- * The default size of a value of the BooleanType is 1 byte.
- */
- override def defaultSize: Int = 1
-
- private[spark] override def asNullable: BooleanType = this
-}
-
-case object BooleanType extends BooleanType
-
-
-/**
- * :: DeveloperApi ::
- * The data type representing `java.sql.Timestamp` values.
- * Please use the singleton [[DataTypes.TimestampType]].
- *
- * @group dataType
- */
-@DeveloperApi
-class TimestampType private() extends AtomicType {
- // The companion object and this class is separated so the companion object also subclasses
- // this type. Otherwise, the companion object would be of type "TimestampType$" in byte code.
- // Defined with a private constructor so the companion object is the only possible instantiation.
- private[sql] type InternalType = Timestamp
-
- @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
-
- private[sql] val ordering = new Ordering[InternalType] {
- def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
- }
-
- /**
- * The default size of a value of the TimestampType is 12 bytes.
- */
- override def defaultSize: Int = 12
-
- private[spark] override def asNullable: TimestampType = this
-}
-
-case object TimestampType extends TimestampType
-
-
-/**
- * :: DeveloperApi ::
- * The data type representing `java.sql.Date` values.
- * Please use the singleton [[DataTypes.DateType]].
- *
- * @group dataType
- */
-@DeveloperApi
-class DateType private() extends AtomicType {
- // The companion object and this class is separated so the companion object also subclasses
- // this type. Otherwise, the companion object would be of type "DateType$" in byte code.
- // Defined with a private constructor so the companion object is the only possible instantiation.
- private[sql] type InternalType = Int
-
- @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
-
- private[sql] val ordering = implicitly[Ordering[InternalType]]
-
- /**
- * The default size of a value of the DateType is 4 bytes.
- */
- override def defaultSize: Int = 4
-
- private[spark] override def asNullable: DateType = this
-}
-
-case object DateType extends DateType
-
-
-/**
- * :: DeveloperApi ::
- * Numeric data types.
- *
- * @group dataType
- */
-abstract class NumericType extends AtomicType {
- // Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for
- // implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
- // type parameter and and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
- // desugared by the compiler into an argument to the objects constructor. This means there is no
- // longer an no argument constructor and thus the JVM cannot serialize the object anymore.
- private[sql] val numeric: Numeric[InternalType]
-}
-
-
-protected[sql] object NumericType {
- def unapply(e: Expression): Boolean = e.dataType.isInstanceOf[NumericType]
-}
-
-
-/** Matcher for any expressions that evaluate to [[IntegralType]]s */
-protected[sql] object IntegralType {
- def unapply(a: Expression): Boolean = a match {
- case e: Expression if e.dataType.isInstanceOf[IntegralType] => true
- case _ => false
- }
-}
-
-
-protected[sql] sealed abstract class IntegralType extends NumericType {
- private[sql] val integral: Integral[InternalType]
-}
-
-
-/**
- * :: DeveloperApi ::
- * The data type representing `Long` values. Please use the singleton [[DataTypes.LongType]].
- *
- * @group dataType
- */
-@DeveloperApi
-class LongType private() extends IntegralType {
- // The companion object and this class is separated so the companion object also subclasses
- // this type. Otherwise, the companion object would be of type "LongType$" in byte code.
- // Defined with a private constructor so the companion object is the only possible instantiation.
- private[sql] type InternalType = Long
- @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
- private[sql] val numeric = implicitly[Numeric[Long]]
- private[sql] val integral = implicitly[Integral[Long]]
- private[sql] val ordering = implicitly[Ordering[InternalType]]
-
- /**
- * The default size of a value of the LongType is 8 bytes.
- */
- override def defaultSize: Int = 8
-
- override def simpleString: String = "bigint"
-
- private[spark] override def asNullable: LongType = this
-}
-
-case object LongType extends LongType
-
-
-/**
- * :: DeveloperApi ::
- * The data type representing `Int` values. Please use the singleton [[DataTypes.IntegerType]].
- *
- * @group dataType
- */
-@DeveloperApi
-class IntegerType private() extends IntegralType {
- // The companion object and this class is separated so the companion object also subclasses
- // this type. Otherwise, the companion object would be of type "IntegerType$" in byte code.
- // Defined with a private constructor so the companion object is the only possible instantiation.
- private[sql] type InternalType = Int
- @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
- private[sql] val numeric = implicitly[Numeric[Int]]
- private[sql] val integral = implicitly[Integral[Int]]
- private[sql] val ordering = implicitly[Ordering[InternalType]]
-
- /**
- * The default size of a value of the IntegerType is 4 bytes.
- */
- override def defaultSize: Int = 4
-
- override def simpleString: String = "int"
-
- private[spark] override def asNullable: IntegerType = this
-}
-
-case object IntegerType extends IntegerType
-
-
-/**
- * :: DeveloperApi ::
- * The data type representing `Short` values. Please use the singleton [[DataTypes.ShortType]].
- *
- * @group dataType
- */
-@DeveloperApi
-class ShortType private() extends IntegralType {
- // The companion object and this class is separated so the companion object also subclasses
- // this type. Otherwise, the companion object would be of type "ShortType$" in byte code.
- // Defined with a private constructor so the companion object is the only possible instantiation.
- private[sql] type InternalType = Short
- @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
- private[sql] val numeric = implicitly[Numeric[Short]]
- private[sql] val integral = implicitly[Integral[Short]]
- private[sql] val ordering = implicitly[Ordering[InternalType]]
-
- /**
- * The default size of a value of the ShortType is 2 bytes.
- */
- override def defaultSize: Int = 2
-
- override def simpleString: String = "smallint"
-
- private[spark] override def asNullable: ShortType = this
-}
-
-case object ShortType extends ShortType
-
-
-/**
- * :: DeveloperApi ::
- * The data type representing `Byte` values. Please use the singleton [[DataTypes.ByteType]].
- *
- * @group dataType
- */
-@DeveloperApi
-class ByteType private() extends IntegralType {
- // The companion object and this class is separated so the companion object also subclasses
- // this type. Otherwise, the companion object would be of type "ByteType$" in byte code.
- // Defined with a private constructor so the companion object is the only possible instantiation.
- private[sql] type InternalType = Byte
- @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
- private[sql] val numeric = implicitly[Numeric[Byte]]
- private[sql] val integral = implicitly[Integral[Byte]]
- private[sql] val ordering = implicitly[Ordering[InternalType]]
-
- /**
- * The default size of a value of the ByteType is 1 byte.
- */
- override def defaultSize: Int = 1
-
- override def simpleString: String = "tinyint"
-
- private[spark] override def asNullable: ByteType = this
-}
-
-case object ByteType extends ByteType
-
-
-/** Matcher for any expressions that evaluate to [[FractionalType]]s */
-protected[sql] object FractionalType {
- def unapply(a: Expression): Boolean = a match {
- case e: Expression if e.dataType.isInstanceOf[FractionalType] => true
- case _ => false
- }
-}
-
-
-protected[sql] sealed abstract class FractionalType extends NumericType {
- private[sql] val fractional: Fractional[InternalType]
- private[sql] val asIntegral: Integral[InternalType]
-}
-
-
-/** Precision parameters for a Decimal */
-case class PrecisionInfo(precision: Int, scale: Int)
-
-
-/**
- * :: DeveloperApi ::
- * The data type representing `java.math.BigDecimal` values.
- * A Decimal that might have fixed precision and scale, or unlimited values for these.
- *
- * Please use [[DataTypes.createDecimalType()]] to create a specific instance.
- *
- * @group dataType
- */
-@DeveloperApi
-case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalType {
-
- /** No-arg constructor for kryo. */
- protected def this() = this(null)
-
- private[sql] type InternalType = Decimal
- @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
- private[sql] val numeric = Decimal.DecimalIsFractional
- private[sql] val fractional = Decimal.DecimalIsFractional
- private[sql] val ordering = Decimal.DecimalIsFractional
- private[sql] val asIntegral = Decimal.DecimalAsIfIntegral
-
- def precision: Int = precisionInfo.map(_.precision).getOrElse(-1)
-
- def scale: Int = precisionInfo.map(_.scale).getOrElse(-1)
-
- override def typeName: String = precisionInfo match {
- case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)"
- case None => "decimal"
- }
-
- override def toString: String = precisionInfo match {
- case Some(PrecisionInfo(precision, scale)) => s"DecimalType($precision,$scale)"
- case None => "DecimalType()"
- }
-
- /**
- * The default size of a value of the DecimalType is 4096 bytes.
- */
- override def defaultSize: Int = 4096
-
- override def simpleString: String = precisionInfo match {
- case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)"
- case None => "decimal(10,0)"
- }
-
- private[spark] override def asNullable: DecimalType = this
-}
-
-
-/** Extra factory methods and pattern matchers for Decimals */
-object DecimalType {
- val Unlimited: DecimalType = DecimalType(None)
-
- object Fixed {
- def unapply(t: DecimalType): Option[(Int, Int)] =
- t.precisionInfo.map(p => (p.precision, p.scale))
- }
-
- object Expression {
- def unapply(e: Expression): Option[(Int, Int)] = e.dataType match {
- case t: DecimalType => t.precisionInfo.map(p => (p.precision, p.scale))
- case _ => None
- }
- }
-
- def apply(): DecimalType = Unlimited
-
- def apply(precision: Int, scale: Int): DecimalType =
- DecimalType(Some(PrecisionInfo(precision, scale)))
-
- def unapply(t: DataType): Boolean = t.isInstanceOf[DecimalType]
-
- def unapply(e: Expression): Boolean = e.dataType.isInstanceOf[DecimalType]
-
- def isFixed(dataType: DataType): Boolean = dataType match {
- case DecimalType.Fixed(_, _) => true
- case _ => false
- }
-}
-
-
-/**
- * :: DeveloperApi ::
- * The data type representing `Double` values. Please use the singleton [[DataTypes.DoubleType]].
- *
- * @group dataType
- */
-@DeveloperApi
-class DoubleType private() extends FractionalType {
- // The companion object and this class is separated so the companion object also subclasses
- // this type. Otherwise, the companion object would be of type "DoubleType$" in byte code.
- // Defined with a private constructor so the companion object is the only possible instantiation.
- private[sql] type InternalType = Double
- @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
- private[sql] val numeric = implicitly[Numeric[Double]]
- private[sql] val fractional = implicitly[Fractional[Double]]
- private[sql] val ordering = implicitly[Ordering[InternalType]]
- private[sql] val asIntegral = DoubleAsIfIntegral
-
- /**
- * The default size of a value of the DoubleType is 8 bytes.
- */
- override def defaultSize: Int = 8
-
- private[spark] override def asNullable: DoubleType = this
-}
-
-case object DoubleType extends DoubleType
-
-
-/**
- * :: DeveloperApi ::
- * The data type representing `Float` values. Please use the singleton [[DataTypes.FloatType]].
- *
- * @group dataType
- */
-@DeveloperApi
-class FloatType private() extends FractionalType {
- // The companion object and this class is separated so the companion object also subclasses
- // this type. Otherwise, the companion object would be of type "FloatType$" in byte code.
- // Defined with a private constructor so the companion object is the only possible instantiation.
- private[sql] type InternalType = Float
- @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
- private[sql] val numeric = implicitly[Numeric[Float]]
- private[sql] val fractional = implicitly[Fractional[Float]]
- private[sql] val ordering = implicitly[Ordering[InternalType]]
- private[sql] val asIntegral = FloatAsIfIntegral
-
- /**
- * The default size of a value of the FloatType is 4 bytes.
- */
- override def defaultSize: Int = 4
-
- private[spark] override def asNullable: FloatType = this
-}
-
-case object FloatType extends FloatType
-
-
-object ArrayType {
- /** Construct a [[ArrayType]] object with the given element type. The `containsNull` is true. */
- def apply(elementType: DataType): ArrayType = ArrayType(elementType, true)
-}
-
-
-/**
- * :: DeveloperApi ::
- * The data type for collections of multiple values.
- * Internally these are represented as columns that contain a ``scala.collection.Seq``.
- *
- * Please use [[DataTypes.createArrayType()]] to create a specific instance.
- *
- * An [[ArrayType]] object comprises two fields, `elementType: [[DataType]]` and
- * `containsNull: Boolean`. The field of `elementType` is used to specify the type of
- * array elements. The field of `containsNull` is used to specify if the array has `null` values.
- *
- * @param elementType The data type of values.
- * @param containsNull Indicates if values have `null` values
- *
- * @group dataType
- */
-@DeveloperApi
-case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
-
- /** No-arg constructor for kryo. */
- protected def this() = this(null, false)
-
- private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
- builder.append(
- s"$prefix-- element: ${elementType.typeName} (containsNull = $containsNull)\n")
- DataType.buildFormattedString(elementType, s"$prefix |", builder)
- }
-
- override private[sql] def jsonValue =
- ("type" -> typeName) ~
- ("elementType" -> elementType.jsonValue) ~
- ("containsNull" -> containsNull)
-
- /**
- * The default size of a value of the ArrayType is 100 * the default size of the element type.
- * (We assume that there are 100 elements).
- */
- override def defaultSize: Int = 100 * elementType.defaultSize
-
- override def simpleString: String = s"array<${elementType.simpleString}>"
-
- private[spark] override def asNullable: ArrayType =
- ArrayType(elementType.asNullable, containsNull = true)
-}
-
-
-/**
- * A field inside a StructType.
- * @param name The name of this field.
- * @param dataType The data type of this field.
- * @param nullable Indicates if values of this field can be `null` values.
- * @param metadata The metadata of this field. The metadata should be preserved during
- * transformation if the content of the column is not modified, e.g, in selection.
- */
-case class StructField(
- name: String,
- dataType: DataType,
- nullable: Boolean = true,
- metadata: Metadata = Metadata.empty) {
-
- /** No-arg constructor for kryo. */
- protected def this() = this(null, null)
-
- private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
- builder.append(s"$prefix-- $name: ${dataType.typeName} (nullable = $nullable)\n")
- DataType.buildFormattedString(dataType, s"$prefix |", builder)
- }
-
- // override the default toString to be compatible with legacy parquet files.
- override def toString: String = s"StructField($name,$dataType,$nullable)"
-
- private[sql] def jsonValue: JValue = {
- ("name" -> name) ~
- ("type" -> dataType.jsonValue) ~
- ("nullable" -> nullable) ~
- ("metadata" -> metadata.jsonValue)
- }
-}
-
-
-object StructType {
- protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
- StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
-
- def apply(fields: Seq[StructField]): StructType = StructType(fields.toArray)
-
- def apply(fields: java.util.List[StructField]): StructType = {
- StructType(fields.toArray.asInstanceOf[Array[StructField]])
- }
-
- private[sql] def merge(left: DataType, right: DataType): DataType =
- (left, right) match {
- case (ArrayType(leftElementType, leftContainsNull),
- ArrayType(rightElementType, rightContainsNull)) =>
- ArrayType(
- merge(leftElementType, rightElementType),
- leftContainsNull || rightContainsNull)
-
- case (MapType(leftKeyType, leftValueType, leftContainsNull),
- MapType(rightKeyType, rightValueType, rightContainsNull)) =>
- MapType(
- merge(leftKeyType, rightKeyType),
- merge(leftValueType, rightValueType),
- leftContainsNull || rightContainsNull)
-
- case (StructType(leftFields), StructType(rightFields)) =>
- val newFields = ArrayBuffer.empty[StructField]
-
- leftFields.foreach {
- case leftField @ StructField(leftName, leftType, leftNullable, _) =>
- rightFields
- .find(_.name == leftName)
- .map { case rightField @ StructField(_, rightType, rightNullable, _) =>
- leftField.copy(
- dataType = merge(leftType, rightType),
- nullable = leftNullable || rightNullable)
- }
- .orElse(Some(leftField))
- .foreach(newFields += _)
- }
-
- rightFields
- .filterNot(f => leftFields.map(_.name).contains(f.name))
- .foreach(newFields += _)
-
- StructType(newFields)
-
- case (DecimalType.Fixed(leftPrecision, leftScale),
- DecimalType.Fixed(rightPrecision, rightScale)) =>
- DecimalType(
- max(leftScale, rightScale) + max(leftPrecision - leftScale, rightPrecision - rightScale),
- max(leftScale, rightScale))
-
- case (leftUdt: UserDefinedType[_], rightUdt: UserDefinedType[_])
- if leftUdt.userClass == rightUdt.userClass => leftUdt
-
- case (leftType, rightType) if leftType == rightType =>
- leftType
-
- case _ =>
- throw new SparkException(s"Failed to merge incompatible data types $left and $right")
- }
-}
-
-
-/**
- * :: DeveloperApi ::
- * A [[StructType]] object can be constructed by
- * {{{
- * StructType(fields: Seq[StructField])
- * }}}
- * For a [[StructType]] object, one or multiple [[StructField]]s can be extracted by names.
- * If multiple [[StructField]]s are extracted, a [[StructType]] object will be returned.
- * If a provided name does not have a matching field, it will be ignored. For the case
- * of extracting a single StructField, a `null` will be returned.
- * Example:
- * {{{
- * import org.apache.spark.sql._
- *
- * val struct =
- * StructType(
- * StructField("a", IntegerType, true) ::
- * StructField("b", LongType, false) ::
- * StructField("c", BooleanType, false) :: Nil)
- *
- * // Extract a single StructField.
- * val singleField = struct("b")
- * // singleField: StructField = StructField(b,LongType,false)
- *
- * // This struct does not have a field called "d". null will be returned.
- * val nonExisting = struct("d")
- * // nonExisting: StructField = null
- *
- * // Extract multiple StructFields. Field names are provided in a set.
- * // A StructType object will be returned.
- * val twoFields = struct(Set("b", "c"))
- * // twoFields: StructType =
- * // StructType(List(StructField(b,LongType,false), StructField(c,BooleanType,false)))
- *
- * // Any names without matching fields will be ignored.
- * // For the case shown below, "d" will be ignored and
- * // it is treated as struct(Set("b", "c")).
- * val ignoreNonExisting = struct(Set("b", "c", "d"))
- * // ignoreNonExisting: StructType =
- * // StructType(List(StructField(b,LongType,false), StructField(c,BooleanType,false)))
- * }}}
- *
- * A [[org.apache.spark.sql.Row]] object is used as a value of the StructType.
- * Example:
- * {{{
- * import org.apache.spark.sql._
- *
- * val innerStruct =
- * StructType(
- * StructField("f1", IntegerType, true) ::
- * StructField("f2", LongType, false) ::
- * StructField("f3", BooleanType, false) :: Nil)
- *
- * val struct = StructType(
- * StructField("a", innerStruct, true) :: Nil)
- *
- * // Create a Row with the schema defined by struct
- * val row = Row(Row(1, 2, true))
- * // row: Row = [[1,2,true]]
- * }}}
- *
- * @group dataType
- */
-@DeveloperApi
-case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
-
- /** No-arg constructor for kryo. */
- protected def this() = this(null)
-
- /** Returns all field names in an array. */
- def fieldNames: Array[String] = fields.map(_.name)
-
- private lazy val fieldNamesSet: Set[String] = fieldNames.toSet
- private lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap
- private lazy val nameToIndex: Map[String, Int] = fieldNames.zipWithIndex.toMap
-
- /**
- * Extracts a [[StructField]] of the given name. If the [[StructType]] object does not
- * have a name matching the given name, `null` will be returned.
- */
- def apply(name: String): StructField = {
- nameToField.getOrElse(name,
- throw new IllegalArgumentException(s"""Field "$name" does not exist."""))
- }
-
- /**
- * Returns a [[StructType]] containing [[StructField]]s of the given names, preserving the
- * original order of fields. Those names which do not have matching fields will be ignored.
- */
- def apply(names: Set[String]): StructType = {
- val nonExistFields = names -- fieldNamesSet
- if (nonExistFields.nonEmpty) {
- throw new IllegalArgumentException(
- s"Field ${nonExistFields.mkString(",")} does not exist.")
- }
- // Preserve the original order of fields.
- StructType(fields.filter(f => names.contains(f.name)))
- }
-
- /**
- * Returns index of a given field
- */
- def fieldIndex(name: String): Int = {
- nameToIndex.getOrElse(name,
- throw new IllegalArgumentException(s"""Field "$name" does not exist."""))
- }
-
- protected[sql] def toAttributes: Seq[AttributeReference] =
- map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
-
- def treeString: String = {
- val builder = new StringBuilder
- builder.append("root\n")
- val prefix = " |"
- fields.foreach(field => field.buildFormattedString(prefix, builder))
-
- builder.toString()
- }
-
- def printTreeString(): Unit = println(treeString)
-
- private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
- fields.foreach(field => field.buildFormattedString(prefix, builder))
- }
-
- override private[sql] def jsonValue =
- ("type" -> typeName) ~
- ("fields" -> map(_.jsonValue))
-
- override def apply(fieldIndex: Int): StructField = fields(fieldIndex)
-
- override def length: Int = fields.length
-
- override def iterator: Iterator[StructField] = fields.iterator
-
- /**
- * The default size of a value of the StructType is the total default sizes of all field types.
- */
- override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum
-
- override def simpleString: String = {
- val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
- s"struct<${fieldTypes.mkString(",")}>"
- }
-
- /**
- * Merges with another schema (`StructType`). For a struct field A from `this` and a struct field
- * B from `that`,
- *
- * 1. If A and B have the same name and data type, they are merged to a field C with the same name
- * and data type. C is nullable if and only if either A or B is nullable.
- * 2. If A doesn't exist in `that`, it's included in the result schema.
- * 3. If B doesn't exist in `this`, it's also included in the result schema.
- * 4. Otherwise, `this` and `that` are considered as conflicting schemas and an exception would be
- * thrown.
- */
- private[sql] def merge(that: StructType): StructType =
- StructType.merge(this, that).asInstanceOf[StructType]
-
- private[spark] override def asNullable: StructType = {
- val newFields = fields.map {
- case StructField(name, dataType, nullable, metadata) =>
- StructField(name, dataType.asNullable, nullable = true, metadata)
- }
-
- StructType(newFields)
- }
-}
-
-
-object MapType {
- /**
- * Construct a [[MapType]] object with the given key type and value type.
- * The `valueContainsNull` is true.
- */
- def apply(keyType: DataType, valueType: DataType): MapType =
- MapType(keyType: DataType, valueType: DataType, valueContainsNull = true)
-}
-
-
-/**
- * :: DeveloperApi ::
- * The data type for Maps. Keys in a map are not allowed to have `null` values.
- *
- * Please use [[DataTypes.createMapType()]] to create a specific instance.
- *
- * @param keyType The data type of map keys.
- * @param valueType The data type of map values.
- * @param valueContainsNull Indicates if map values have `null` values.
- *
- * @group dataType
- */
-case class MapType(
- keyType: DataType,
- valueType: DataType,
- valueContainsNull: Boolean) extends DataType {
-
- /** No-arg constructor for kryo. */
- def this() = this(null, null, false)
-
- private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
- builder.append(s"$prefix-- key: ${keyType.typeName}\n")
- builder.append(s"$prefix-- value: ${valueType.typeName} " +
- s"(valueContainsNull = $valueContainsNull)\n")
- DataType.buildFormattedString(keyType, s"$prefix |", builder)
- DataType.buildFormattedString(valueType, s"$prefix |", builder)
- }
-
- override private[sql] def jsonValue: JValue =
- ("type" -> typeName) ~
- ("keyType" -> keyType.jsonValue) ~
- ("valueType" -> valueType.jsonValue) ~
- ("valueContainsNull" -> valueContainsNull)
-
- /**
- * The default size of a value of the MapType is
- * 100 * (the default size of the key type + the default size of the value type).
- * (We assume that there are 100 elements).
- */
- override def defaultSize: Int = 100 * (keyType.defaultSize + valueType.defaultSize)
-
- override def simpleString: String = s"map<${keyType.simpleString},${valueType.simpleString}>"
-
- private[spark] override def asNullable: MapType =
- MapType(keyType.asNullable, valueType.asNullable, valueContainsNull = true)
-}
-
-
-/**
- * ::DeveloperApi::
- * The data type for User Defined Types (UDTs).
- *
- * This interface allows a user to make their own classes more interoperable with SparkSQL;
- * e.g., by creating a [[UserDefinedType]] for a class X, it becomes possible to create
- * a `DataFrame` which has class X in the schema.
- *
- * For SparkSQL to recognize UDTs, the UDT must be annotated with
- * [[SQLUserDefinedType]].
- *
- * The conversion via `serialize` occurs when instantiating a `DataFrame` from another RDD.
- * The conversion via `deserialize` occurs when reading from a `DataFrame`.
- */
-@DeveloperApi
-abstract class UserDefinedType[UserType] extends DataType with Serializable {
-
- /** Underlying storage type for this UDT */
- def sqlType: DataType
-
- /** Paired Python UDT class, if exists. */
- def pyUDT: String = null
-
- /**
- * Convert the user type to a SQL datum
- *
- * TODO: Can we make this take obj: UserType? The issue is in
- * CatalystTypeConverters.convertToCatalyst, where we need to convert Any to UserType.
- */
- def serialize(obj: Any): Any
-
- /** Convert a SQL datum to the user type */
- def deserialize(datum: Any): UserType
-
- override private[sql] def jsonValue: JValue = {
- ("type" -> "udt") ~
- ("class" -> this.getClass.getName) ~
- ("pyClass" -> pyUDT) ~
- ("sqlType" -> sqlType.jsonValue)
- }
-
- /**
- * Class object for the UserType
- */
- def userClass: java.lang.Class[UserType]
-
- /**
- * The default size of a value of the UserDefinedType is 4096 bytes.
- */
- override def defaultSize: Int = 4096
-
- /**
- * For UDT, asNullable will not change the nullability of its internal sqlType and just returns
- * itself.
- */
- private[spark] override def asNullable: UserDefinedType[UserType] = this
-}