author     Patrick Wendell <pwendell@gmail.com>   2014-06-22 19:31:15 -0700
committer  Aaron Davidson <aaron@databricks.com>  2014-06-22 19:31:15 -0700
commit     67bffd3c7ee8e9e3395e714e470459f09d19e66d (patch)
tree       e28ebe11a97be5721c706125c8a0d813e0ec9779
parent     64316af5a29f77753d3bd9dab7b0b9b4e1dd5592 (diff)
download   spark-67bffd3c7ee8e9e3395e714e470459f09d19e66d.tar.gz
           spark-67bffd3c7ee8e9e3395e714e470459f09d19e66d.tar.bz2
           spark-67bffd3c7ee8e9e3395e714e470459f09d19e66d.zip
[SPARK-1112, 2156] (1.0 edition) Use correct akka frame size and overhead amounts.
SPARK-1112: This is a more conservative version of #1132 that doesn't change the actor system initialization on the executor. Instead we just read the current frame size limit directly from the ActorSystem.

SPARK-2156: This uses the same fix as in #1132.

Author: Patrick Wendell <pwendell@gmail.com>

Closes #1172 from pwendell/akka-10-fix and squashes the following commits:

d56297e [Patrick Wendell] Set limit in LocalBackend to preserve test expectations
9f5ed19 [Patrick Wendell] [SPARK-1112, 2156] (1.0 edition) Use correct akka frame size and overhead amounts.
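In outline, the fix works as follows (a minimal standalone Scala sketch for illustration only, not the patched Spark code; the object and method names here are hypothetical): the executor backend reads the effective frame size limit from its own ActorSystem configuration rather than recomputing it from SparkConf, and a fixed overhead reserve is subtracted before comparing the serialized result size against that limit.

    import akka.actor.ActorSystem

    object FrameSizeSketch {
      // Headroom kept for message overhead besides the serialized payload,
      // mirroring the new AkkaUtils.reservedSizeBytes constant (200 KB).
      val reservedSizeBytes: Long = 200 * 1024

      // Ask the running ActorSystem for the frame size limit it actually enforces.
      def akkaFrameSize(actorSystem: ActorSystem): Long =
        actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")

      // A task result smaller than the remaining budget can be sent directly over Akka;
      // anything larger goes through the BlockManager instead.
      def sendDirectly(resultSizeBytes: Long, actorSystem: ActorSystem): Boolean =
        resultSizeBytes < akkaFrameSize(actorSystem) - reservedSizeBytes
    }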
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala  8
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala  8
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala  3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala  6
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala  3
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala  21
7 files changed, 33 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 2279d77c91..70c1f4cac5 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -34,7 +34,8 @@ private[spark] class CoarseGrainedExecutorBackend(
driverUrl: String,
executorId: String,
hostPort: String,
- cores: Int)
+ cores: Int,
+ actorSystem: ActorSystem)
extends Actor
with ExecutorBackend
with Logging {
@@ -94,6 +95,9 @@ private[spark] class CoarseGrainedExecutorBackend(
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
driver ! StatusUpdate(executorId, taskId, state, data)
}
+
+ override def akkaFrameSize() = actorSystem.settings.config.getBytes(
+ "akka.remote.netty.tcp.maximum-frame-size")
}
private[spark] object CoarseGrainedExecutorBackend {
@@ -113,7 +117,7 @@ private[spark] object CoarseGrainedExecutorBackend {
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
- sparkHostPort, cores),
+ sparkHostPort, cores, actorSystem),
name = "Executor")
workerUrl.foreach {
url =>
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index baee7a216a..214a8c835e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -97,10 +97,6 @@ private[spark] class Executor(
private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
- // Akka's message frame size. If task result is bigger than this, we use the block manager
- // to send the result back.
- private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-
// Start worker thread pool
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
@@ -211,8 +207,10 @@ private[spark] class Executor(
task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
+
val serializedResult = {
- if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
+ if (serializedDirectResult.limit >= execBackend.akkaFrameSize() -
+ AkkaUtils.reservedSizeBytes) {
logInfo("Storing result for " + taskId + " in local BlockManager")
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
index 3d34960653..43a15cd3d6 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
@@ -26,4 +26,7 @@ import org.apache.spark.TaskState.TaskState
*/
private[spark] trait ExecutorBackend {
def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
+
+ // Exists as a workaround for SPARK-1112. This only exists in branch-1.x of Spark.
+ def akkaFrameSize(): Long = Long.MaxValue
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e47a060683..08bc21884c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -143,7 +143,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
for (task <- tasks.flatten) {
val ser = SparkEnv.get.closureSerializer.newInstance()
val serializedTask = ser.serialize(task)
- if (serializedTask.limit >= akkaFrameSize - 1024) {
+ if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
try {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 43f0e18a0c..a98c891de8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -20,11 +20,11 @@ package org.apache.spark.scheduler.local
import java.nio.ByteBuffer
import akka.actor.{Actor, ActorRef, Props}
-
import org.apache.spark.{Logging, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.util.AkkaUtils
private case class ReviveOffers()
@@ -106,4 +106,8 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
localActor ! StatusUpdate(taskId, state, serializedData)
}
+
+ // This limit is calculated only to preserve expected behavior in tests. In reality, since this
+ // backend sends messages over the existing actor system, there is no need to enforce a limit.
+ override def akkaFrameSize() = AkkaUtils.maxFrameSizeBytes(scheduler.sc.getConf)
}
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index a8d12bb2a0..9930c71749 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -121,4 +121,7 @@ private[spark] object AkkaUtils extends Logging {
def maxFrameSizeBytes(conf: SparkConf): Int = {
conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
}
+
+ /** Space reserved for extra data in an Akka message besides serialized task or task result. */
+ val reservedSizeBytes = 200 * 1024
}
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 6b2571cd92..1b64d49285 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -182,7 +182,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("remote fetch exceeds akka frame size") {
val newConf = new SparkConf
- newConf.set("spark.akka.frameSize", "1")
newConf.set("spark.akka.askTimeout", "1") // Fail fast
val masterTracker = new MapOutputTrackerMaster(conf)
@@ -191,14 +190,18 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
val masterActor = actorRef.underlyingActor
- // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
- // Note that the size is hand-selected here because map output statuses are compressed before
- // being sent.
- masterTracker.registerShuffle(20, 100)
- (0 until 100).foreach { i =>
- masterTracker.registerMapOutput(20, i, new MapStatus(
- BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
+ // Frame size should be 2 * the configured frame size, and MapOutputTrackerMasterActor should
+ // throw exception.
+ val shuffleId = 20
+ val numMaps = 2
+ val data = new Array[Byte](AkkaUtils.maxFrameSizeBytes(conf))
+ val random = new java.util.Random(0)
+ random.nextBytes(data) // Make it hard to compress.
+ masterTracker.registerShuffle(shuffleId, numMaps)
+ (0 until numMaps).foreach { i =>
+ masterTracker.registerMapOutput(shuffleId, i, new MapStatus(
+ BlockManagerId("999", "mps", 1000, 0), data))
}
- intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+ intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(shuffleId)) }
}
}
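For reference, the overhead arithmetic the patch standardizes on, sketched in Scala (a minimal standalone sketch; FrameBudgetSketch and its variable names are illustrative, and the 200 KB reserve mirrors the new AkkaUtils.reservedSizeBytes constant):

    import org.apache.spark.SparkConf

    object FrameBudgetSketch extends App {
      val conf = new SparkConf()
      // spark.akka.frameSize is configured in MB and defaults to 10, as in AkkaUtils.maxFrameSizeBytes.
      val frameSizeBytes = conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024  // 10485760 bytes
      val reservedSizeBytes = 200 * 1024                                          // 204800 bytes of headroom
      val usablePayloadBytes = frameSizeBytes - reservedSizeBytes                 // 10280960 bytes
      // A serialized payload at or above this budget is not sent in a single Akka message:
      // task results are routed through the BlockManager, and oversized tasks are rejected on the driver.
      println(s"usable payload per Akka message: $usablePayloadBytes bytes")
    }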