aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-05-22 13:28:14 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-05-22 13:28:14 -0700
commiteac00691da93a94e6cff5ae0f8952e5724e78094 (patch)
tree723a723b34c503e754c38b8f704b39de54071d18 /core/src/main
parent509d55ab416359fab0525189458e2ea96379cf14 (diff)
downloadspark-eac00691da93a94e6cff5ae0f8952e5724e78094.tar.gz
spark-eac00691da93a94e6cff5ae0f8952e5724e78094.tar.bz2
spark-eac00691da93a94e6cff5ae0f8952e5724e78094.zip
[SPARK-7766] KryoSerializerInstance reuse is unsafe when auto-reset is disabled
SPARK-3386 / #5606 modified the shuffle write path to re-use serializer instances across multiple calls to DiskBlockObjectWriter. It turns out that this introduced a very rare bug when using `KryoSerializer`: if auto-reset is disabled and reference-tracking is enabled, then we'll end up re-using the same serializer instance to write multiple output streams without calling `reset()` between write calls, which can lead to cases where objects in one file may contain references to objects that are in previous files, causing errors during deserialization. This patch fixes this bug by calling `reset()` at the start of `serialize()` and `serializeStream()`. I also added a regression test which demonstrates that this problem only occurs when auto-reset is disabled and reference-tracking is enabled. Author: Josh Rosen <joshrosen@databricks.com> Closes #6293 from JoshRosen/kryo-instance-reuse-bug and squashes the following commits: e19726d [Josh Rosen] Add fix for SPARK-7766. 71845e3 [Josh Rosen] Add failing regression test to trigger Kryo re-use bug
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala2
1 files changed, 2 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 64ba27f34d..2179579634 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -177,6 +177,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
override def serialize[T: ClassTag](t: T): ByteBuffer = {
output.clear()
+ kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
try {
kryo.writeClassAndObject(output, t)
} catch {
@@ -202,6 +203,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
}
override def serializeStream(s: OutputStream): SerializationStream = {
+ kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
new KryoSerializationStream(kryo, s)
}