author	Alex Bozarth <ajbozart@us.ibm.com>	2016-05-09 11:51:37 -0700
committer	Andrew Or <andrew@databricks.com>	2016-05-09 11:51:37 -0700
commit	c3e23bc0c3e87546d0575c3c4c45a2b0e2dfec6a (patch)
tree	4d6d98f7a7db550c09d1ce4dfa775222080faa94 /core
parent	7bf9b12019bb20470b726a7233d60ce38a9c52cc (diff)
download	spark-c3e23bc0c3e87546d0575c3c4c45a2b0e2dfec6a.tar.gz
	spark-c3e23bc0c3e87546d0575c3c4c45a2b0e2dfec6a.tar.bz2
	spark-c3e23bc0c3e87546d0575c3c4c45a2b0e2dfec6a.zip
[SPARK-10653][CORE] Remove unnecessary things from SparkEnv
## What changes were proposed in this pull request?

Removed blockTransferService and sparkFilesDir from SparkEnv, since they're rarely used and don't need to be stored in the env. Edited their few usages to accommodate the change.

## How was this patch tested?

Ran dev/run-tests locally.

Author: Alex Bozarth <ajbozart@us.ibm.com>

Closes #12970 from ajbozarth/spark10653.
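For call sites outside this patch, the practical effect can be sketched as follows (an illustrative before/after, not code from this commit; `SparkEnvMigrationSketch` is a hypothetical name and a live SparkEnv is assumed):

```scala
import org.apache.spark.{SparkEnv, SparkFiles}

object SparkEnvMigrationSketch {
  // Before this change the env exposed these directly:
  //   SparkEnv.get.blockTransferService
  //   SparkEnv.get.sparkFilesDir
  // After this change, the same handles are reached through their owners:
  def blockTransfer = SparkEnv.get.blockManager.blockTransferService // now a val on BlockManager
  def filesRoot: String = SparkFiles.getRootDirectory()              // driverTmpDir.getOrElse(".")
}
```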
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala              | 26
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkFiles.scala            |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/DistributedSuite.scala      |  2
4 files changed, 8 insertions, 24 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 4bf8890c05..af50a6dc2d 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -31,7 +31,6 @@ import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
@@ -61,10 +60,8 @@ class SparkEnv (
val mapOutputTracker: MapOutputTracker,
val shuffleManager: ShuffleManager,
val broadcastManager: BroadcastManager,
- val blockTransferService: BlockTransferService,
val blockManager: BlockManager,
val securityManager: SecurityManager,
- val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
val memoryManager: MemoryManager,
val outputCommitCoordinator: OutputCommitCoordinator,
@@ -77,7 +74,7 @@ class SparkEnv (
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
- private var driverTmpDirToDelete: Option[String] = None
+ private[spark] var driverTmpDir: Option[String] = None
private[spark] def stop() {
@@ -94,13 +91,10 @@ class SparkEnv (
rpcEnv.shutdown()
rpcEnv.awaitTermination()
- // Note that blockTransferService is stopped by BlockManager since it is started by it.
-
// If we only stop sc, but the driver process still run as a services then we need to delete
// the tmp dir, if not, it will create too many tmp dirs.
- // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
- // current working dir in executor which we do not need to delete.
- driverTmpDirToDelete match {
+ // We only need to delete the tmp dir create by driver
+ driverTmpDir match {
case Some(path) =>
try {
Utils.deleteRecursively(new File(path))
@@ -342,15 +336,6 @@ object SparkEnv extends Logging {
ms
}
- // Set the sparkFiles directory, used when downloading dependencies. In local mode,
- // this is a temporary directory; in distributed mode, this is the executor's current working
- // directory.
- val sparkFilesDir: String = if (isDriver) {
- Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
- } else {
- "."
- }
-
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf, isDriver)
}
@@ -367,10 +352,8 @@ object SparkEnv extends Logging {
mapOutputTracker,
shuffleManager,
broadcastManager,
- blockTransferService,
blockManager,
securityManager,
- sparkFilesDir,
metricsSystem,
memoryManager,
outputCommitCoordinator,
@@ -380,7 +363,8 @@ object SparkEnv extends Logging {
// called, and we only need to do it for driver. Because driver may run as a service, and if we
// don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
if (isDriver) {
- envInstance.driverTmpDirToDelete = Some(sparkFilesDir)
+ val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
+ envInstance.driverTmpDir = Some(sparkFilesDir)
}
envInstance
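The cleanup pattern in the hunk above can be summarized in isolation (a minimal, self-contained sketch of the lifecycle, not Spark's actual code; the class and helper names below are illustrative):

```scala
import java.io.File
import java.nio.file.Files

// Sketch of the driverTmpDir lifecycle introduced above; DriverTmpDirSketch
// and deleteRecursively are illustrative stand-ins, not Spark classes.
class DriverTmpDirSketch(isDriver: Boolean) {
  // Only the driver gets a "userFiles" temp dir; executors use their working dir.
  private[this] var driverTmpDir: Option[String] =
    if (isDriver) Some(Files.createTempDirectory("userFiles").toFile.getAbsolutePath) else None

  def stop(): Unit = {
    // Delete on stop so a long-running driver process does not accumulate
    // temp dirs across repeated SparkContext start/stop cycles.
    driverTmpDir.foreach(path => deleteRecursively(new File(path)))
    driverTmpDir = None
  }

  private def deleteRecursively(f: File): Unit = {
    Option(f.listFiles).foreach(_.foreach(deleteRecursively))
    f.delete()
  }
}
```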
diff --git a/core/src/main/scala/org/apache/spark/SparkFiles.scala b/core/src/main/scala/org/apache/spark/SparkFiles.scala
index e85b89fd01..44f4444a1f 100644
--- a/core/src/main/scala/org/apache/spark/SparkFiles.scala
+++ b/core/src/main/scala/org/apache/spark/SparkFiles.scala
@@ -34,6 +34,6 @@ object SparkFiles {
* Get the root directory that contains files added through `SparkContext.addFile()`.
*/
def getRootDirectory(): String =
- SparkEnv.get.sparkFilesDir
+ SparkEnv.get.driverTmpDir.getOrElse(".")
}
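As a usage note, files added via SparkContext.addFile are still resolved through SparkFiles; with this change the root falls back to the executor's working directory when driverTmpDir is unset. A small spark-shell-style snippet (illustrative only; it assumes sc.addFile("data.txt") was called earlier, and the filename is hypothetical):

```scala
import org.apache.spark.SparkFiles

// On the driver the root is the "userFiles" temp dir; on executors it is ".",
// matching the driverTmpDir.getOrElse(".") fallback above.
val root: String = SparkFiles.getRootDirectory()
val path: String = SparkFiles.get("data.txt")
```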
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f2d06c7ea8..c56e451c11 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -65,7 +65,7 @@ private[spark] class BlockManager(
memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
- blockTransferService: BlockTransferService,
+ val blockTransferService: BlockTransferService,
securityManager: SecurityManager,
numUsableCores: Int)
extends BlockDataManager with BlockEvictionHandler with Logging {
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index a0086e1843..0be25e9f89 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -196,7 +196,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
val blockIds = data.partitions.indices.map(index => RDDBlockId(data.id, index)).toArray
val blockId = blockIds(0)
val blockManager = SparkEnv.get.blockManager
- val blockTransfer = SparkEnv.get.blockTransferService
+ val blockTransfer = blockManager.blockTransferService
val serializerManager = SparkEnv.get.serializerManager
blockManager.master.getLocations(blockId).foreach { cmId =>
val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,