diff options
author | Sandeep <sandeep@techaddict.me> | 2014-05-08 22:30:17 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-05-08 22:30:17 -0700 |
commit | 7db47c463fefc244e9c100d4aab90451c3828261 (patch) | |
tree | e4e72eab8d0d8466274d318a8052d0b1250a6cab | |
parent | 06b15baab25951d124bbe6b64906f4139e037deb (diff) | |
download | spark-7db47c463fefc244e9c100d4aab90451c3828261.tar.gz spark-7db47c463fefc244e9c100d4aab90451c3828261.tar.bz2 spark-7db47c463fefc244e9c100d4aab90451c3828261.zip |
SPARK-1775: Unneeded lock in ShuffleMapTask.deserializeInfo
This was used in the past to have a cache of deserialized ShuffleMapTasks, but that's been removed, so there's no need for a lock. It slows down Spark when task descriptions are large, e.g. due to large lineage graphs or local variables.
Author: Sandeep <sandeep@techaddict.me>
Closes #707 from techaddict/SPARK-1775 and squashes the following commits:
18d8ebf [Sandeep] SPARK-1775: Unneeded lock in ShuffleMapTask.deserializeInfo This was used in the past to have a cache of deserialized ShuffleMapTasks, but that's been removed, so there's no need for a lock. It slows down Spark when task descriptions are large, e.g. due to large lineage graphs or local variables.
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala | 16 |
1 file changed, 7 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 4b0324f2b5..9ba586f758 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -57,15 +57,13 @@ private[spark] object ShuffleMapTask {
   }

   def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_]) = {
-    synchronized {
-      val loader = Thread.currentThread.getContextClassLoader
-      val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-      val ser = SparkEnv.get.closureSerializer.newInstance()
-      val objIn = ser.deserializeStream(in)
-      val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-      val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
-      (rdd, dep)
-    }
+    val loader = Thread.currentThread.getContextClassLoader
+    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+    val ser = SparkEnv.get.closureSerializer.newInstance()
+    val objIn = ser.deserializeStream(in)
+    val rdd = objIn.readObject().asInstanceOf[RDD[_]]
+    val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
+    (rdd, dep)
   }

   // Since both the JarSet and FileSet have the same format this is used for both.