diff options
Diffstat (limited to 'sql')
3 files changed, 327 insertions, 236 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala index e0bfe3c32f..ffa694fcdc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala @@ -17,22 +17,20 @@ package org.apache.spark.sql -import java.lang.reflect.Modifier - import scala.annotation.implicitNotFound -import scala.reflect.{classTag, ClassTag} +import scala.reflect.ClassTag import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} -import org.apache.spark.sql.catalyst.expressions.{BoundReference, DecodeUsingSerializer, EncodeUsingSerializer} import org.apache.spark.sql.types._ + /** * :: Experimental :: * Used to convert a JVM object of type `T` to and from the internal Spark SQL representation. * * == Scala == - * Encoders are generally created automatically through implicits from a `SQLContext`. + * Encoders are generally created automatically through implicits from a `SQLContext`, or can be + * explicitly created by calling static methods on [[Encoders]]. * * {{{ * import sqlContext.implicits._ @@ -81,224 +79,3 @@ trait Encoder[T] extends Serializable { /** A ClassTag that can be used to construct and Array to contain a collection of `T`. */ def clsTag: ClassTag[T] } - -/** - * :: Experimental :: - * Methods for creating an [[Encoder]]. - * - * @since 1.6.0 - */ -@Experimental -object Encoders { - - /** - * An encoder for nullable boolean type. - * @since 1.6.0 - */ - def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder() - - /** - * An encoder for nullable byte type. - * @since 1.6.0 - */ - def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder() - - /** - * An encoder for nullable short type. - * @since 1.6.0 - */ - def SHORT: Encoder[java.lang.Short] = ExpressionEncoder() - - /** - * An encoder for nullable int type. - * @since 1.6.0 - */ - def INT: Encoder[java.lang.Integer] = ExpressionEncoder() - - /** - * An encoder for nullable long type. - * @since 1.6.0 - */ - def LONG: Encoder[java.lang.Long] = ExpressionEncoder() - - /** - * An encoder for nullable float type. - * @since 1.6.0 - */ - def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder() - - /** - * An encoder for nullable double type. - * @since 1.6.0 - */ - def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder() - - /** - * An encoder for nullable string type. - * @since 1.6.0 - */ - def STRING: Encoder[java.lang.String] = ExpressionEncoder() - - /** - * An encoder for nullable decimal type. - * @since 1.6.0 - */ - def DECIMAL: Encoder[java.math.BigDecimal] = ExpressionEncoder() - - /** - * An encoder for nullable date type. - * @since 1.6.0 - */ - def DATE: Encoder[java.sql.Date] = ExpressionEncoder() - - /** - * An encoder for nullable timestamp type. - * @since 1.6.0 - */ - def TIMESTAMP: Encoder[java.sql.Timestamp] = ExpressionEncoder() - - /** - * An encoder for arrays of bytes. - * @since 1.6.1 - */ - def BINARY: Encoder[Array[Byte]] = ExpressionEncoder() - - /** - * Creates an encoder for Java Bean of type T. - * - * T must be publicly accessible. - * - * supported types for java bean field: - * - primitive types: boolean, int, double, etc. - * - boxed types: Boolean, Integer, Double, etc. - * - String - * - java.math.BigDecimal - * - time related: java.sql.Date, java.sql.Timestamp - * - collection types: only array and java.util.List currently, map support is in progress - * - nested java bean. - * - * @since 1.6.0 - */ - def bean[T](beanClass: Class[T]): Encoder[T] = ExpressionEncoder.javaBean(beanClass) - - /** - * (Scala-specific) Creates an encoder that serializes objects of type T using Kryo. - * This encoder maps T into a single byte array (binary) field. - * - * T must be publicly accessible. - * - * @since 1.6.0 - */ - def kryo[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = true) - - /** - * Creates an encoder that serializes objects of type T using Kryo. - * This encoder maps T into a single byte array (binary) field. - * - * T must be publicly accessible. - * - * @since 1.6.0 - */ - def kryo[T](clazz: Class[T]): Encoder[T] = kryo(ClassTag[T](clazz)) - - /** - * (Scala-specific) Creates an encoder that serializes objects of type T using generic Java - * serialization. This encoder maps T into a single byte array (binary) field. - * - * Note that this is extremely inefficient and should only be used as the last resort. - * - * T must be publicly accessible. - * - * @since 1.6.0 - */ - def javaSerialization[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = false) - - /** - * Creates an encoder that serializes objects of type T using generic Java serialization. - * This encoder maps T into a single byte array (binary) field. - * - * Note that this is extremely inefficient and should only be used as the last resort. - * - * T must be publicly accessible. - * - * @since 1.6.0 - */ - def javaSerialization[T](clazz: Class[T]): Encoder[T] = javaSerialization(ClassTag[T](clazz)) - - /** Throws an exception if T is not a public class. */ - private def validatePublicClass[T: ClassTag](): Unit = { - if (!Modifier.isPublic(classTag[T].runtimeClass.getModifiers)) { - throw new UnsupportedOperationException( - s"${classTag[T].runtimeClass.getName} is not a public class. " + - "Only public classes are supported.") - } - } - - /** A way to construct encoders using generic serializers. */ - private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = { - if (classTag[T].runtimeClass.isPrimitive) { - throw new UnsupportedOperationException("Primitive types are not supported.") - } - - validatePublicClass[T]() - - ExpressionEncoder[T]( - schema = new StructType().add("value", BinaryType), - flat = true, - serializer = Seq( - EncodeUsingSerializer( - BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)), - deserializer = - DecodeUsingSerializer[T]( - BoundReference(0, BinaryType, nullable = true), classTag[T], kryo = useKryo), - clsTag = classTag[T] - ) - } - - /** - * An encoder for 2-ary tuples. - * @since 1.6.0 - */ - def tuple[T1, T2]( - e1: Encoder[T1], - e2: Encoder[T2]): Encoder[(T1, T2)] = { - ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2)) - } - - /** - * An encoder for 3-ary tuples. - * @since 1.6.0 - */ - def tuple[T1, T2, T3]( - e1: Encoder[T1], - e2: Encoder[T2], - e3: Encoder[T3]): Encoder[(T1, T2, T3)] = { - ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3)) - } - - /** - * An encoder for 4-ary tuples. - * @since 1.6.0 - */ - def tuple[T1, T2, T3, T4]( - e1: Encoder[T1], - e2: Encoder[T2], - e3: Encoder[T3], - e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = { - ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4)) - } - - /** - * An encoder for 5-ary tuples. - * @since 1.6.0 - */ - def tuple[T1, T2, T3, T4, T5]( - e1: Encoder[T1], - e2: Encoder[T2], - e3: Encoder[T3], - e4: Encoder[T4], - e5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = { - ExpressionEncoder.tuple( - encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4), encoderFor(e5)) - } -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala new file mode 100644 index 0000000000..3f4df704db --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.lang.reflect.Modifier + +import scala.reflect.{classTag, ClassTag} +import scala.reflect.runtime.universe.TypeTag + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} +import org.apache.spark.sql.catalyst.expressions.{BoundReference, DecodeUsingSerializer, EncodeUsingSerializer} +import org.apache.spark.sql.types._ + +/** + * :: Experimental :: + * Methods for creating an [[Encoder]]. + * + * @since 1.6.0 + */ +@Experimental +object Encoders { + + /** + * An encoder for nullable boolean type. + * The Scala primitive encoder is available as [[scalaBoolean]]. + * @since 1.6.0 + */ + def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder() + + /** + * An encoder for nullable byte type. + * The Scala primitive encoder is available as [[scalaByte]]. + * @since 1.6.0 + */ + def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder() + + /** + * An encoder for nullable short type. + * The Scala primitive encoder is available as [[scalaShort]]. + * @since 1.6.0 + */ + def SHORT: Encoder[java.lang.Short] = ExpressionEncoder() + + /** + * An encoder for nullable int type. + * The Scala primitive encoder is available as [[scalaInt]]. + * @since 1.6.0 + */ + def INT: Encoder[java.lang.Integer] = ExpressionEncoder() + + /** + * An encoder for nullable long type. + * The Scala primitive encoder is available as [[scalaLong]]. + * @since 1.6.0 + */ + def LONG: Encoder[java.lang.Long] = ExpressionEncoder() + + /** + * An encoder for nullable float type. + * The Scala primitive encoder is available as [[scalaFloat]]. + * @since 1.6.0 + */ + def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder() + + /** + * An encoder for nullable double type. + * The Scala primitive encoder is available as [[scalaDouble]]. + * @since 1.6.0 + */ + def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder() + + /** + * An encoder for nullable string type. + * + * @since 1.6.0 + */ + def STRING: Encoder[java.lang.String] = ExpressionEncoder() + + /** + * An encoder for nullable decimal type. + * + * @since 1.6.0 + */ + def DECIMAL: Encoder[java.math.BigDecimal] = ExpressionEncoder() + + /** + * An encoder for nullable date type. + * + * @since 1.6.0 + */ + def DATE: Encoder[java.sql.Date] = ExpressionEncoder() + + /** + * An encoder for nullable timestamp type. + * + * @since 1.6.0 + */ + def TIMESTAMP: Encoder[java.sql.Timestamp] = ExpressionEncoder() + + /** + * An encoder for arrays of bytes. + * + * @since 1.6.1 + */ + def BINARY: Encoder[Array[Byte]] = ExpressionEncoder() + + /** + * Creates an encoder for Java Bean of type T. + * + * T must be publicly accessible. + * + * supported types for java bean field: + * - primitive types: boolean, int, double, etc. + * - boxed types: Boolean, Integer, Double, etc. + * - String + * - java.math.BigDecimal + * - time related: java.sql.Date, java.sql.Timestamp + * - collection types: only array and java.util.List currently, map support is in progress + * - nested java bean. + * + * @since 1.6.0 + */ + def bean[T](beanClass: Class[T]): Encoder[T] = ExpressionEncoder.javaBean(beanClass) + + /** + * (Scala-specific) Creates an encoder that serializes objects of type T using Kryo. + * This encoder maps T into a single byte array (binary) field. + * + * T must be publicly accessible. + * + * @since 1.6.0 + */ + def kryo[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = true) + + /** + * Creates an encoder that serializes objects of type T using Kryo. + * This encoder maps T into a single byte array (binary) field. + * + * T must be publicly accessible. + * + * @since 1.6.0 + */ + def kryo[T](clazz: Class[T]): Encoder[T] = kryo(ClassTag[T](clazz)) + + /** + * (Scala-specific) Creates an encoder that serializes objects of type T using generic Java + * serialization. This encoder maps T into a single byte array (binary) field. + * + * Note that this is extremely inefficient and should only be used as the last resort. + * + * T must be publicly accessible. + * + * @since 1.6.0 + */ + def javaSerialization[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = false) + + /** + * Creates an encoder that serializes objects of type T using generic Java serialization. + * This encoder maps T into a single byte array (binary) field. + * + * Note that this is extremely inefficient and should only be used as the last resort. + * + * T must be publicly accessible. + * + * @since 1.6.0 + */ + def javaSerialization[T](clazz: Class[T]): Encoder[T] = javaSerialization(ClassTag[T](clazz)) + + /** Throws an exception if T is not a public class. */ + private def validatePublicClass[T: ClassTag](): Unit = { + if (!Modifier.isPublic(classTag[T].runtimeClass.getModifiers)) { + throw new UnsupportedOperationException( + s"${classTag[T].runtimeClass.getName} is not a public class. " + + "Only public classes are supported.") + } + } + + /** A way to construct encoders using generic serializers. */ + private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = { + if (classTag[T].runtimeClass.isPrimitive) { + throw new UnsupportedOperationException("Primitive types are not supported.") + } + + validatePublicClass[T]() + + ExpressionEncoder[T]( + schema = new StructType().add("value", BinaryType), + flat = true, + serializer = Seq( + EncodeUsingSerializer( + BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)), + deserializer = + DecodeUsingSerializer[T]( + BoundReference(0, BinaryType, nullable = true), classTag[T], kryo = useKryo), + clsTag = classTag[T] + ) + } + + /** + * An encoder for 2-ary tuples. + * + * @since 1.6.0 + */ + def tuple[T1, T2]( + e1: Encoder[T1], + e2: Encoder[T2]): Encoder[(T1, T2)] = { + ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2)) + } + + /** + * An encoder for 3-ary tuples. + * + * @since 1.6.0 + */ + def tuple[T1, T2, T3]( + e1: Encoder[T1], + e2: Encoder[T2], + e3: Encoder[T3]): Encoder[(T1, T2, T3)] = { + ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3)) + } + + /** + * An encoder for 4-ary tuples. + * + * @since 1.6.0 + */ + def tuple[T1, T2, T3, T4]( + e1: Encoder[T1], + e2: Encoder[T2], + e3: Encoder[T3], + e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = { + ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4)) + } + + /** + * An encoder for 5-ary tuples. + * + * @since 1.6.0 + */ + def tuple[T1, T2, T3, T4, T5]( + e1: Encoder[T1], + e2: Encoder[T2], + e3: Encoder[T3], + e4: Encoder[T4], + e5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = { + ExpressionEncoder.tuple( + encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4), encoderFor(e5)) + } + + /** + * An encoder for Scala's product type (tuples, case classes, etc). + * @since 2.0.0 + */ + def product[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder() + + /** + * An encoder for Scala's primitive int type. + * @since 2.0.0 + */ + def scalaInt: Encoder[Int] = ExpressionEncoder() + + /** + * An encoder for Scala's primitive long type. + * @since 2.0.0 + */ + def scalaLong: Encoder[Long] = ExpressionEncoder() + + /** + * An encoder for Scala's primitive double type. + * @since 2.0.0 + */ + def scalaDouble: Encoder[Double] = ExpressionEncoder() + + /** + * An encoder for Scala's primitive float type. + * @since 2.0.0 + */ + def scalaFloat: Encoder[Float] = ExpressionEncoder() + + /** + * An encoder for Scala's primitive byte type. + * @since 2.0.0 + */ + def scalaByte: Encoder[Byte] = ExpressionEncoder() + + /** + * An encoder for Scala's primitive short type. + * @since 2.0.0 + */ + def scalaShort: Encoder[Short] = ExpressionEncoder() + + /** + * An encoder for Scala's primitive boolean type. + * @since 2.0.0 + */ + def scalaBoolean: Encoder[Boolean] = ExpressionEncoder() + +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala index c35a969bf0..ad69e23540 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala @@ -44,33 +44,33 @@ abstract class SQLImplicits { } /** @since 1.6.0 */ - implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder() + implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T] // Primitives /** @since 1.6.0 */ - implicit def newIntEncoder: Encoder[Int] = ExpressionEncoder() + implicit def newIntEncoder: Encoder[Int] = Encoders.scalaInt /** @since 1.6.0 */ - implicit def newLongEncoder: Encoder[Long] = ExpressionEncoder() + implicit def newLongEncoder: Encoder[Long] = Encoders.scalaLong /** @since 1.6.0 */ - implicit def newDoubleEncoder: Encoder[Double] = ExpressionEncoder() + implicit def newDoubleEncoder: Encoder[Double] = Encoders.scalaDouble /** @since 1.6.0 */ - implicit def newFloatEncoder: Encoder[Float] = ExpressionEncoder() + implicit def newFloatEncoder: Encoder[Float] = Encoders.scalaFloat /** @since 1.6.0 */ - implicit def newByteEncoder: Encoder[Byte] = ExpressionEncoder() + implicit def newByteEncoder: Encoder[Byte] = Encoders.scalaByte /** @since 1.6.0 */ - implicit def newShortEncoder: Encoder[Short] = ExpressionEncoder() + implicit def newShortEncoder: Encoder[Short] = Encoders.scalaShort /** @since 1.6.0 */ - implicit def newBooleanEncoder: Encoder[Boolean] = ExpressionEncoder() + implicit def newBooleanEncoder: Encoder[Boolean] = Encoders.scalaBoolean /** @since 1.6.0 */ - implicit def newStringEncoder: Encoder[String] = ExpressionEncoder() + implicit def newStringEncoder: Encoder[String] = Encoders.STRING // Seqs |