aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-04-07 00:46:57 -0700
committerReynold Xin <rxin@databricks.com>2016-04-07 00:46:57 -0700
commite11aa9ec5c3cdcd8ca08d2486a7208840ad77bf8 (patch)
tree9688d40b9a8b3ff2e0f8b9f0f4a3cec224106f10
parent21d5ca128bf3afd5c2d4c7fcc56240e28443474f (diff)
downloadspark-e11aa9ec5c3cdcd8ca08d2486a7208840ad77bf8.tar.gz
spark-e11aa9ec5c3cdcd8ca08d2486a7208840ad77bf8.tar.bz2
spark-e11aa9ec5c3cdcd8ca08d2486a7208840ad77bf8.zip
[SPARK-14452][SQL] Explicit APIs in Scala for specifying encoders
## What changes were proposed in this pull request? The Scala Dataset public API currently only allows users to specify encoders through SQLContext.implicits. This is OK but sometimes people want to explicitly get encoders without a SQLContext (e.g. Aggregator implementations). This patch adds public APIs to Encoders class for getting Scala encoders. ## How was this patch tested? None - I will update test cases once https://github.com/apache/spark/pull/12231 is merged. Author: Reynold Xin <rxin@databricks.com> Closes #12232 from rxin/SPARK-14452.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala231
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala314
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala18
3 files changed, 327 insertions, 236 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index e0bfe3c32f..ffa694fcdc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -17,22 +17,20 @@
package org.apache.spark.sql
-import java.lang.reflect.Modifier
-
import scala.annotation.implicitNotFound
-import scala.reflect.{classTag, ClassTag}
+import scala.reflect.ClassTag
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, DecodeUsingSerializer, EncodeUsingSerializer}
import org.apache.spark.sql.types._
+
/**
* :: Experimental ::
* Used to convert a JVM object of type `T` to and from the internal Spark SQL representation.
*
* == Scala ==
- * Encoders are generally created automatically through implicits from a `SQLContext`.
+ * Encoders are generally created automatically through implicits from a `SQLContext`, or can be
+ * explicitly created by calling static methods on [[Encoders]].
*
* {{{
* import sqlContext.implicits._
@@ -81,224 +79,3 @@ trait Encoder[T] extends Serializable {
/** A ClassTag that can be used to construct and Array to contain a collection of `T`. */
def clsTag: ClassTag[T]
}
-
-/**
- * :: Experimental ::
- * Methods for creating an [[Encoder]].
- *
- * @since 1.6.0
- */
-@Experimental
-object Encoders {
-
- /**
- * An encoder for nullable boolean type.
- * @since 1.6.0
- */
- def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder()
-
- /**
- * An encoder for nullable byte type.
- * @since 1.6.0
- */
- def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder()
-
- /**
- * An encoder for nullable short type.
- * @since 1.6.0
- */
- def SHORT: Encoder[java.lang.Short] = ExpressionEncoder()
-
- /**
- * An encoder for nullable int type.
- * @since 1.6.0
- */
- def INT: Encoder[java.lang.Integer] = ExpressionEncoder()
-
- /**
- * An encoder for nullable long type.
- * @since 1.6.0
- */
- def LONG: Encoder[java.lang.Long] = ExpressionEncoder()
-
- /**
- * An encoder for nullable float type.
- * @since 1.6.0
- */
- def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder()
-
- /**
- * An encoder for nullable double type.
- * @since 1.6.0
- */
- def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder()
-
- /**
- * An encoder for nullable string type.
- * @since 1.6.0
- */
- def STRING: Encoder[java.lang.String] = ExpressionEncoder()
-
- /**
- * An encoder for nullable decimal type.
- * @since 1.6.0
- */
- def DECIMAL: Encoder[java.math.BigDecimal] = ExpressionEncoder()
-
- /**
- * An encoder for nullable date type.
- * @since 1.6.0
- */
- def DATE: Encoder[java.sql.Date] = ExpressionEncoder()
-
- /**
- * An encoder for nullable timestamp type.
- * @since 1.6.0
- */
- def TIMESTAMP: Encoder[java.sql.Timestamp] = ExpressionEncoder()
-
- /**
- * An encoder for arrays of bytes.
- * @since 1.6.1
- */
- def BINARY: Encoder[Array[Byte]] = ExpressionEncoder()
-
- /**
- * Creates an encoder for Java Bean of type T.
- *
- * T must be publicly accessible.
- *
- * supported types for java bean field:
- * - primitive types: boolean, int, double, etc.
- * - boxed types: Boolean, Integer, Double, etc.
- * - String
- * - java.math.BigDecimal
- * - time related: java.sql.Date, java.sql.Timestamp
- * - collection types: only array and java.util.List currently, map support is in progress
- * - nested java bean.
- *
- * @since 1.6.0
- */
- def bean[T](beanClass: Class[T]): Encoder[T] = ExpressionEncoder.javaBean(beanClass)
-
- /**
- * (Scala-specific) Creates an encoder that serializes objects of type T using Kryo.
- * This encoder maps T into a single byte array (binary) field.
- *
- * T must be publicly accessible.
- *
- * @since 1.6.0
- */
- def kryo[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = true)
-
- /**
- * Creates an encoder that serializes objects of type T using Kryo.
- * This encoder maps T into a single byte array (binary) field.
- *
- * T must be publicly accessible.
- *
- * @since 1.6.0
- */
- def kryo[T](clazz: Class[T]): Encoder[T] = kryo(ClassTag[T](clazz))
-
- /**
- * (Scala-specific) Creates an encoder that serializes objects of type T using generic Java
- * serialization. This encoder maps T into a single byte array (binary) field.
- *
- * Note that this is extremely inefficient and should only be used as the last resort.
- *
- * T must be publicly accessible.
- *
- * @since 1.6.0
- */
- def javaSerialization[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = false)
-
- /**
- * Creates an encoder that serializes objects of type T using generic Java serialization.
- * This encoder maps T into a single byte array (binary) field.
- *
- * Note that this is extremely inefficient and should only be used as the last resort.
- *
- * T must be publicly accessible.
- *
- * @since 1.6.0
- */
- def javaSerialization[T](clazz: Class[T]): Encoder[T] = javaSerialization(ClassTag[T](clazz))
-
- /** Throws an exception if T is not a public class. */
- private def validatePublicClass[T: ClassTag](): Unit = {
- if (!Modifier.isPublic(classTag[T].runtimeClass.getModifiers)) {
- throw new UnsupportedOperationException(
- s"${classTag[T].runtimeClass.getName} is not a public class. " +
- "Only public classes are supported.")
- }
- }
-
- /** A way to construct encoders using generic serializers. */
- private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = {
- if (classTag[T].runtimeClass.isPrimitive) {
- throw new UnsupportedOperationException("Primitive types are not supported.")
- }
-
- validatePublicClass[T]()
-
- ExpressionEncoder[T](
- schema = new StructType().add("value", BinaryType),
- flat = true,
- serializer = Seq(
- EncodeUsingSerializer(
- BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
- deserializer =
- DecodeUsingSerializer[T](
- BoundReference(0, BinaryType, nullable = true), classTag[T], kryo = useKryo),
- clsTag = classTag[T]
- )
- }
-
- /**
- * An encoder for 2-ary tuples.
- * @since 1.6.0
- */
- def tuple[T1, T2](
- e1: Encoder[T1],
- e2: Encoder[T2]): Encoder[(T1, T2)] = {
- ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2))
- }
-
- /**
- * An encoder for 3-ary tuples.
- * @since 1.6.0
- */
- def tuple[T1, T2, T3](
- e1: Encoder[T1],
- e2: Encoder[T2],
- e3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
- ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3))
- }
-
- /**
- * An encoder for 4-ary tuples.
- * @since 1.6.0
- */
- def tuple[T1, T2, T3, T4](
- e1: Encoder[T1],
- e2: Encoder[T2],
- e3: Encoder[T3],
- e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
- ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4))
- }
-
- /**
- * An encoder for 5-ary tuples.
- * @since 1.6.0
- */
- def tuple[T1, T2, T3, T4, T5](
- e1: Encoder[T1],
- e2: Encoder[T2],
- e3: Encoder[T3],
- e4: Encoder[T4],
- e5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = {
- ExpressionEncoder.tuple(
- encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4), encoderFor(e5))
- }
-}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
new file mode 100644
index 0000000000..3f4df704db
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.lang.reflect.Modifier
+
+import scala.reflect.{classTag, ClassTag}
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, DecodeUsingSerializer, EncodeUsingSerializer}
+import org.apache.spark.sql.types._
+
+/**
+ * :: Experimental ::
+ * Methods for creating an [[Encoder]].
+ *
+ * @since 1.6.0
+ */
+@Experimental
+object Encoders {
+
+ /**
+ * An encoder for nullable boolean type.
+ * The Scala primitive encoder is available as [[scalaBoolean]].
+ * @since 1.6.0
+ */
+ def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable byte type.
+ * The Scala primitive encoder is available as [[scalaByte]].
+ * @since 1.6.0
+ */
+ def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable short type.
+ * The Scala primitive encoder is available as [[scalaShort]].
+ * @since 1.6.0
+ */
+ def SHORT: Encoder[java.lang.Short] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable int type.
+ * The Scala primitive encoder is available as [[scalaInt]].
+ * @since 1.6.0
+ */
+ def INT: Encoder[java.lang.Integer] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable long type.
+ * The Scala primitive encoder is available as [[scalaLong]].
+ * @since 1.6.0
+ */
+ def LONG: Encoder[java.lang.Long] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable float type.
+ * The Scala primitive encoder is available as [[scalaFloat]].
+ * @since 1.6.0
+ */
+ def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable double type.
+ * The Scala primitive encoder is available as [[scalaDouble]].
+ * @since 1.6.0
+ */
+ def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable string type.
+ *
+ * @since 1.6.0
+ */
+ def STRING: Encoder[java.lang.String] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable decimal type.
+ *
+ * @since 1.6.0
+ */
+ def DECIMAL: Encoder[java.math.BigDecimal] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable date type.
+ *
+ * @since 1.6.0
+ */
+ def DATE: Encoder[java.sql.Date] = ExpressionEncoder()
+
+ /**
+ * An encoder for nullable timestamp type.
+ *
+ * @since 1.6.0
+ */
+ def TIMESTAMP: Encoder[java.sql.Timestamp] = ExpressionEncoder()
+
+ /**
+ * An encoder for arrays of bytes.
+ *
+ * @since 1.6.1
+ */
+ def BINARY: Encoder[Array[Byte]] = ExpressionEncoder()
+
+ /**
+ * Creates an encoder for Java Bean of type T.
+ *
+ * T must be publicly accessible.
+ *
+ * supported types for java bean field:
+ * - primitive types: boolean, int, double, etc.
+ * - boxed types: Boolean, Integer, Double, etc.
+ * - String
+ * - java.math.BigDecimal
+ * - time related: java.sql.Date, java.sql.Timestamp
+ * - collection types: only array and java.util.List currently, map support is in progress
+ * - nested java bean.
+ *
+ * @since 1.6.0
+ */
+ def bean[T](beanClass: Class[T]): Encoder[T] = ExpressionEncoder.javaBean(beanClass)
+
+ /**
+ * (Scala-specific) Creates an encoder that serializes objects of type T using Kryo.
+ * This encoder maps T into a single byte array (binary) field.
+ *
+ * T must be publicly accessible.
+ *
+ * @since 1.6.0
+ */
+ def kryo[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = true)
+
+ /**
+ * Creates an encoder that serializes objects of type T using Kryo.
+ * This encoder maps T into a single byte array (binary) field.
+ *
+ * T must be publicly accessible.
+ *
+ * @since 1.6.0
+ */
+ def kryo[T](clazz: Class[T]): Encoder[T] = kryo(ClassTag[T](clazz))
+
+ /**
+ * (Scala-specific) Creates an encoder that serializes objects of type T using generic Java
+ * serialization. This encoder maps T into a single byte array (binary) field.
+ *
+ * Note that this is extremely inefficient and should only be used as the last resort.
+ *
+ * T must be publicly accessible.
+ *
+ * @since 1.6.0
+ */
+ def javaSerialization[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = false)
+
+ /**
+ * Creates an encoder that serializes objects of type T using generic Java serialization.
+ * This encoder maps T into a single byte array (binary) field.
+ *
+ * Note that this is extremely inefficient and should only be used as the last resort.
+ *
+ * T must be publicly accessible.
+ *
+ * @since 1.6.0
+ */
+ def javaSerialization[T](clazz: Class[T]): Encoder[T] = javaSerialization(ClassTag[T](clazz))
+
+ /** Throws an exception if T is not a public class. */
+ private def validatePublicClass[T: ClassTag](): Unit = {
+ if (!Modifier.isPublic(classTag[T].runtimeClass.getModifiers)) {
+ throw new UnsupportedOperationException(
+ s"${classTag[T].runtimeClass.getName} is not a public class. " +
+ "Only public classes are supported.")
+ }
+ }
+
+ /** A way to construct encoders using generic serializers. */
+ private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = {
+ if (classTag[T].runtimeClass.isPrimitive) {
+ throw new UnsupportedOperationException("Primitive types are not supported.")
+ }
+
+ validatePublicClass[T]()
+
+ ExpressionEncoder[T](
+ schema = new StructType().add("value", BinaryType),
+ flat = true,
+ serializer = Seq(
+ EncodeUsingSerializer(
+ BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
+ deserializer =
+ DecodeUsingSerializer[T](
+ BoundReference(0, BinaryType, nullable = true), classTag[T], kryo = useKryo),
+ clsTag = classTag[T]
+ )
+ }
+
+ /**
+ * An encoder for 2-ary tuples.
+ *
+ * @since 1.6.0
+ */
+ def tuple[T1, T2](
+ e1: Encoder[T1],
+ e2: Encoder[T2]): Encoder[(T1, T2)] = {
+ ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2))
+ }
+
+ /**
+ * An encoder for 3-ary tuples.
+ *
+ * @since 1.6.0
+ */
+ def tuple[T1, T2, T3](
+ e1: Encoder[T1],
+ e2: Encoder[T2],
+ e3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
+ ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3))
+ }
+
+ /**
+ * An encoder for 4-ary tuples.
+ *
+ * @since 1.6.0
+ */
+ def tuple[T1, T2, T3, T4](
+ e1: Encoder[T1],
+ e2: Encoder[T2],
+ e3: Encoder[T3],
+ e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
+ ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4))
+ }
+
+ /**
+ * An encoder for 5-ary tuples.
+ *
+ * @since 1.6.0
+ */
+ def tuple[T1, T2, T3, T4, T5](
+ e1: Encoder[T1],
+ e2: Encoder[T2],
+ e3: Encoder[T3],
+ e4: Encoder[T4],
+ e5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = {
+ ExpressionEncoder.tuple(
+ encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4), encoderFor(e5))
+ }
+
+ /**
+ * An encoder for Scala's product type (tuples, case classes, etc).
+ * @since 2.0.0
+ */
+ def product[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder()
+
+ /**
+ * An encoder for Scala's primitive int type.
+ * @since 2.0.0
+ */
+ def scalaInt: Encoder[Int] = ExpressionEncoder()
+
+ /**
+ * An encoder for Scala's primitive long type.
+ * @since 2.0.0
+ */
+ def scalaLong: Encoder[Long] = ExpressionEncoder()
+
+ /**
+ * An encoder for Scala's primitive double type.
+ * @since 2.0.0
+ */
+ def scalaDouble: Encoder[Double] = ExpressionEncoder()
+
+ /**
+ * An encoder for Scala's primitive float type.
+ * @since 2.0.0
+ */
+ def scalaFloat: Encoder[Float] = ExpressionEncoder()
+
+ /**
+ * An encoder for Scala's primitive byte type.
+ * @since 2.0.0
+ */
+ def scalaByte: Encoder[Byte] = ExpressionEncoder()
+
+ /**
+ * An encoder for Scala's primitive short type.
+ * @since 2.0.0
+ */
+ def scalaShort: Encoder[Short] = ExpressionEncoder()
+
+ /**
+ * An encoder for Scala's primitive boolean type.
+ * @since 2.0.0
+ */
+ def scalaBoolean: Encoder[Boolean] = ExpressionEncoder()
+
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index c35a969bf0..ad69e23540 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -44,33 +44,33 @@ abstract class SQLImplicits {
}
/** @since 1.6.0 */
- implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder()
+ implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]
// Primitives
/** @since 1.6.0 */
- implicit def newIntEncoder: Encoder[Int] = ExpressionEncoder()
+ implicit def newIntEncoder: Encoder[Int] = Encoders.scalaInt
/** @since 1.6.0 */
- implicit def newLongEncoder: Encoder[Long] = ExpressionEncoder()
+ implicit def newLongEncoder: Encoder[Long] = Encoders.scalaLong
/** @since 1.6.0 */
- implicit def newDoubleEncoder: Encoder[Double] = ExpressionEncoder()
+ implicit def newDoubleEncoder: Encoder[Double] = Encoders.scalaDouble
/** @since 1.6.0 */
- implicit def newFloatEncoder: Encoder[Float] = ExpressionEncoder()
+ implicit def newFloatEncoder: Encoder[Float] = Encoders.scalaFloat
/** @since 1.6.0 */
- implicit def newByteEncoder: Encoder[Byte] = ExpressionEncoder()
+ implicit def newByteEncoder: Encoder[Byte] = Encoders.scalaByte
/** @since 1.6.0 */
- implicit def newShortEncoder: Encoder[Short] = ExpressionEncoder()
+ implicit def newShortEncoder: Encoder[Short] = Encoders.scalaShort
/** @since 1.6.0 */
- implicit def newBooleanEncoder: Encoder[Boolean] = ExpressionEncoder()
+ implicit def newBooleanEncoder: Encoder[Boolean] = Encoders.scalaBoolean
/** @since 1.6.0 */
- implicit def newStringEncoder: Encoder[String] = ExpressionEncoder()
+ implicit def newStringEncoder: Encoder[String] = Encoders.STRING
// Seqs