author    Sandeep <sandeep@techaddict.me>    2014-05-08 22:30:17 -0700
committer Patrick Wendell <pwendell@gmail.com>    2014-05-08 22:30:17 -0700
commit    7db47c463fefc244e9c100d4aab90451c3828261 (patch)
tree      e4e72eab8d0d8466274d318a8052d0b1250a6cab /core/src
parent    06b15baab25951d124bbe6b64906f4139e037deb (diff)
SPARK-1775: Unneeded lock in ShuffleMapTask.deserializeInfo
This lock was used in the past to guard a cache of deserialized ShuffleMapTasks, but that cache has been removed, so there's no need for a lock. It slows down Spark when task descriptions are large, e.g. due to large lineage graphs or local variables.

Author: Sandeep <sandeep@techaddict.me>

Closes #707 from techaddict/SPARK-1775 and squashes the following commits:

18d8ebf [Sandeep] SPARK-1775: Unneeded lock in ShuffleMapTask.deserializeInfo
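For context on the contention this commit removes: deserialization here is CPU-bound work that touches only method-local state, so funneling every call through the object's monitor makes concurrent executor threads take turns for no safety benefit. The following is a minimal, hypothetical Scala sketch of that effect, not Spark code; the object LockDemo, the stand-in work method, the payload size, and the thread count are all invented for illustration.

// Hypothetical demo (not Spark code): an object-level lock around work that
// uses only local state buys no safety but forces threads to run one at a time.
object LockDemo {
  // Stand-in for GZIP + closure-serializer deserialization: CPU-bound,
  // no shared mutable state.
  private def work(bytes: Array[Byte]): Int =
    bytes.foldLeft(0)(_ + _)

  def lockedDeserialize(bytes: Array[Byte]): Int =
    synchronized { work(bytes) }   // all callers contend on LockDemo's monitor

  def lockFreeDeserialize(bytes: Array[Byte]): Int =
    work(bytes)                    // callers proceed in parallel

  def main(args: Array[String]): Unit = {
    val payload = Array.fill[Byte](8 * 1024 * 1024)(1) // a "large task description"
    // Run 8 threads through the given function and report wall-clock time.
    def time(f: Array[Byte] => Int): Long = {
      val start = System.nanoTime()
      val threads = (1 to 8).map { _ =>
        new Thread(new Runnable { def run(): Unit = { f(payload); () } })
      }
      threads.foreach(_.start())
      threads.foreach(_.join())
      (System.nanoTime() - start) / 1000000
    }
    println(s"locked:    ${time(lockedDeserialize)} ms")
    println(s"lock-free: ${time(lockFreeDeserialize)} ms")
  }
}

On a multi-core machine the locked variant should take roughly as long as all calls run back to back, while the lock-free variant scales with the number of cores; the gap grows with payload size, which matches the commit's observation about large lineage graphs and local variables.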
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala  16
1 file changed, 7 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 4b0324f2b5..9ba586f758 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -57,15 +57,13 @@ private[spark] object ShuffleMapTask {
   }
 
   def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_]) = {
-    synchronized {
-      val loader = Thread.currentThread.getContextClassLoader
-      val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-      val ser = SparkEnv.get.closureSerializer.newInstance()
-      val objIn = ser.deserializeStream(in)
-      val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-      val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
-      (rdd, dep)
-    }
+    val loader = Thread.currentThread.getContextClassLoader
+    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+    val ser = SparkEnv.get.closureSerializer.newInstance()
+    val objIn = ser.deserializeStream(in)
+    val rdd = objIn.readObject().asInstanceOf[RDD[_]]
+    val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
+    (rdd, dep)
   }
 
   // Since both the JarSet and FileSet have the same format this is used for both.