aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala231
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala314
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala18
3 files changed, 327 insertions, 236 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index e0bfe3c32f..ffa694fcdc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -17,22 +17,20 @@
package org.apache.spark.sql
-import java.lang.reflect.Modifier
-
import scala.annotation.implicitNotFound
-import scala.reflect.{classTag, ClassTag}
+import scala.reflect.ClassTag
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, DecodeUsingSerializer, EncodeUsingSerializer}
import org.apache.spark.sql.types._
+
/**
* :: Experimental ::
* Used to convert a JVM object of type `T` to and from the internal Spark SQL representation.
*
* == Scala ==
- * Encoders are generally created automatically through implicits from a `SQLContext`.
+ * Encoders are generally created automatically through implicits from a `SQLContext`, or can be
+ * explicitly created by calling static methods on [[Encoders]].
*
* {{{
* import sqlContext.implicits._
@@ -81,224 +79,3 @@ trait Encoder[T] extends Serializable {
/** A ClassTag that can be used to construct and Array to contain a collection of `T`. */
def clsTag: ClassTag[T]
}
-
-/**
- * :: Experimental ::
- * Methods for creating an [[Encoder]].
- *
- * @since 1.6.0
- */
-@Experimental
-object Encoders {
-
- /**
- * An encoder for nullable boolean type.
- * @since 1.6.0
- */
- def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder()
-
- /**
- * An encoder for nullable byte type.
- * @since 1.6.0
- */
- def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder()
-
- /**
- * An encoder for nullable short type.
- * @since 1.6.0
- */
- def SHORT: Encoder[java.lang.Short] = ExpressionEncoder()
-
- /**
- * An encoder for nullable int type.
- * @since 1.6.0
- */
- def INT: Encoder[java.lang.Integer] = ExpressionEncoder()
-
- /**
- * An encoder for nullable long type.
- * @since 1.6.0
- */
- def LONG: Encoder[java.lang.Long] = ExpressionEncoder()
-
- /**
- * An encoder for nullable float type.
- * @since 1.6.0
- */
- def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder()
-
- /**
- * An encoder for nullable double type.
- * @since 1.6.0
- */
- def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder()
-
- /**
- * An encoder for nullable string type.
- * @since 1.6.0
- */
- def STRING: Encoder[java.lang.String] = ExpressionEncoder()
-
- /**
- * An encoder for nullable decimal type.
- * @since 1.6.0
- */
- def DECIMAL: Encoder[java.math.BigDecimal] = ExpressionEncoder()
-
- /**
- * An encoder for nullable date type.
- * @since 1.6.0
- */
- def DATE: Encoder[java.sql.Date] = ExpressionEncoder()
-
- /**
- * An encoder for nullable timestamp type.
- * @since 1.6.0
- */
- def TIMESTAMP: Encoder[java.sql.Timestamp] = ExpressionEncoder()
-
- /**
- * An encoder for arrays of bytes.
- * @since 1.6.1
- */
- def BINARY: Encoder[Array[Byte]] = ExpressionEncoder()
-
- /**
- * Creates an encoder for Java Bean of type T.
- *
- * T must be publicly accessible.
- *
- * supported types for java bean field:
- * - primitive types: boolean, int, double, etc.
- * - boxed types: Boolean, Integer, Double, etc.
- * - String
- * - java.math.BigDecimal
- * - time related: java.sql.Date, java.sql.Timestamp
- * - collection types: only array and java.util.List currently, map support is in progress
- * - nested java bean.
- *
- * @since 1.6.0
- */
- def bean[T](beanClass: Class[T]): Encoder[T] = ExpressionEncoder.javaBean(beanClass)
-
- /**
- * (Scala-specific) Creates an encoder that serializes objects of type T using Kryo.
- * This encoder maps T into a single byte array (binary) field.
- *
- * T must be publicly accessible.
- *
- * @since 1.6.0
- */
- def kryo[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = true)
-
- /**
- * Creates an encoder that serializes objects of type T using Kryo.
- * This encoder maps T into a single byte array (binary) field.
- *
- * T must be publicly accessible.
- *
- * @since 1.6.0
- */
- def kryo[T](clazz: Class[T]): Encoder[T] = kryo(ClassTag[T](clazz))
-
- /**
- * (Scala-specific) Creates an encoder that serializes objects of type T using generic Java
- * serialization. This encoder maps T into a single byte array (binary) field.
- *
- * Note that this is extremely inefficient and should only be used as the last resort.
- *
- * T must be publicly accessible.
- *
- * @since 1.6.0
- */
- def javaSerialization[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = false)
-
- /**
- * Creates an encoder that serializes objects of type T using generic Java serialization.
- * This encoder maps T into a single byte array (binary) field.
- *
- * Note that this is extremely inefficient and should only be used as the last resort.
- *
- * T must be publicly accessible.
- *
- * @since 1.6.0
- */
- def javaSerialization[T](clazz: Class[T]): Encoder[T] = javaSerialization(ClassTag[T](clazz))
-
- /** Throws an exception if T is not a public class. */
- private def validatePublicClass[T: ClassTag](): Unit = {
- if (!Modifier.isPublic(classTag[T].runtimeClass.getModifiers)) {
- throw new UnsupportedOperationException(
- s"${classTag[T].runtimeClass.getName} is not a public class. " +
- "Only public classes are supported.")
- }
- }
-
- /** A way to construct encoders using generic serializers. */
- private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = {
- if (classTag[T].runtimeClass.isPrimitive) {
- throw new UnsupportedOperationException("Primitive types are not supported.")
- }
-
- validatePublicClass[T]()
-
- ExpressionEncoder[T](
- schema = new StructType().add("value", BinaryType),
- flat = true,
- serializer = Seq(
- EncodeUsingSerializer(
- BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
- deserializer =
- DecodeUsingSerializer[T](
- BoundReference(0, BinaryType, nullable = true), classTag[T], kryo = useKryo),
- clsTag = classTag[T]
- )
- }
-
- /**
- * An encoder for 2-ary tuples.
- * @since 1.6.0
- */
- def tuple[T1, T2](
- e1: Encoder[T1],
- e2: Encoder[T2]): Encoder[(T1, T2)] = {
- ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2))
- }
-
- /**
- * An encoder for 3-ary tuples.
- * @since 1.6.0
- */
- def tuple[T1, T2, T3](
- e1: Encoder[T1],
- e2: Encoder[T2],
- e3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
- ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3))
- }
-
- /**
- * An encoder for 4-ary tuples.
- * @since 1.6.0
- */
- def tuple[T1, T2, T3, T4](
- e1: Encoder[T1],
- e2: Encoder[T2],
- e3: Encoder[T3],
- e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
- ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4))
- }
-
- /**
- * An encoder for 5-ary tuples.
- * @since 1.6.0
- */
- def tuple[T1, T2, T3, T4, T5](
- e1: Encoder[T1],
- e2: Encoder[T2],
- e3: Encoder[T3],
- e4: Encoder[T4],
- e5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = {
- ExpressionEncoder.tuple(
- encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4), encoderFor(e5))
- }
-}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
new file mode 100644
index 0000000000..3f4df704db
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.lang.reflect.Modifier
+
+import scala.reflect.{classTag, ClassTag}
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, DecodeUsingSerializer, EncodeUsingSerializer}
+import org.apache.spark.sql.types._
+
+/**
+ * :: Experimental ::
+ * Methods for creating an [[Encoder]].
+ *
+ * @since 1.6.0
+ */
+@Experimental
+object Encoders {
+
+ /**
+ * An encoder for nullable boolean type.
+ * The Scala primitive encoder is available as [[scalaBoolean]].
+ * @since 1.6.0
+ */
+ def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable byte type.
+ * The Scala primitive encoder is available as [[scalaByte]].
+ * @since 1.6.0
+ */
+ def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable short type.
+ * The Scala primitive encoder is available as [[scalaShort]].
+ * @since 1.6.0
+ */
+ def SHORT: Encoder[java.lang.Short] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable int type.
+ * The Scala primitive encoder is available as [[scalaInt]].
+ * @since 1.6.0
+ */
+ def INT: Encoder[java.lang.Integer] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable long type.
+ * The Scala primitive encoder is available as [[scalaLong]].
+ * @since 1.6.0
+ */
+ def LONG: Encoder[java.lang.Long] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable float type.
+ * The Scala primitive encoder is available as [[scalaFloat]].
+ * @since 1.6.0
+ */
+ def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable double type.
+ * The Scala primitive encoder is available as [[scalaDouble]].
+ * @since 1.6.0
+ */
+ def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable string type.
+ *
+ * @since 1.6.0
+ */
+ def STRING: Encoder[java.lang.String] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable decimal type.
+ *
+ * @since 1.6.0
+ */
+ def DECIMAL: Encoder[java.math.BigDecimal] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable date type.
+ *
+ * @since 1.6.0
+ */
+ def DATE: Encoder[java.sql.Date] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable timestamp type.
+ *
+ * @since 1.6.0
+ */
+ def TIMESTAMP: Encoder[java.sql.Timestamp] = ExpressionEncoder()
+
+ /**
+ * An encoder for arrays of bytes.
+ *
+ * @since 1.6.1
+ */
+ def BINARY: Encoder[Array[Byte]] = ExpressionEncoder()
+
+ /**
+ * Creates an encoder for Java Bean of type T.
+ *
+ * T must be publicly accessible.
+ *
+ * supported types for java bean field:
+ * - primitive types: boolean, int, double, etc.
+ * - boxed types: Boolean, Integer, Double, etc.
+ * - String
+ * - java.math.BigDecimal
+ * - time related: java.sql.Date, java.sql.Timestamp
+ * - collection types: only array and java.util.List currently, map support is in progress
+ * - nested java bean.
+ *
+ * @since 1.6.0
+ */
+ def bean[T](beanClass: Class[T]): Encoder[T] = ExpressionEncoder.javaBean(beanClass)
+
+ /**
+ * (Scala-specific) Creates an encoder that serializes objects of type T using Kryo.
+ * This encoder maps T into a single byte array (binary) field.
+ *
+ * T must be publicly accessible.
+ *
+ * @since 1.6.0
+ */
+ def kryo[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = true)
+
+ /**
+ * Creates an encoder that serializes objects of type T using Kryo.
+ * This encoder maps T into a single byte array (binary) field.
+ *
+ * T must be publicly accessible.
+ *
+ * @since 1.6.0
+ */
+ def kryo[T](clazz: Class[T]): Encoder[T] = kryo(ClassTag[T](clazz))
+
+ /**
+ * (Scala-specific) Creates an encoder that serializes objects of type T using generic Java
+ * serialization. This encoder maps T into a single byte array (binary) field.
+ *
+ * Note that this is extremely inefficient and should only be used as the last resort.
+ *
+ * T must be publicly accessible.
+ *
+ * @since 1.6.0
+ */
+ def javaSerialization[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = false)
+
+ /**
+ * Creates an encoder that serializes objects of type T using generic Java serialization.
+ * This encoder maps T into a single byte array (binary) field.
+ *
+ * Note that this is extremely inefficient and should only be used as the last resort.
+ *
+ * T must be publicly accessible.
+ *
+ * @since 1.6.0
+ */
+ def javaSerialization[T](clazz: Class[T]): Encoder[T] = javaSerialization(ClassTag[T](clazz))
+
+ /** Throws an exception if T is not a public class. */
+ private def validatePublicClass[T: ClassTag](): Unit = {
+ if (!Modifier.isPublic(classTag[T].runtimeClass.getModifiers)) {
+ throw new UnsupportedOperationException(
+ s"${classTag[T].runtimeClass.getName} is not a public class. " +
+ "Only public classes are supported.")
+ }
+ }
+
+ /** A way to construct encoders using generic serializers. */
+ private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = {
+ if (classTag[T].runtimeClass.isPrimitive) {
+ throw new UnsupportedOperationException("Primitive types are not supported.")
+ }
+
+ validatePublicClass[T]()
+
+ ExpressionEncoder[T](
+ schema = new StructType().add("value", BinaryType),
+ flat = true,
+ serializer = Seq(
+ EncodeUsingSerializer(
+ BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
+ deserializer =
+ DecodeUsingSerializer[T](
+ BoundReference(0, BinaryType, nullable = true), classTag[T], kryo = useKryo),
+ clsTag = classTag[T]
+ )
+ }
+
+ /**
+ * An encoder for 2-ary tuples.
+ *
+ * @since 1.6.0
+ */
+ def tuple[T1, T2](
+ e1: Encoder[T1],
+ e2: Encoder[T2]): Encoder[(T1, T2)] = {
+ ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2))
+ }
+
+ /**
+ * An encoder for 3-ary tuples.
+ *
+ * @since 1.6.0
+ */
+ def tuple[T1, T2, T3](
+ e1: Encoder[T1],
+ e2: Encoder[T2],
+ e3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
+ ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3))
+ }
+
+ /**
+ * An encoder for 4-ary tuples.
+ *
+ * @since 1.6.0
+ */
+ def tuple[T1, T2, T3, T4](
+ e1: Encoder[T1],
+ e2: Encoder[T2],
+ e3: Encoder[T3],
+ e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
+ ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4))
+ }
+
+ /**
+ * An encoder for 5-ary tuples.
+ *
+ * @since 1.6.0
+ */
+ def tuple[T1, T2, T3, T4, T5](
+ e1: Encoder[T1],
+ e2: Encoder[T2],
+ e3: Encoder[T3],
+ e4: Encoder[T4],
+ e5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = {
+ ExpressionEncoder.tuple(
+ encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4), encoderFor(e5))
+ }
+
+ /**
+ * An encoder for Scala's product type (tuples, case classes, etc).
+ * @since 2.0.0
+ */
+ def product[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder()
+
+ /**
+ * An encoder for Scala's primitive int type.
+ * @since 2.0.0
+ */
+ def scalaInt: Encoder[Int] = ExpressionEncoder()
+
+ /**
+ * An encoder for Scala's primitive long type.
+ * @since 2.0.0
+ */
+ def scalaLong: Encoder[Long] = ExpressionEncoder()
+
+ /**
+ * An encoder for Scala's primitive double type.
+ * @since 2.0.0
+ */
+ def scalaDouble: Encoder[Double] = ExpressionEncoder()
+
+ /**
+ * An encoder for Scala's primitive float type.
+ * @since 2.0.0
+ */
+ def scalaFloat: Encoder[Float] = ExpressionEncoder()
+
+ /**
+ * An encoder for Scala's primitive byte type.
+ * @since 2.0.0
+ */
+ def scalaByte: Encoder[Byte] = ExpressionEncoder()
+
+ /**
+ * An encoder for Scala's primitive short type.
+ * @since 2.0.0
+ */
+ def scalaShort: Encoder[Short] = ExpressionEncoder()
+
+ /**
+ * An encoder for Scala's primitive boolean type.
+ * @since 2.0.0
+ */
+ def scalaBoolean: Encoder[Boolean] = ExpressionEncoder()
+
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index c35a969bf0..ad69e23540 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -44,33 +44,33 @@ abstract class SQLImplicits {
}
/** @since 1.6.0 */
- implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder()
+ implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]
// Primitives
/** @since 1.6.0 */
- implicit def newIntEncoder: Encoder[Int] = ExpressionEncoder()
+ implicit def newIntEncoder: Encoder[Int] = Encoders.scalaInt
/** @since 1.6.0 */
- implicit def newLongEncoder: Encoder[Long] = ExpressionEncoder()
+ implicit def newLongEncoder: Encoder[Long] = Encoders.scalaLong
/** @since 1.6.0 */
- implicit def newDoubleEncoder: Encoder[Double] = ExpressionEncoder()
+ implicit def newDoubleEncoder: Encoder[Double] = Encoders.scalaDouble
/** @since 1.6.0 */
- implicit def newFloatEncoder: Encoder[Float] = ExpressionEncoder()
+ implicit def newFloatEncoder: Encoder[Float] = Encoders.scalaFloat
/** @since 1.6.0 */
- implicit def newByteEncoder: Encoder[Byte] = ExpressionEncoder()
+ implicit def newByteEncoder: Encoder[Byte] = Encoders.scalaByte
/** @since 1.6.0 */
- implicit def newShortEncoder: Encoder[Short] = ExpressionEncoder()
+ implicit def newShortEncoder: Encoder[Short] = Encoders.scalaShort
/** @since 1.6.0 */
- implicit def newBooleanEncoder: Encoder[Boolean] = ExpressionEncoder()
+ implicit def newBooleanEncoder: Encoder[Boolean] = Encoders.scalaBoolean
/** @since 1.6.0 */
- implicit def newStringEncoder: Encoder[String] = ExpressionEncoder()
+ implicit def newStringEncoder: Encoder[String] = Encoders.STRING
// Seqs