path: root/streaming
author      Thomas Graves <tgraves@staydecay.corp.gq1.yahoo.com>    2016-05-06 19:31:26 -0700
committer   Davies Liu <davies.liu@gmail.com>                       2016-05-06 19:31:26 -0700
commit      cc95f1ed5fdf2566bcefe8d10116eee544cf9184 (patch)
tree        7cd0f48681e96228c3c60d30fabc87a95ac0342c /streaming
parent      f7b7ef41662d7d02fc4f834f3c6c4ee8802e949c (diff)
download    spark-cc95f1ed5fdf2566bcefe8d10116eee544cf9184.tar.gz
            spark-cc95f1ed5fdf2566bcefe8d10116eee544cf9184.tar.bz2
            spark-cc95f1ed5fdf2566bcefe8d10116eee544cf9184.zip
[SPARK-1239] Improve fetching of map output statuses
The main issue we are trying to solve is the memory bloat on the Driver when tasks request the map output statuses. With a large number of tasks you either need a huge amount of driver memory or you have to repartition to a smaller number of partitions, which makes it really difficult to run jobs with, say, more than 50,000 tasks.

The main causes of the memory bloat are:
1) No flow control on sending the map output status responses. We serialize the map output statuses and then hand them off to netty to send. Netty sends asynchronously and can't send them fast enough to keep up with incoming requests, so we end up with many copies of the serialized map output statuses sitting in memory. This causes huge bloat when you have tens of thousands of tasks and the map output statuses are tens of MB in size.
2) When the initial reduce tasks start up, they all request the map output statuses from the Driver. These requests are handled by multiple threads in parallel, so even though we check for a cached version, before one exists many of the initial requests can end up serializing the exact same map output statuses.

This patch does a couple of things:
- When the serialized map output statuses are over a threshold (default 512K), it uses a broadcast to send them. This means we no longer serialize a large map output status per request and thus avoid the memory bloat; the RPC messages are now in the 300-400 byte range and the map output statuses themselves are broadcast (see the sketch below). If the size is under the threshold it sends them as before, and the message now carries a DIRECT indicator.
- Synchronize the incoming requests so that one thread caches the serialized output and broadcasts the map output statuses, which everyone else can then reuse. This ensures we don't create multiple broadcast variables when we don't need to. To make this work, I added a second thread pool that the Dispatcher hands the requests to, so those threads can block without blocking the main dispatcher threads (which would otherwise stall things like heartbeats). See the sketch after the diffstat below.

Note that some of the design and code was contributed by mridulm.

## How was this patch tested?

Unit tests and a lot of manual testing. Ran with both the akka and netty RPC backends, and with dynamic allocation on and off.

One of the large jobs used to test this was a join of 15TB of data. It had 200,000 map tasks and 20,000 reduce tasks, with executors ranging from 200 to 2000. This job ran successfully with 5GB of memory on the driver with these changes; without them I was using 20GB and only 500 reduce tasks. The job had 50MB of serialized map output statuses, and the executors took roughly the same amount of time to fetch them as before.

Also ran a variety of other jobs, from large wordcounts to small ones that do not use broadcasts.

Author: Thomas Graves <tgraves@staydecay.corp.gq1.yahoo.com>

Closes #12113 from tgravescs/SPARK-1239.
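The DIRECT-versus-broadcast decision described above can be pictured with a short Scala sketch. This is a simplified illustration, not Spark's actual MapOutputTracker code: the object, case classes, and `reply` helper below are hypothetical, and only the 512K threshold and the idea of wrapping large serialized statuses in a broadcast come from the commit message.

```scala
// Hypothetical sketch of the threshold check described in the commit message.
// None of these names exist in Spark; only the 512K default and the
// DIRECT-vs-broadcast split are taken from the description above.
import org.apache.spark.SparkContext

object MapStatusReplySketch {
  // Statuses smaller than this are sent inline ("DIRECT"); larger ones are broadcast.
  val minSizeForBroadcast: Int = 512 * 1024

  sealed trait Reply
  case class Direct(bytes: Array[Byte]) extends Reply       // inline payload, as before the patch
  case class ViaBroadcast(broadcastId: Long) extends Reply  // tiny message carrying only a handle

  def reply(sc: SparkContext, serializedStatuses: Array[Byte]): Reply = {
    if (serializedStatuses.length >= minSizeForBroadcast) {
      // Large statuses: broadcast them once so the driver does not queue up
      // many multi-MB serialized copies waiting for netty to send them.
      val bc = sc.broadcast(serializedStatuses)
      ViaBroadcast(bc.id)
    } else {
      // Small statuses: send the serialized bytes directly in the reply.
      Direct(serializedStatuses)
    }
  }
}
```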
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala4
1 file changed, 3 insertions(+), 1 deletion(-)
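The second change, synchronizing requests so that only one thread serializes and broadcasts the statuses while concurrent requests block briefly and reuse the cached result, all running on a separate pool so the main dispatcher threads never block, could look roughly like the sketch below. This is a hand-written illustration under those assumptions; the class and method names are invented and do not match Spark's MapOutputTrackerMaster or Dispatcher.

```scala
// Illustrative sketch only: a dedicated pool handles map-output-status requests,
// and a lock ensures the serialized/broadcast form is built exactly once.
import java.util.concurrent.Executors

class MapOutputRequestSketch(numThreads: Int = 8) {
  private val pool = Executors.newFixedThreadPool(numThreads)
  private val lock = new Object
  @volatile private var cached: Array[Byte] = null

  // Called from the dispatcher; returns immediately so unrelated messages
  // (heartbeats, etc.) keep flowing on the dispatcher's own threads.
  def post(serialize: () => Array[Byte])(reply: Array[Byte] => Unit): Unit = {
    pool.execute(new Runnable {
      override def run(): Unit = {
        // Only the first request serializes; concurrent requests block here
        // briefly and then reuse the cached bytes instead of re-serializing.
        val bytes = lock.synchronized {
          if (cached == null) {
            cached = serialize()
          }
          cached
        }
        reply(bytes)
      }
    })
  }

  // Drop the cache when the map outputs change (e.g. a new shuffle epoch).
  def invalidate(): Unit = lock.synchronized { cached = null }

  def stop(): Unit = pool.shutdown()
}
```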
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 4be4882938..e97427991b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -29,6 +29,7 @@ import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually._
import org.apache.spark._
+import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.memory.StaticMemoryManager
import org.apache.spark.network.netty.NettyBlockTransferService
@@ -57,7 +58,8 @@ class ReceivedBlockHandlerSuite
val hadoopConf = new Configuration()
val streamId = 1
val securityMgr = new SecurityManager(conf)
- val mapOutputTracker = new MapOutputTrackerMaster(conf)
+ val broadcastManager = new BroadcastManager(true, conf, securityMgr)
+ val mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true)
val shuffleManager = new SortShuffleManager(conf)
val serializer = new KryoSerializer(conf)
var serializerManager = new SerializerManager(serializer, conf)