Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala')
-rwxr-xr-x core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 3ee2eb69e8..8f3cc54051 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -34,6 +34,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
@@ -61,7 +62,7 @@ private[worker] class Worker(
assert (port > 0)
// For worker and executor IDs
- private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
+ private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
@@ -85,10 +86,10 @@ private[worker] class Worker(
private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
// How often worker will clean up old app folders
- private val CLEANUP_INTERVAL_MILLIS =
+ private val CLEANUP_INTERVAL_MILLIS =
conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
// TTL for app folders/data; after TTL expires it will be cleaned up
- private val APP_DATA_RETENTION_SECS =
+ private val APP_DATA_RETENTION_SECS =
conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
private val testing: Boolean = sys.props.contains("spark.testing")
@@ -112,7 +113,7 @@ private[worker] class Worker(
} else {
new File(sys.env.get("SPARK_HOME").getOrElse("."))
}
-
+
var workDir: File = null
val finishedExecutors = new HashMap[String, ExecutorRunner]
val drivers = new HashMap[String, DriverRunner]
@@ -122,7 +123,7 @@ private[worker] class Worker(
val finishedApps = new HashSet[String]
// The shuffle service is not actually started unless configured.
- private val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
+ private val shuffleService = new ExternalShuffleService(conf, securityMgr)
private val publicAddress = {
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
@@ -134,7 +135,7 @@ private[worker] class Worker(
private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
private val workerSource = new WorkerSource(this)
-
+
private var registrationRetryTimer: Option[Cancellable] = None
var coresUsed = 0
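
The substantive change in this diff replaces the worker-local StandaloneWorkerShuffleService with the shared org.apache.spark.deploy.ExternalShuffleService; the remaining hunks appear to be whitespace-only cleanups (the removed and re-added lines are textually identical once trailing whitespace is stripped). As the retained comment notes, the service object is constructed unconditionally but "not actually started unless configured". The sketch below illustrates that guard pattern in a self-contained form: SimpleShuffleService, its Map-based config, and the println body are illustrative assumptions rather than Spark's actual implementation, though the keys spark.shuffle.service.enabled (default false) and spark.shuffle.service.port (default 7337) are the real standalone shuffle service settings.

// Minimal sketch of the "constructed always, started only if enabled" pattern.
// The config keys mirror Spark's; everything else here is hypothetical.
class SimpleShuffleService(conf: Map[String, String]) {
  private val enabled =
    conf.getOrElse("spark.shuffle.service.enabled", "false").toBoolean
  private val port =
    conf.getOrElse("spark.shuffle.service.port", "7337").toInt

  /** Starts the service only when spark.shuffle.service.enabled=true. */
  def startIfEnabled(): Unit = {
    if (enabled) {
      // A real implementation would bind a transport server here.
      println(s"External shuffle service listening on port $port")
    }
  }
}

object SimpleShuffleService {
  def main(args: Array[String]): Unit = {
    // Default config: the call is a no-op.
    new SimpleShuffleService(Map.empty).startIfEnabled()
    // Explicitly enabled: the service starts.
    new SimpleShuffleService(
      Map("spark.shuffle.service.enabled" -> "true")).startIfEnabled()
  }
}

For reference, the timing constants visible in the diff work out as follows: with the default spark.worker.timeout of 60 seconds, HEARTBEAT_MILLIS is 60 * 1000 / 4 = 15000 ms, i.e. a heartbeat every 15 seconds; the cleanup interval defaults to 60 * 30 = 1800 seconds (30 minutes); and the app-data TTL defaults to 7 * 24 * 3600 = 604800 seconds (7 days).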