aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala26
-rw-r--r--core/src/main/scala/org/apache/spark/SparkFiles.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/DistributedSuite.scala2
-rw-r--r--project/MimaExcludes.scala4
5 files changed, 12 insertions, 24 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 4bf8890c05..af50a6dc2d 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -31,7 +31,6 @@ import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
@@ -61,10 +60,8 @@ class SparkEnv (
val mapOutputTracker: MapOutputTracker,
val shuffleManager: ShuffleManager,
val broadcastManager: BroadcastManager,
- val blockTransferService: BlockTransferService,
val blockManager: BlockManager,
val securityManager: SecurityManager,
- val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
val memoryManager: MemoryManager,
val outputCommitCoordinator: OutputCommitCoordinator,
@@ -77,7 +74,7 @@ class SparkEnv (
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
- private var driverTmpDirToDelete: Option[String] = None
+ private[spark] var driverTmpDir: Option[String] = None
private[spark] def stop() {
@@ -94,13 +91,10 @@ class SparkEnv (
rpcEnv.shutdown()
rpcEnv.awaitTermination()
- // Note that blockTransferService is stopped by BlockManager since it is started by it.
-
// If we only stop sc, but the driver process still run as a services then we need to delete
// the tmp dir, if not, it will create too many tmp dirs.
- // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
- // current working dir in executor which we do not need to delete.
- driverTmpDirToDelete match {
+ // We only need to delete the tmp dir create by driver
+ driverTmpDir match {
case Some(path) =>
try {
Utils.deleteRecursively(new File(path))
@@ -342,15 +336,6 @@ object SparkEnv extends Logging {
ms
}
- // Set the sparkFiles directory, used when downloading dependencies. In local mode,
- // this is a temporary directory; in distributed mode, this is the executor's current working
- // directory.
- val sparkFilesDir: String = if (isDriver) {
- Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
- } else {
- "."
- }
-
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf, isDriver)
}
@@ -367,10 +352,8 @@ object SparkEnv extends Logging {
mapOutputTracker,
shuffleManager,
broadcastManager,
- blockTransferService,
blockManager,
securityManager,
- sparkFilesDir,
metricsSystem,
memoryManager,
outputCommitCoordinator,
@@ -380,7 +363,8 @@ object SparkEnv extends Logging {
// called, and we only need to do it for driver. Because driver may run as a service, and if we
// don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
if (isDriver) {
- envInstance.driverTmpDirToDelete = Some(sparkFilesDir)
+ val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
+ envInstance.driverTmpDir = Some(sparkFilesDir)
}
envInstance
diff --git a/core/src/main/scala/org/apache/spark/SparkFiles.scala b/core/src/main/scala/org/apache/spark/SparkFiles.scala
index e85b89fd01..44f4444a1f 100644
--- a/core/src/main/scala/org/apache/spark/SparkFiles.scala
+++ b/core/src/main/scala/org/apache/spark/SparkFiles.scala
@@ -34,6 +34,6 @@ object SparkFiles {
* Get the root directory that contains files added through `SparkContext.addFile()`.
*/
def getRootDirectory(): String =
- SparkEnv.get.sparkFilesDir
+ SparkEnv.get.driverTmpDir.getOrElse(".")
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f2d06c7ea8..c56e451c11 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -65,7 +65,7 @@ private[spark] class BlockManager(
memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
- blockTransferService: BlockTransferService,
+ val blockTransferService: BlockTransferService,
securityManager: SecurityManager,
numUsableCores: Int)
extends BlockDataManager with BlockEvictionHandler with Logging {
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index a0086e1843..0be25e9f89 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -196,7 +196,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
val blockIds = data.partitions.indices.map(index => RDDBlockId(data.id, index)).toArray
val blockId = blockIds(0)
val blockManager = SparkEnv.get.blockManager
- val blockTransfer = SparkEnv.get.blockTransferService
+ val blockTransfer = blockManager.blockTransferService
val serializerManager = SparkEnv.get.serializerManager
blockManager.master.getLocations(blockId).foreach { cmId =>
val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 33e0db606c..a5d57e1b01 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -694,6 +694,10 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionModel.weights"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.regression.LinearRegressionModel.weights")
) ++ Seq(
+ // [SPARK-10653] [Core] Remove unnecessary things from SparkEnv
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.sparkFilesDir"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.blockTransferService")
+ ) ++ Seq(
// SPARK-14654: New accumulator API
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ExceptionFailure$"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ExceptionFailure.apply"),