author     Andrew Or <andrew@databricks.com>    2015-10-22 15:58:08 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-10-22 15:58:08 -0700
commit     34e71c6d89c1f2b6236dbf0d75cd12da08003c84 (patch)
tree       b3919d8173e1d56885f4f19baa28b62590c87d05 /core
parent     163d53e829c166f061589cc379f61642d4c9a40f (diff)
download   spark-34e71c6d89c1f2b6236dbf0d75cd12da08003c84.tar.gz
           spark-34e71c6d89c1f2b6236dbf0d75cd12da08003c84.tar.bz2
           spark-34e71c6d89c1f2b6236dbf0d75cd12da08003c84.zip
[SPARK-11251] Fix page size calculation in local mode
```
// My machine only has 8 cores
$ bin/spark-shell --master local[32]

scala> val df = sc.parallelize(Seq((1, 1), (2, 2))).toDF("a", "b")
scala> df.as("x").join(df.as("y"), $"x.a" === $"y.a").count()

Caused by: java.io.IOException: Unable to acquire 2097152 bytes of memory
  at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
```

Author: Andrew Or <andrew@databricks.com>

Closes #9209 from andrewor14/fix-local-page-size.
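The fix threads the thread count requested in the master string into SparkEnv, so the driver sizes memory pages for the 32 local tasks rather than the 8 physical cores. Below is a minimal, self-contained sketch of the parsing this patch adds as `SparkContext.numDriverCores` (the standalone object and its `main` are illustrative only; the regexes mirror the new `SparkMasterRegex` object in the diff):

```scala
// Standalone sketch of the master-string parsing added in this patch.
// In Spark itself this logic lives in SparkContext.numDriverCores.
object NumDriverCoresSketch {
  // Same patterns the patch collects into the new SparkMasterRegex object.
  private val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  private val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  /** Threads requested for local mode, or 0 when the driver does no execution. */
  def numDriverCores(master: String): Int = {
    def convertToInt(threads: String): Int =
      if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
    master match {
      case "local" => 1
      case LOCAL_N_REGEX(threads) => convertToInt(threads)
      case LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
      case _ => 0 // on a real cluster the driver is not used for execution
    }
  }

  def main(args: Array[String]): Unit = {
    // On the 8-core machine from the report, local[32] now yields 32 (not 0),
    // so SparkEnv sizes its memory pages for 32 concurrent tasks.
    Seq("local", "local[32]", "local[*]", "local[4, 2]", "spark://host:7077")
      .foreach(m => println(s"$m -> ${numDriverCores(m)}"))
  }
}
```

Any non-local master returns 0, which keeps the previous behaviour for real clusters, where executors rather than the driver run tasks.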
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                            | 48
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala                                |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala  |  3
3 files changed, 40 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ccba3ed9e6..a6857b4c7d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -269,7 +269,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
- SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
+ SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}
private[spark] def env: SparkEnv = _env
@@ -2561,24 +2561,28 @@ object SparkContext extends Logging {
}
/**
+ * The number of driver cores to use for execution in local mode, 0 otherwise.
+ */
+ private[spark] def numDriverCores(master: String): Int = {
+ def convertToInt(threads: String): Int = {
+ if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
+ }
+ master match {
+ case "local" => 1
+ case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
+ case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
+ case _ => 0 // driver is not used for execution
+ }
+ }
+
+ /**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
private def createTaskScheduler(
sc: SparkContext,
master: String): (SchedulerBackend, TaskScheduler) = {
- // Regular expression used for local[N] and local[*] master formats
- val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
- // Regular expression for local[N, maxRetries], used in tests with failing tasks
- val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
- // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
- val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
- // Regular expression for connecting to Spark deploy clusters
- val SPARK_REGEX = """spark://(.*)""".r
- // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
- val MESOS_REGEX = """(mesos|zk)://.*""".r
- // Regular expression for connection to Simr cluster
- val SIMR_REGEX = """simr://(.*)""".r
+ import SparkMasterRegex._
// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1
@@ -2720,6 +2724,24 @@ object SparkContext extends Logging {
}
/**
+ * A collection of regexes for extracting information from the master string.
+ */
+private object SparkMasterRegex {
+ // Regular expression used for local[N] and local[*] master formats
+ val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
+ // Regular expression for local[N, maxRetries], used in tests with failing tasks
+ val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
+ // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
+ val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
+ // Regular expression for connecting to Spark deploy clusters
+ val SPARK_REGEX = """spark://(.*)""".r
+ // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
+ val MESOS_REGEX = """(mesos|zk)://.*""".r
+ // Regular expression for connection to Simr cluster
+ val SIMR_REGEX = """simr://(.*)""".r
+}
+
+/**
* A class encapsulating how to convert some type T to Writable. It stores both the Writable class
* corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
* The getter for the writable class takes a ClassTag[T] in case this is a generic object
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 704158bfc7..b5c35c569e 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -190,6 +190,7 @@ object SparkEnv extends Logging {
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
+ numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
@@ -202,6 +203,7 @@ object SparkEnv extends Logging {
port,
isDriver = true,
isLocal = isLocal,
+ numUsableCores = numCores,
listenerBus = listenerBus,
mockOutputCommitCoordinator = mockOutputCommitCoordinator
)
@@ -241,8 +243,8 @@ object SparkEnv extends Logging {
port: Int,
isDriver: Boolean,
isLocal: Boolean,
+ numUsableCores: Int,
listenerBus: LiveListenerBus = null,
- numUsableCores: Int = 0,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
// Listener bus is only used on the driver
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 6d08d7c5b7..48456a9cd6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -87,7 +87,8 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
outputCommitCoordinator = spy(new OutputCommitCoordinator(conf, isDriver = true))
// Use Mockito.spy() to maintain the default infrastructure everywhere else.
// This mocking allows us to control the coordinator responses in test cases.
- SparkEnv.createDriverEnv(conf, isLocal, listenerBus, Some(outputCommitCoordinator))
+ SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
+ SparkContext.numDriverCores(master), Some(outputCommitCoordinator))
}
}
// Use Mockito.spy() to maintain the default infrastructure everywhere else
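A closing note on why the core count matters downstream: the driver's SparkEnv passes `numUsableCores` into the component that sizes memory pages for operators such as `UnsafeExternalSorter`. The sketch below is a simplified stand-in for that heuristic, not Spark's exact internals (the helper name, the 16x safety factor, and the 256 MiB pool are assumptions for illustration): assuming fewer cores produces larger pages, so a pool shared by 32 local tasks but paged as if only 8 cores existed is exhausted after far fewer page grants, which surfaces as the `Unable to acquire ... bytes of memory` failure in the commit message.

```scala
// Illustrative only: a clamped power-of-two page size derived from an assumed
// core count, mimicking the shape (not the exact constants) of Spark's heuristic.
object PageSizeSketch {
  private def nextPowerOf2(n: Long): Long = {
    val high = java.lang.Long.highestOneBit(math.max(n, 1L))
    if (high == n) n else high << 1
  }

  /** Pages shrink as the assumed number of concurrent cores grows. */
  def pageSize(poolBytes: Long, assumedCores: Int): Long = {
    val minPage = 1L << 20            // 1 MiB floor
    val maxPage = 64L << 20           // 64 MiB ceiling
    val safetyFactor = 16             // headroom for rounding and overhead
    val raw = nextPowerOf2(poolBytes / assumedCores / safetyFactor)
    math.min(maxPage, math.max(minPage, raw))
  }

  def main(args: Array[String]): Unit = {
    val pool = 256L << 20             // hypothetical 256 MiB execution pool
    // Assuming only the 8 physical cores yields pages twice as large as assuming
    // the 32 requested local threads, so 32 concurrent consumers exhaust the pool
    // after half as many page grants.
    println(s"assuming  8 cores: ${pageSize(pool, 8)} bytes per page")
    println(s"assuming 32 cores: ${pageSize(pool, 32)} bytes per page")
  }
}
```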