aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-05-26 15:28:49 -0700
committerAndrew Or <andrew@databricks.com>2015-05-26 15:28:49 -0700
commit9f742241cbf07e5e2dadfee8dcc9b382bb2dbea1 (patch)
treedf4190414602fead033a5aa3b1e17c2821a1cda5 /core
parent2e9a5f229e1a2ccffa74fa59fa6a55b2704d9c1a (diff)
downloadspark-9f742241cbf07e5e2dadfee8dcc9b382bb2dbea1.tar.gz
spark-9f742241cbf07e5e2dadfee8dcc9b382bb2dbea1.tar.bz2
spark-9f742241cbf07e5e2dadfee8dcc9b382bb2dbea1.zip
[SPARK-6602] [CORE] Remove some places in core that calling SparkEnv.actorSystem
Author: zsxwing <zsxwing@gmail.com> Closes #6333 from zsxwing/remove-actor-system-usage and squashes the following commits: f125aa6 [zsxwing] Fix YarnAllocatorSuite ceadcf6 [zsxwing] Change the "port" parameter type of "AkkaUtils.address" to "int"; update ApplicationMaster and YarnAllocator to get the driverUrl from RpcEnv 3239380 [zsxwing] Remove some places in core that calling SparkEnv.actorSystem
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala2
3 files changed, 16 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index b4b8a63069..ed3dde0fc3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -19,9 +19,9 @@ package org.apache.spark.scheduler
import java.nio.ByteBuffer
import java.util.{TimerTask, Timer}
+import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
-import scala.concurrent.duration._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
@@ -32,7 +32,7 @@ import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
@@ -64,6 +64,9 @@ private[spark] class TaskSchedulerImpl(
// How often to check for speculative tasks
val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
+ private val speculationScheduler =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
+
// Threshold above which we warn user initial TaskSet may be starved
val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
@@ -142,10 +145,11 @@ private[spark] class TaskSchedulerImpl(
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
- sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
- SPECULATION_INTERVAL_MS milliseconds) {
- Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
- }(sc.env.actorSystem.dispatcher)
+ speculationScheduler.scheduleAtFixedRate(new Runnable {
+ override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
+ checkSpeculatableTasks()
+ }
+ }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}
@@ -412,6 +416,7 @@ private[spark] class TaskSchedulerImpl(
}
override def stop() {
+ speculationScheduler.shutdown()
if (backend != null) {
backend.stop()
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index dc59545b43..aff086594c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -25,9 +25,10 @@ import scala.collection.mutable.{HashMap, HashSet}
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.Utils
import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
/**
@@ -115,11 +116,9 @@ private[spark] class CoarseMesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val driverUrl = AkkaUtils.address(
- AkkaUtils.protocol(sc.env.actorSystem),
+ val driverUrl = sc.env.rpcEnv.uriOf(
SparkEnv.driverActorSystemName,
- conf.get("spark.driver.host"),
- conf.get("spark.driver.port"),
+ RpcAddress(conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
val uri = conf.getOption("spark.executor.uri")
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index de3316d083..7513b1b795 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -235,7 +235,7 @@ private[spark] object AkkaUtils extends Logging {
protocol: String,
systemName: String,
host: String,
- port: Any,
+ port: Int,
actorName: String): String = {
s"$protocol://$systemName@$host:$port/user/$actorName"
}