aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-11-18 18:34:36 -0800
committerReynold Xin <rxin@databricks.com>2015-11-18 18:34:36 -0800
commite61367b9f9bfc8e123369d55d7ca5925568b98a7 (patch)
treea10ef5ac1563e826f30e2fd3d63c4692cf66da53
parente99d3392068bc929c900a4cc7b50e9e2b437a23a (diff)
downloadspark-e61367b9f9bfc8e123369d55d7ca5925568b98a7.tar.gz
spark-e61367b9f9bfc8e123369d55d7ca5925568b98a7.tar.bz2
spark-e61367b9f9bfc8e123369d55d7ca5925568b98a7.zip
[SPARK-11833][SQL] Add Java tests for Kryo/Java Dataset encoders
Also added some nicer error messages for incompatible types (private types and primitive types) for Kryo/Java encoder. Author: Reynold Xin <rxin@databricks.com> Closes #9823 from rxin/SPARK-11833.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala69
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala40
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/FlatEncoderSuite.scala22
-rw-r--r--sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java75
4 files changed, 166 insertions, 40 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index 1ed5111440..d54f2854fb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql
+import java.lang.reflect.Modifier
+
import scala.reflect.{ClassTag, classTag}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor}
@@ -43,30 +45,28 @@ trait Encoder[T] extends Serializable {
*/
object Encoders {
- /** A way to construct encoders using generic serializers. */
- private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = {
- ExpressionEncoder[T](
- schema = new StructType().add("value", BinaryType),
- flat = true,
- toRowExpressions = Seq(
- EncodeUsingSerializer(
- BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
- fromRowExpression =
- DecodeUsingSerializer[T](
- BoundReference(0, BinaryType, nullable = true), classTag[T], kryo = useKryo),
- clsTag = classTag[T]
- )
- }
+ def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder(flat = true)
+ def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder(flat = true)
+ def SHORT: Encoder[java.lang.Short] = ExpressionEncoder(flat = true)
+ def INT: Encoder[java.lang.Integer] = ExpressionEncoder(flat = true)
+ def LONG: Encoder[java.lang.Long] = ExpressionEncoder(flat = true)
+ def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder(flat = true)
+ def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true)
+ def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true)
/**
* (Scala-specific) Creates an encoder that serializes objects of type T using Kryo.
* This encoder maps T into a single byte array (binary) field.
+ *
+ * T must be publicly accessible.
*/
def kryo[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = true)
/**
* Creates an encoder that serializes objects of type T using Kryo.
* This encoder maps T into a single byte array (binary) field.
+ *
+ * T must be publicly accessible.
*/
def kryo[T](clazz: Class[T]): Encoder[T] = kryo(ClassTag[T](clazz))
@@ -75,6 +75,8 @@ object Encoders {
* serialization. This encoder maps T into a single byte array (binary) field.
*
* Note that this is extremely inefficient and should only be used as the last resort.
+ *
+ * T must be publicly accessible.
*/
def javaSerialization[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = false)
@@ -83,17 +85,40 @@ object Encoders {
* This encoder maps T into a single byte array (binary) field.
*
* Note that this is extremely inefficient and should only be used as the last resort.
+ *
+ * T must be publicly accessible.
*/
def javaSerialization[T](clazz: Class[T]): Encoder[T] = javaSerialization(ClassTag[T](clazz))
- def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder(flat = true)
- def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder(flat = true)
- def SHORT: Encoder[java.lang.Short] = ExpressionEncoder(flat = true)
- def INT: Encoder[java.lang.Integer] = ExpressionEncoder(flat = true)
- def LONG: Encoder[java.lang.Long] = ExpressionEncoder(flat = true)
- def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder(flat = true)
- def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true)
- def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true)
+ /** Throws an exception if T is not a public class. */
+ private def validatePublicClass[T: ClassTag](): Unit = {
+ if (!Modifier.isPublic(classTag[T].runtimeClass.getModifiers)) {
+ throw new UnsupportedOperationException(
+ s"${classTag[T].runtimeClass.getName} is not a public class. " +
+ "Only public classes are supported.")
+ }
+ }
+
+ /** A way to construct encoders using generic serializers. */
+ private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = {
+ if (classTag[T].runtimeClass.isPrimitive) {
+ throw new UnsupportedOperationException("Primitive types are not supported.")
+ }
+
+ validatePublicClass[T]()
+
+ ExpressionEncoder[T](
+ schema = new StructType().add("value", BinaryType),
+ flat = true,
+ toRowExpressions = Seq(
+ EncodeUsingSerializer(
+ BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
+ fromRowExpression =
+ DecodeUsingSerializer[T](
+ BoundReference(0, BinaryType, nullable = true), classTag[T], kryo = useKryo),
+ clsTag = classTag[T]
+ )
+ }
def tuple[T1, T2](
e1: Encoder[T1],
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
new file mode 100644
index 0000000000..0b2a10bb04
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.encoders
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Encoders
+
+
+class EncoderErrorMessageSuite extends SparkFunSuite {
+
+ // Note: we also test error messages for encoders for private classes in JavaDatasetSuite.
+ // That is done in Java because Scala cannot create truly private classes.
+
+ test("primitive types in encoders using Kryo serialization") {
+ intercept[UnsupportedOperationException] { Encoders.kryo[Int] }
+ intercept[UnsupportedOperationException] { Encoders.kryo[Long] }
+ intercept[UnsupportedOperationException] { Encoders.kryo[Char] }
+ }
+
+ test("primitive types in encoders using Java serialization") {
+ intercept[UnsupportedOperationException] { Encoders.javaSerialization[Int] }
+ intercept[UnsupportedOperationException] { Encoders.javaSerialization[Long] }
+ intercept[UnsupportedOperationException] { Encoders.javaSerialization[Char] }
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/FlatEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/FlatEncoderSuite.scala
index 6e0322fb6e..07523d49f4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/FlatEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/FlatEncoderSuite.scala
@@ -74,24 +74,14 @@ class FlatEncoderSuite extends ExpressionEncoderSuite {
FlatEncoder[Map[Int, Map[String, Int]]], "map of map")
// Kryo encoders
- encodeDecodeTest(
- "hello",
- encoderFor(Encoders.kryo[String]),
- "kryo string")
- encodeDecodeTest(
- new KryoSerializable(15),
- encoderFor(Encoders.kryo[KryoSerializable]),
- "kryo object serialization")
+ encodeDecodeTest("hello", encoderFor(Encoders.kryo[String]), "kryo string")
+ encodeDecodeTest(new KryoSerializable(15),
+ encoderFor(Encoders.kryo[KryoSerializable]), "kryo object")
// Java encoders
- encodeDecodeTest(
- "hello",
- encoderFor(Encoders.javaSerialization[String]),
- "java string")
- encodeDecodeTest(
- new JavaSerializable(15),
- encoderFor(Encoders.javaSerialization[JavaSerializable]),
- "java object serialization")
+ encodeDecodeTest("hello", encoderFor(Encoders.javaSerialization[String]), "java string")
+ encodeDecodeTest(new JavaSerializable(15),
+ encoderFor(Encoders.javaSerialization[JavaSerializable]), "java object")
}
/** For testing Kryo serialization based encoder. */
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index d9b22506fb..ce40dd856f 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -24,6 +24,7 @@ import scala.Tuple2;
import scala.Tuple3;
import scala.Tuple4;
import scala.Tuple5;
+
import org.junit.*;
import org.apache.spark.Accumulator;
@@ -410,8 +411,8 @@ public class JavaDatasetSuite implements Serializable {
.as(Encoders.tuple(Encoders.STRING(), Encoders.INT(), Encoders.LONG(), Encoders.LONG()));
Assert.assertEquals(
Arrays.asList(
- new Tuple4<String, Integer, Long, Long>("a", 3, 3L, 2L),
- new Tuple4<String, Integer, Long, Long>("b", 3, 3L, 1L)),
+ new Tuple4<>("a", 3, 3L, 2L),
+ new Tuple4<>("b", 3, 3L, 1L)),
agged2.collectAsList());
}
@@ -437,4 +438,74 @@ public class JavaDatasetSuite implements Serializable {
return reduction;
}
}
+
+ public static class KryoSerializable {
+ String value;
+
+ KryoSerializable(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return this.value.equals(((KryoSerializable) other).value);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value.hashCode();
+ }
+ }
+
+ public static class JavaSerializable implements Serializable {
+ String value;
+
+ JavaSerializable(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return this.value.equals(((JavaSerializable) other).value);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value.hashCode();
+ }
+ }
+
+ @Test
+ public void testKryoEncoder() {
+ Encoder<KryoSerializable> encoder = Encoders.kryo(KryoSerializable.class);
+ List<KryoSerializable> data = Arrays.asList(
+ new KryoSerializable("hello"), new KryoSerializable("world"));
+ Dataset<KryoSerializable> ds = context.createDataset(data, encoder);
+ Assert.assertEquals(data, ds.collectAsList());
+ }
+
+ @Test
+ public void testJavaEncoder() {
+ Encoder<JavaSerializable> encoder = Encoders.javaSerialization(JavaSerializable.class);
+ List<JavaSerializable> data = Arrays.asList(
+ new JavaSerializable("hello"), new JavaSerializable("world"));
+ Dataset<JavaSerializable> ds = context.createDataset(data, encoder);
+ Assert.assertEquals(data, ds.collectAsList());
+ }
+
+ /**
+ * For testing error messages when creating an encoder on a private class. This is done
+ * here since we cannot create truly private classes in Scala.
+ */
+ private static class PrivateClassTest { }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testJavaEncoderErrorMessageForPrivateClass() {
+ Encoders.javaSerialization(PrivateClassTest.class);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testKryoEncoderErrorMessageForPrivateClass() {
+ Encoders.kryo(PrivateClassTest.class);
+ }
}