about | summary | refs | log | tree | commit | diff
path: root/sql/catalyst
diff options
context:
space:
mode:
author: Reynold Xin <rxin@databricks.com> 2015-11-18 18:34:36 -0800
committer: Reynold Xin <rxin@databricks.com> 2015-11-18 18:34:36 -0800
commit: e61367b9f9bfc8e123369d55d7ca5925568b98a7 (patch)
tree: a10ef5ac1563e826f30e2fd3d63c4692cf66da53 /sql/catalyst
parent: e99d3392068bc929c900a4cc7b50e9e2b437a23a (diff)
download: spark-e61367b9f9bfc8e123369d55d7ca5925568b98a7.tar.gz
spark-e61367b9f9bfc8e123369d55d7ca5925568b98a7.tar.bz2
spark-e61367b9f9bfc8e123369d55d7ca5925568b98a7.zip
[SPARK-11833][SQL] Add Java tests for Kryo/Java Dataset encoders
Also added nicer error messages for incompatible types (private types and primitive types) for the Kryo/Java encoders.

Author: Reynold Xin <rxin@databricks.com>

Closes #9823 from rxin/SPARK-11833.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala 69
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala 40
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/FlatEncoderSuite.scala 22
3 files changed, 93 insertions, 38 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index 1ed5111440..d54f2854fb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql
+import java.lang.reflect.Modifier
+
import scala.reflect.{ClassTag, classTag}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor}
@@ -43,30 +45,28 @@ trait Encoder[T] extends Serializable {
*/
object Encoders {
- /** A way to construct encoders using generic serializers. */
- private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = {
- ExpressionEncoder[T](
- schema = new StructType().add("value", BinaryType),
- flat = true,
- toRowExpressions = Seq(
- EncodeUsingSerializer(
- BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
- fromRowExpression =
- DecodeUsingSerializer[T](
- BoundReference(0, BinaryType, nullable = true), classTag[T], kryo = useKryo),
- clsTag = classTag[T]
- )
- }
+ def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder(flat = true)
+ def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder(flat = true)
+ def SHORT: Encoder[java.lang.Short] = ExpressionEncoder(flat = true)
+ def INT: Encoder[java.lang.Integer] = ExpressionEncoder(flat = true)
+ def LONG: Encoder[java.lang.Long] = ExpressionEncoder(flat = true)
+ def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder(flat = true)
+ def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true)
+ def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true)
/**
* (Scala-specific) Creates an encoder that serializes objects of type T using Kryo.
* This encoder maps T into a single byte array (binary) field.
+ *
+ * T must be publicly accessible.
*/
def kryo[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = true)
/**
* Creates an encoder that serializes objects of type T using Kryo.
* This encoder maps T into a single byte array (binary) field.
+ *
+ * T must be publicly accessible.
*/
def kryo[T](clazz: Class[T]): Encoder[T] = kryo(ClassTag[T](clazz))
@@ -75,6 +75,8 @@ object Encoders {
* serialization. This encoder maps T into a single byte array (binary) field.
*
* Note that this is extremely inefficient and should only be used as the last resort.
+ *
+ * T must be publicly accessible.
*/
def javaSerialization[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = false)
@@ -83,17 +85,40 @@ object Encoders {
* This encoder maps T into a single byte array (binary) field.
*
* Note that this is extremely inefficient and should only be used as the last resort.
+ *
+ * T must be publicly accessible.
*/
def javaSerialization[T](clazz: Class[T]): Encoder[T] = javaSerialization(ClassTag[T](clazz))
- def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder(flat = true)
- def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder(flat = true)
- def SHORT: Encoder[java.lang.Short] = ExpressionEncoder(flat = true)
- def INT: Encoder[java.lang.Integer] = ExpressionEncoder(flat = true)
- def LONG: Encoder[java.lang.Long] = ExpressionEncoder(flat = true)
- def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder(flat = true)
- def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true)
- def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true)
+ /** Throws an UnsupportedOperationException if T's runtime class is not declared public. */
+ private def validatePublicClass[T: ClassTag](): Unit = {
+ if (!Modifier.isPublic(classTag[T].runtimeClass.getModifiers)) {
+ throw new UnsupportedOperationException(
+ s"${classTag[T].runtimeClass.getName} is not a public class. " +
+ "Only public classes are supported.")
+ }
+ }
+
+ /** Builds an encoder storing T as one binary field; rejects primitive and non-public T. */
+ private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = {
+ if (classTag[T].runtimeClass.isPrimitive) {
+ throw new UnsupportedOperationException("Primitive types are not supported.")
+ }
+
+ validatePublicClass[T]()
+
+ ExpressionEncoder[T](
+ schema = new StructType().add("value", BinaryType),
+ flat = true,
+ toRowExpressions = Seq(
+ EncodeUsingSerializer(
+ BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
+ fromRowExpression =
+ DecodeUsingSerializer[T](
+ BoundReference(0, BinaryType, nullable = true), classTag[T], kryo = useKryo),
+ clsTag = classTag[T]
+ )
+ }
def tuple[T1, T2](
e1: Encoder[T1],
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
new file mode 100644
index 0000000000..0b2a10bb04
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.encoders
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Encoders
+
+
+class EncoderErrorMessageSuite extends SparkFunSuite {
+
+ // Note: error messages for encoders of private classes are also tested, in JavaDatasetSuite.
+ // Those tests are written in Java because Scala cannot create truly private classes.
+
+ test("primitive types in encoders using Kryo serialization") {
+ intercept[UnsupportedOperationException] { Encoders.kryo[Int] }
+ intercept[UnsupportedOperationException] { Encoders.kryo[Long] }
+ intercept[UnsupportedOperationException] { Encoders.kryo[Char] }
+ }
+
+ test("primitive types in encoders using Java serialization") {
+ intercept[UnsupportedOperationException] { Encoders.javaSerialization[Int] }
+ intercept[UnsupportedOperationException] { Encoders.javaSerialization[Long] }
+ intercept[UnsupportedOperationException] { Encoders.javaSerialization[Char] }
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/FlatEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/FlatEncoderSuite.scala
index 6e0322fb6e..07523d49f4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/FlatEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/FlatEncoderSuite.scala
@@ -74,24 +74,14 @@ class FlatEncoderSuite extends ExpressionEncoderSuite {
FlatEncoder[Map[Int, Map[String, Int]]], "map of map")
// Kryo encoders
- encodeDecodeTest(
- "hello",
- encoderFor(Encoders.kryo[String]),
- "kryo string")
- encodeDecodeTest(
- new KryoSerializable(15),
- encoderFor(Encoders.kryo[KryoSerializable]),
- "kryo object serialization")
+ encodeDecodeTest("hello", encoderFor(Encoders.kryo[String]), "kryo string")
+ encodeDecodeTest(new KryoSerializable(15),
+ encoderFor(Encoders.kryo[KryoSerializable]), "kryo object")
// Java encoders
- encodeDecodeTest(
- "hello",
- encoderFor(Encoders.javaSerialization[String]),
- "java string")
- encodeDecodeTest(
- new JavaSerializable(15),
- encoderFor(Encoders.javaSerialization[JavaSerializable]),
- "java object serialization")
+ encodeDecodeTest("hello", encoderFor(Encoders.javaSerialization[String]), "java string")
+ encodeDecodeTest(new JavaSerializable(15),
+ encoderFor(Encoders.javaSerialization[JavaSerializable]), "java object")
}
/** For testing Kryo serialization based encoder. */