aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharles Reiss <woggle@apache.org>2013-08-09 11:05:30 -0700
committerCharles Reiss <woggle@apache.org>2013-08-09 11:09:02 -0700
commit9dfc280f74daf24334aa73bb57f07e7e71213b63 (patch)
tree7f64290e14c1b937dcc5694dc91fe88de6a632cb
parentf94fc75c3f3b8b1337fe3f849c2cba119eaa9bc7 (diff)
downloadspark-9dfc280f74daf24334aa73bb57f07e7e71213b63.tar.gz
spark-9dfc280f74daf24334aa73bb57f07e7e71213b63.tar.bz2
spark-9dfc280f74daf24334aa73bb57f07e7e71213b63.zip
Remove extra synchronization in ResultTask
-rw-r--r--core/src/main/scala/spark/scheduler/ResultTask.scala16
1 files changed, 7 insertions, 9 deletions
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index 1ced6f9524..832ca18b8c 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -51,15 +51,13 @@ private[spark] object ResultTask {
}
def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) = {
- synchronized {
- val loader = Thread.currentThread.getContextClassLoader
- val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
- val ser = SparkEnv.get.closureSerializer.newInstance
- val objIn = ser.deserializeStream(in)
- val rdd = objIn.readObject().asInstanceOf[RDD[_]]
- val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
- return (rdd, func)
- }
+ val loader = Thread.currentThread.getContextClassLoader
+ val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+ val ser = SparkEnv.get.closureSerializer.newInstance
+ val objIn = ser.deserializeStream(in)
+ val rdd = objIn.readObject().asInstanceOf[RDD[_]]
+ val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
+ return (rdd, func)
}
def clearCache() {