aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/SparkEnv.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/SparkEnv.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala12
1 files changed, 12 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 959aefabd8..0c4d28f786 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -40,6 +40,7 @@ import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinato
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
import org.apache.spark.storage._
+import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator}
import org.apache.spark.util.{RpcUtils, Utils}
/**
@@ -69,6 +70,7 @@ class SparkEnv (
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
val shuffleMemoryManager: ShuffleMemoryManager,
+ val executorMemoryManager: ExecutorMemoryManager,
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf) extends Logging {
@@ -382,6 +384,15 @@ object SparkEnv extends Logging {
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
+ val executorMemoryManager: ExecutorMemoryManager = {
+ val allocator = if (conf.getBoolean("spark.unsafe.offHeap", false)) {
+ MemoryAllocator.UNSAFE
+ } else {
+ MemoryAllocator.HEAP
+ }
+ new ExecutorMemoryManager(allocator)
+ }
+
val envInstance = new SparkEnv(
executorId,
rpcEnv,
@@ -398,6 +409,7 @@ object SparkEnv extends Logging {
sparkFilesDir,
metricsSystem,
shuffleMemoryManager,
+ executorMemoryManager,
outputCommitCoordinator,
conf)