-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala             |   7
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/Serializer.scala                 |  35
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala        |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala  | 119
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala |   5
5 files changed, 166 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index b7bc087855..f9f78852f0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -125,6 +125,13 @@ class KryoSerializer(conf: SparkConf)
override def newInstance(): SerializerInstance = {
new KryoSerializerInstance(this)
}
+
+ private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = {
+ // If auto-reset is disabled, then Kryo may store references to duplicate occurrences of objects
+ // in the stream rather than writing those objects' serialized bytes, breaking relocation. See
+ // https://groups.google.com/d/msg/kryo-users/6ZUSyfjjtdo/FhGG1KHDXPgJ for more details.
+ newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()
+ }
}
private[spark]
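
Reviewer note (not part of the patch): the getAutoReset() check above matters because, with auto-reset disabled, Kryo's reference tracking can replace a duplicate object with a back-reference into earlier stream positions instead of re-serializing it. A minimal standalone sketch of that behavior, assuming Kryo is on the classpath (object and variable names here are illustrative only):

    import java.io.ByteArrayOutputStream

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.Output

    object AutoResetSketch {
      def main(args: Array[String]): Unit = {
        val kryo = new Kryo()
        kryo.setReferences(true)   // track duplicate objects via back-references
        kryo.setAutoReset(false)   // keep that state across top-level writes

        val shared = new java.util.ArrayList[Int]()
        shared.add(42)

        val bytes = new ByteArrayOutputStream()
        val out = new Output(bytes)
        kryo.writeClassAndObject(out, shared)   // full serialized form
        out.flush()
        val firstLen = bytes.size()
        kryo.writeClassAndObject(out, shared)   // typically just a back-reference id
        out.flush()
        // The second slice is not self-contained, so reordering the two slices
        // in the stream would break deserialization, hence the check above.
        println(s"first object: $firstLen bytes, second: ${bytes.size() - firstLen} bytes")
      }
    }
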
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index c381672a4f..6078c9d433 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
import scala.reflect.ClassTag
import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{DeveloperApi, Private}
import org.apache.spark.util.{Utils, ByteBufferInputStream, NextIterator}
/**
@@ -63,6 +63,39 @@ abstract class Serializer {
/** Creates a new [[SerializerInstance]]. */
def newInstance(): SerializerInstance
+
+ /**
+ * :: Private ::
+ * Returns true if this serializer supports relocation of its serialized objects and false
+ * otherwise. This should return true if and only if reordering the bytes of serialized objects
+ * in serialization stream output is equivalent to having re-ordered those elements prior to
+ * serializing them. More specifically, the following should hold if a serializer supports
+ * relocation:
+ *
+ * {{{
+ * serOut.open()
+ * position = 0
+ * serOut.write(obj1)
+ * serOut.flush()
+ * position = # of bytes written to stream so far
+ * obj1Bytes = output[0:position-1]
+ * serOut.write(obj2)
+ * serOut.flush()
+ * position2 = # of bytes written to stream so far
+ * obj2Bytes = output[position:position2-1]
+ * serIn.open([obj2Bytes] concatenate [obj1Bytes]) should return (obj2, obj1)
+ * }}}
+ *
+ * In general, this property should hold for serializers that are stateless and that do not
+ * write special metadata at the beginning or end of the serialization stream.
+ *
+ * This API is private to Spark; this method should not be overridden in third-party subclasses
+ * or called in user code and is subject to removal in future Spark releases.
+ *
+ * See SPARK-7311 for more details.
+ */
+ @Private
+ private[spark] def supportsRelocationOfSerializedObjects: Boolean = false
}
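
Reviewer note (not part of the patch): the pseudocode block in the new scaladoc above can be rendered as a short REPL-style Scala snippet; the real property-based check lives in SerializerPropertiesSuite later in this diff. KryoSerializer with a default conf (auto-reset on) is used here purely for illustration:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoSerializer

    val ser = new KryoSerializer(new SparkConf()).newInstance()
    val bytes = new ByteArrayOutputStream()
    val serOut = ser.serializeStream(bytes)

    serOut.writeObject("obj1")
    serOut.flush()
    val obj1Bytes = bytes.toByteArray
    serOut.writeObject("obj2")
    serOut.flush()
    val obj2Bytes = bytes.toByteArray.drop(obj1Bytes.length)
    serOut.close()

    // Supporting relocation means the swapped byte slices deserialize to the
    // swapped objects.
    val serIn = ser.deserializeStream(new ByteArrayInputStream(obj2Bytes ++ obj1Bytes))
    assert(serIn.asIterator.toSeq == Seq("obj2", "obj1"))
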
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index b7306cd551..7d5cf7b61e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -131,8 +131,7 @@ private[spark] class ExternalSorter[K, V, C](
private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB
private val useSerializedPairBuffer =
!ordering.isDefined && conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
- ser.isInstanceOf[KryoSerializer] &&
- serInstance.asInstanceOf[KryoSerializerInstance].getAutoReset
+ ser.supportsRelocationOfSerializedObjects
// Data structures to store in-memory objects before we spill. Depending on whether we have an
// Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
new file mode 100644
index 0000000000..bb34033fe9
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.util.Random
+
+import org.scalatest.{Assertions, FunSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset
+
+/**
+ * Tests to ensure that [[Serializer]] implementations obey the API contracts for methods that
+ * describe properties of the serialized stream, such as
+ * [[Serializer.supportsRelocationOfSerializedObjects]].
+ */
+class SerializerPropertiesSuite extends FunSuite {
+
+ import SerializerPropertiesSuite._
+
+ test("JavaSerializer does not support relocation") {
+ // Per a comment on the SPARK-4550 JIRA ticket, Java serialization appears to write out the
+ // full class name the first time an object is written to an output stream, but subsequent
+ // references to the class write a more compact identifier; this prevents relocation.
+ val ser = new JavaSerializer(new SparkConf())
+ testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
+ }
+
+ test("KryoSerializer supports relocation when auto-reset is enabled") {
+ val ser = new KryoSerializer(new SparkConf)
+ assert(ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
+ testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
+ }
+
+ test("KryoSerializer does not support relocation when auto-reset is disabled") {
+ val conf = new SparkConf().set("spark.kryo.registrator",
+ classOf[RegistratorWithoutAutoReset].getName)
+ val ser = new KryoSerializer(conf)
+ assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
+ testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
+ }
+
+}
+
+object SerializerPropertiesSuite extends Assertions {
+
+ def generateRandomItem(rand: Random): Any = {
+ val randomFunctions: Seq[() => Any] = Seq(
+ () => rand.nextInt(),
+ () => rand.nextString(rand.nextInt(10)),
+ () => rand.nextDouble(),
+ () => rand.nextBoolean(),
+ () => (rand.nextInt(), rand.nextString(rand.nextInt(10))),
+ () => MyCaseClass(rand.nextInt(), rand.nextString(rand.nextInt(10))),
+ () => {
+ val x = MyCaseClass(rand.nextInt(), rand.nextString(rand.nextInt(10)))
+ (x, x)
+ }
+ )
+ randomFunctions(rand.nextInt(randomFunctions.size)).apply()
+ }
+
+ def testSupportsRelocationOfSerializedObjects(
+ serializer: Serializer,
+ generateRandomItem: Random => Any): Unit = {
+ if (!serializer.supportsRelocationOfSerializedObjects) {
+ return
+ }
+ val NUM_TRIALS = 5
+ val rand = new Random(42)
+ for (_ <- 1 to NUM_TRIALS) {
+ val items = {
+ // Make sure that we have duplicate occurrences of the same object in the stream:
+ val randomItems = Seq.fill(10)(generateRandomItem(rand))
+ randomItems ++ randomItems.take(5)
+ }
+ val baos = new ByteArrayOutputStream()
+ val serStream = serializer.newInstance().serializeStream(baos)
+ def serializeItem(item: Any): Array[Byte] = {
+ val itemStartOffset = baos.toByteArray.length
+ serStream.writeObject(item)
+ serStream.flush()
+ val itemEndOffset = baos.toByteArray.length
+ baos.toByteArray.slice(itemStartOffset, itemEndOffset).clone()
+ }
+ val itemsAndSerializedItems: Seq[(Any, Array[Byte])] = {
+ val serItems = items.map {
+ item => (item, serializeItem(item))
+ }
+ serStream.close()
+ rand.shuffle(serItems)
+ }
+ val reorderedSerializedData: Array[Byte] = itemsAndSerializedItems.flatMap(_._2).toArray
+ val deserializedItemsStream = serializer.newInstance().deserializeStream(
+ new ByteArrayInputStream(reorderedSerializedData))
+ assert(deserializedItemsStream.asIterator.toSeq === itemsAndSerializedItems.map(_._1))
+ deserializedItemsStream.close()
+ }
+ }
+}
+
+private case class MyCaseClass(foo: Int, bar: String)
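
Reviewer note (not part of the patch): RegistratorWithoutAutoReset referenced above lives in KryoTest. A registrator that disables auto-reset follows this general shape (hypothetical class name):

    import com.esotericsoftware.kryo.Kryo

    import org.apache.spark.serializer.KryoRegistrator

    class DisableAutoResetRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.setAutoReset(false)  // keep Kryo's reference state across top-level writes
      }
    }
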
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 9552f41115..35ad987eb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -154,6 +154,11 @@ private[sql] class SparkSqlSerializer2(keySchema: Array[DataType], valueSchema:
with Serializable{
def newInstance(): SerializerInstance = new ShuffleSerializerInstance(keySchema, valueSchema)
+
+ override def supportsRelocationOfSerializedObjects: Boolean = {
+ // SparkSqlSerializer2 is stateless and writes no stream headers
+ true
+ }
}
private[sql] object SparkSqlSerializer2 {
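
Reviewer note (not part of the patch): the same opt-in pattern applies to any serializer under the org.apache.spark namespace that is stateless and writes no stream header. A hypothetical sketch of the pattern (class name and body are illustrative only):

    package org.apache.spark.serializer

    // Hypothetical example: a header-free, stateless serializer can declare
    // support for relocation by overriding the new flag.
    private[spark] class MyHeaderFreeSerializer extends Serializer with Serializable {
      override def newInstance(): SerializerInstance = ???  // omitted in this sketch
      private[spark] override def supportsRelocationOfSerializedObjects: Boolean = true
    }
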