author	Prashant Sharma <scrapcodes@gmail.com>	2013-09-15 10:55:12 +0530
committer	Prashant Sharma <scrapcodes@gmail.com>	2013-09-15 10:55:12 +0530
commit	383e151fd7138cc6a143b3a38037cc3038c2a8b9 (patch)
tree	0dbb2c0d8fdeff4c37a577eb96acb87ee7838a16 /core
parent	20c65bc334091d8d05fb680551155aa182d98f7d (diff)
parent	c4c1db2dd5b2ec0a8182369ecdb0e14f4e199822 (diff)
Merge branch 'master' of git://github.com/mesos/spark into scala-2.10
Conflicts:
	core/src/main/scala/org/apache/spark/SparkContext.scala
	project/SparkBuild.scala
Diffstat (limited to 'core')
-rw-r--r--  core/pom.xml | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/CacheManager.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 22
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContext.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala | 82
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageLevel.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 24
-rw-r--r--  core/src/test/scala/org/apache/spark/CacheManagerSuite.scala | 91
-rw-r--r--  core/src/test/scala/org/apache/spark/JavaAPISuite.java | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala | 19
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 14
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/UISuite.scala | 2
35 files changed, 329 insertions, 82 deletions
diff --git a/core/pom.xml b/core/pom.xml
index 076236e20a..382473a3b3 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -37,6 +37,11 @@
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
+ <groupId>net.java.dev.jets3t</groupId>
+ <artifactId>jets3t</artifactId>
+ <version>0.7.1</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
@@ -147,6 +152,10 @@
<artifactId>metrics-json</artifactId>
</dependency>
<dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-ganglia</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<scope>test</scope>
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index e299a106ee..68b99ca125 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -66,10 +66,12 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
try {
// If we got here, we have to load the split
- val elements = new ArrayBuffer[Any]
logInfo("Computing partition " + split)
- elements ++= rdd.computeOrReadCheckpoint(split, context)
- // Try to put this block in the blockManager
+ val computedValues = rdd.computeOrReadCheckpoint(split, context)
+ // Persist the result, so long as the task is not running locally
+ if (context.runningLocally) { return computedValues }
+ val elements = new ArrayBuffer[Any]
+ elements ++= computedValues
blockManager.put(key, elements, storageLevel, true)
return elements.iterator.asInstanceOf[Iterator[T]]
} finally {
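
A note on the CacheManager change above: when the DAGScheduler runs a job locally the task executes on the driver, and persisting the block there would only waste driver memory, so the computed iterator is returned directly. A minimal caller-side sketch (using the runningLocally flag added to TaskContext later in this commit; rdd, split and cacheManager are assumed to exist):

    val localContext = new TaskContext(0, 0, 0, runningLocally = true)
    // Returns the freshly computed values without calling blockManager.put,
    // so driver-local execution never populates the block store.
    val values = cacheManager.getOrCompute(rdd, split, localContext, StorageLevel.MEMORY_ONLY)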
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 04d172a989..882bc506b4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -258,20 +258,20 @@ class SparkContext(
private[spark] var checkpointDir: Option[String] = None
// Thread Local variable that can be used by users to pass information down the stack
- private val localProperties = new DynamicVariable[Properties](null)
+ private val localProperties = new ThreadLocal[Properties]
def initLocalProperties() {
- localProperties.value = new Properties()
+ localProperties.set(new Properties())
}
def setLocalProperty(key: String, value: String) {
- if (localProperties.value == null) {
- localProperties.value = new Properties()
+ if (localProperties.get() == null) {
+ localProperties.set(new Properties())
}
if (value == null) {
- localProperties.value.remove(key)
+ localProperties.get.remove(key)
} else {
- localProperties.value.setProperty(key, value)
+ localProperties.get.setProperty(key, value)
}
}
@@ -283,8 +283,8 @@ class SparkContext(
// Post init
taskScheduler.postStartHook()
- val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
- val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
+ val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
+ val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
@@ -724,7 +724,8 @@ class SparkContext(
val callSite = Utils.formatSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
- val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.value)
+ val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,
+ localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
result
@@ -807,7 +808,8 @@ class SparkContext(
val callSite = Utils.formatSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
- val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value)
+ val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
+ localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
result
}
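
Replacing DynamicVariable with ThreadLocal gives each thread its own, non-inherited copy of the local properties. A standalone sketch of that isolation (plain Scala, not Spark API):

    import java.util.Properties

    val props = new ThreadLocal[Properties] {
      override def initialValue() = new Properties()
    }

    props.get.setProperty("spark.scheduler.pool", "pool1")
    new Thread(new Runnable {
      def run() {
        // A child thread gets a fresh Properties instance, not the parent's.
        assert(props.get.getProperty("spark.scheduler.pool") == null)
      }
    }).start()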
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index b2dd668330..c2c358c7ad 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -24,6 +24,7 @@ class TaskContext(
val stageId: Int,
val splitId: Int,
val attemptId: Long,
+ val runningLocally: Boolean = false,
val taskMetrics: TaskMetrics = TaskMetrics.empty()
) extends Serializable {
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index a6be8efef1..87a703427c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -44,7 +44,9 @@ private[spark] object JsonProtocol {
("cores" -> obj.desc.maxCores) ~
("user" -> obj.desc.user) ~
("memoryperslave" -> obj.desc.memoryPerSlave) ~
- ("submitdate" -> obj.submitDate.toString)
+ ("submitdate" -> obj.submitDate.toString) ~
+ ("state" -> obj.state.toString) ~
+ ("duration" -> obj.duration)
}
def writeApplicationDescription(obj: ApplicationDescription) = {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d365804994..ceae3b8289 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -98,7 +98,7 @@ private[spark] class Executor(
}
)
- val executorSource = new ExecutorSource(this)
+ val executorSource = new ExecutorSource(this, executorId)
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index bf8fb4fd21..18c9dc1c0a 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.metrics.source.Source
-class ExecutorSource(val executor: Executor) extends Source {
+class ExecutorSource(val executor: Executor, executorId: String) extends Source {
private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption
@@ -39,7 +39,8 @@ class ExecutorSource(val executor: Executor) extends Source {
}
val metricRegistry = new MetricRegistry()
- val sourceName = "executor"
+ // TODO: It would be nice to pass the application name here
+ val sourceName = "executor.%s".format(executorId)
// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 0f9c4e00b1..caab748d60 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -37,10 +37,9 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
private def setDefaultProperties(prop: Properties) {
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
- prop.setProperty("*.sink.servlet.uri", "/metrics/json")
- prop.setProperty("*.sink.servlet.sample", "false")
- prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")
- prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json")
+ prop.setProperty("*.sink.servlet.path", "/metrics/json")
+ prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
+ prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
}
def initialize() {
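
For reference, the renamed servlet keys would appear in a metrics configuration file like this (a hypothetical conf/metrics.properties fragment; note that "sample" now defaults to false instead of being set explicitly):

    *.sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet
    *.sink.servlet.path=/metrics/json
    master.sink.servlet.path=/metrics/master/json
    applications.sink.servlet.path=/metrics/applications/json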
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
new file mode 100644
index 0000000000..b924907070
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.ganglia.GangliaReporter
+import com.codahale.metrics.MetricRegistry
+import info.ganglia.gmetric4j.gmetric.GMetric
+
+import org.apache.spark.metrics.MetricsSystem
+
+class GangliaSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+ val GANGLIA_KEY_PERIOD = "period"
+ val GANGLIA_DEFAULT_PERIOD = 10
+
+ val GANGLIA_KEY_UNIT = "unit"
+ val GANGLIA_DEFAULT_UNIT = TimeUnit.SECONDS
+
+ val GANGLIA_KEY_MODE = "mode"
+ val GANGLIA_DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST
+
+ // TTL for multicast messages. If listeners are X hops away in network, must be at least X.
+ val GANGLIA_KEY_TTL = "ttl"
+ val GANGLIA_DEFAULT_TTL = 1
+
+ val GANGLIA_KEY_HOST = "host"
+ val GANGLIA_KEY_PORT = "port"
+
+ def propertyToOption(prop: String) = Option(property.getProperty(prop))
+
+ if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) {
+ throw new Exception("Ganglia sink requires 'host' property.")
+ }
+
+ if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) {
+ throw new Exception("Ganglia sink requires 'port' property.")
+ }
+
+ val host = propertyToOption(GANGLIA_KEY_HOST).get
+ val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt
+ val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
+ val mode = propertyToOption(GANGLIA_KEY_MODE)
+ .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE)
+ val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt)
+ .getOrElse(GANGLIA_DEFAULT_PERIOD)
+ val pollUnit = propertyToOption(GANGLIA_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase))
+ .getOrElse(GANGLIA_DEFAULT_UNIT)
+
+ MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+ val ganglia = new GMetric(host, port, mode, ttl)
+ val reporter: GangliaReporter = GangliaReporter.forRegistry(registry)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .build(ganglia)
+
+ override def start() {
+ reporter.start(pollPeriod, pollUnit)
+ }
+
+ override def stop() {
+ reporter.stop()
+ }
+}
+
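
Enabling the new sink would look roughly like this (hypothetical conf/metrics.properties entries with example values; only host and port are mandatory, the rest fall back to the defaults defined above):

    *.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
    *.sink.ganglia.host=239.2.11.71
    *.sink.ganglia.port=8649
    # Optional, shown with their defaults:
    *.sink.ganglia.period=10
    *.sink.ganglia.unit=seconds
    *.sink.ganglia.mode=multicast
    *.sink.ganglia.ttl=1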
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
index 4e90dd4323..99357fede6 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
@@ -31,18 +31,21 @@ import org.eclipse.jetty.server.Handler
import org.apache.spark.ui.JettyUtils
class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink {
- val SERVLET_KEY_URI = "uri"
+ val SERVLET_KEY_PATH = "path"
val SERVLET_KEY_SAMPLE = "sample"
- val servletURI = property.getProperty(SERVLET_KEY_URI)
+ val SERVLET_DEFAULT_SAMPLE = false
- val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean
+ val servletPath = property.getProperty(SERVLET_KEY_PATH)
+
+ val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)).map(_.toBoolean)
+ .getOrElse(SERVLET_DEFAULT_SAMPLE)
val mapper = new ObjectMapper().registerModule(
new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
def getHandlers = Array[(String, Handler)](
- (servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
+ (servletPath, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
)
def getMetricsSnapshot(request: HttpServletRequest): String = {
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index 537f225469..8afcbe190a 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -20,6 +20,7 @@ package org.apache.spark.network.netty
import java.io.File
import org.apache.spark.Logging
+import org.apache.spark.util.Utils
private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {
@@ -57,7 +58,7 @@ private[spark] object ShuffleSender {
throw new Exception("Block " + blockId + " is not a shuffle block")
}
// Figure out which local directory it hashes to, and which subdirectory in that
- val hash = math.abs(blockId.hashCode)
+ val hash = Utils.nonNegativeHash(blockId)
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 70c967f4bf..ea4eeb7dbf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -270,6 +270,23 @@ abstract class RDD[T: ClassTag](
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
+ *
+ * This results in a narrow dependency, e.g. if you go from 1000 partitions
+ * to 100 partitions, there will not be a shuffle, instead each of the 100
+ * new partitions will claim 10 of the current partitions.
+ *
+ * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
+ * this may result in your computation taking place on fewer nodes than
+ * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+ * you can pass shuffle = true. This will add a shuffle step, but means the
+ * current upstream partitions will be executed in parallel (per whatever
+ * the current partitioning is).
+ *
+ * Note: With shuffle = true, you can actually coalesce to a larger number
+ * of partitions. This is useful if you have a small number of partitions,
+ * say 100, potentially with a few partitions being abnormally large. Calling
+ * coalesce(1000, shuffle = true) will result in 1000 partitions with the
+ * data distributed using a hash partitioner.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
if (shuffle) {
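
A quick usage sketch of the behaviours the new scaladoc describes (assuming an existing SparkContext sc):

    val rdd = sc.parallelize(1 to 1000, 1000)
    val narrowed = rdd.coalesce(100)                     // narrow dependency, no shuffle
    val rebalanced = rdd.coalesce(2000, shuffle = true)  // with a shuffle the partition count may grow
    assert(narrowed.partitions.size == 100)
    assert(rebalanced.partitions.size == 2000)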
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 854dbfee09..dca84db597 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -479,7 +479,8 @@ class DAGScheduler(
SparkEnv.set(env)
val rdd = job.finalStage.rdd
val split = rdd.partitions(job.partitions(0))
- val taskContext = new TaskContext(job.finalStage.id, job.partitions(0), 0)
+ val taskContext =
+ new TaskContext(job.finalStage.id, job.partitions(0), 0, runningLocally = true)
try {
val result = job.func(taskContext, rdd.iterator(split, taskContext))
job.listener.taskSucceeded(0, result)
@@ -532,9 +533,16 @@ class DAGScheduler(
tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
}
}
+
+ val properties = if (idToActiveJob.contains(stage.jobId)) {
+ idToActiveJob(stage.jobId).properties
+ } else {
// this stage will be assigned to the "default" pool
+ null
+ }
+
// must be run listener before possible NotSerializableException
// should be "StageSubmitted" first and then "JobEnded"
- val properties = idToActiveJob(stage.jobId).properties
listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties))
if (tasks.size > 0) {
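
The fallback above guards against a job that is no longer active (e.g. already cancelled), in which case the stage lands in the default pool. The same lookup written with Option, for comparison (an equivalent sketch, not the committed code):

    val properties = idToActiveJob.get(stage.jobId).map(_.properties).orNull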
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 22e3723ac8..446d490cc9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -20,10 +20,12 @@ package org.apache.spark.scheduler
import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.metrics.source.Source
+import org.apache.spark.SparkContext
-private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
+ extends Source {
val metricRegistry = new MetricRegistry()
- val sourceName = "DAGScheduler"
+ val sourceName = "%s.DAGScheduler".format(sc.appName)
metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failed.size
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 2b007cbe82..07e8317e3a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -77,7 +77,7 @@ private[spark] class ResultTask[T, U](
var func: (TaskContext, Iterator[T]) => U,
var partition: Int,
@transient locs: Seq[TaskLocation],
- val outputId: Int)
+ var outputId: Int)
extends Task[U](stageId) with Externalizable {
def this() = this(0, null, null, 0, null, 0)
@@ -93,7 +93,7 @@ private[spark] class ResultTask[T, U](
}
override def run(attemptId: Long): U = {
- val context = new TaskContext(stageId, partition, attemptId)
+ val context = new TaskContext(stageId, partition, attemptId, runningLocally = false)
metrics = Some(context.taskMetrics)
try {
func(context, rdd.iterator(split, context))
@@ -130,7 +130,7 @@ private[spark] class ResultTask[T, U](
rdd = rdd_.asInstanceOf[RDD[T]]
func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U]
partition = in.readInt()
- val outputId = in.readInt()
+ outputId = in.readInt()
epoch = in.readLong()
split = in.readObject().asInstanceOf[Partition]
}
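
The val-to-var change on outputId fixes a deserialization bug: readExternal previously bound the incoming value to a local val, shadowing the field, so a deserialized ResultTask kept outputId == 0. A standalone sketch of the pitfall (hypothetical class, not Spark code):

    import java.io.{Externalizable, ObjectInput, ObjectOutput}

    class Box(var id: Int) extends Externalizable {
      def this() = this(0)
      override def writeExternal(out: ObjectOutput) { out.writeInt(id) }
      override def readExternal(in: ObjectInput) {
        // Buggy form: `val id = in.readInt()` creates a local that shadows the
        // field, leaving the deserialized Box with id == 0. Assign to the var:
        id = in.readInt()
      }
    }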
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 764775fede..d23df0dd2b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -132,7 +132,7 @@ private[spark] class ShuffleMapTask(
override def run(attemptId: Long): MapStatus = {
val numOutputSplits = dep.partitioner.numPartitions
- val taskContext = new TaskContext(stageId, partition, attemptId)
+ val taskContext = new TaskContext(stageId, partition, attemptId, runningLocally = false)
metrics = Some(taskContext.taskMetrics)
val blockManager = SparkEnv.get.blockManager
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 3196ab5022..919acce828 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -94,7 +94,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var rootPool: Pool = null
// default scheduler is FIFO
val schedulingMode: SchedulingMode = SchedulingMode.withName(
- System.getProperty("spark.cluster.schedulingmode", "FIFO"))
+ System.getProperty("spark.scheduler.mode", "FIFO"))
override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 1b31c8c57e..0ac3d7bcfd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -335,7 +335,7 @@ private[spark] class ClusterTaskSetManager(
}
/**
- * Respond to an offer of a single slave from the scheduler by finding a task
+ * Respond to an offer of a single executor from the scheduler by finding a task
*/
override def resourceOffer(
execId: String,
@@ -358,7 +358,7 @@ private[spark] class ClusterTaskSetManager(
val task = tasks(index)
val taskId = sched.newTaskId()
// Figure out whether this should count as a preferred launch
- logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format(
+ logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
taskSet.id, index, taskId, execId, host, taskLocality))
// Do various bookkeeping
copiesRunning(index) += 1
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
index d04eeb6b98..f80823317b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -51,8 +51,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
- val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file")
- val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
+ val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file")
+ val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
val MINIMUM_SHARES_PROPERTY = "minShare"
val SCHEDULING_MODE_PROPERTY = "schedulingMode"
@@ -60,7 +60,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val POOL_NAME_PROPERTY = "@name"
val POOLS_PROPERTY = "pool"
val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
- val DEFAULT_MINIMUM_SHARE = 2
+ val DEFAULT_MINIMUM_SHARE = 0
val DEFAULT_WEIGHT = 1
override def buildPools() {
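
The renamed scheduler properties would be used along these lines (sketch with example values; the file path is hypothetical):

    System.setProperty("spark.scheduler.mode", "FAIR")
    System.setProperty("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
    val sc = new SparkContext("local[4]", "fair-pools-demo")
    // Route subsequent jobs submitted from this thread to a named pool:
    sc.setLocalProperty("spark.scheduler.pool", "production")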
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9a2cf20de7..9c49768c0c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -76,17 +76,17 @@ private[spark] class SparkDeploySchedulerBackend(
}
}
- override def executorAdded(executorId: String, workerId: String, hostPort: String, cores: Int, memory: Int) {
+ override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) {
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
- executorId, hostPort, cores, Utils.megabytesToString(memory)))
+ fullId, hostPort, cores, Utils.megabytesToString(memory)))
}
- override def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) {
+ override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
val reason: ExecutorLossReason = exitStatus match {
case Some(code) => ExecutorExited(code)
case None => SlaveLost(message)
}
- logInfo("Executor %s removed: %s".format(executorId, message))
- removeExecutor(executorId, reason.toString)
+ logInfo("Executor %s removed: %s".format(fullId, message))
+ removeExecutor(fullId.split("/")(1), reason.toString)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
index e8fa5e2f17..8cb4d1396f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -91,7 +91,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
val schedulingMode: SchedulingMode = SchedulingMode.withName(
- System.getProperty("spark.cluster.schedulingmode", "FIFO"))
+ System.getProperty("spark.scheduler.mode", "FIFO"))
val activeTaskSets = new HashMap[String, TaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 3d709cfde4..acc3951088 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -20,11 +20,13 @@ package org.apache.spark.storage
import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.metrics.source.Source
+import org.apache.spark.SparkContext
-private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
+private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
+ extends Source {
val metricRegistry = new MetricRegistry()
- val sourceName = "BlockManager"
+ val sourceName = "%s.BlockManager".format(sc.appName)
metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
override def getValue: Long = {
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index fc25ef0fae..63447baf8c 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -238,7 +238,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
logDebug("Getting file for block " + blockId)
// Figure out which local directory it hashes to, and which subdirectory in that
- val hash = math.abs(blockId.hashCode)
+ val hash = Utils.nonNegativeHash(blockId)
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 755f1a760e..632ff047d1 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -23,9 +23,9 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
* whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
* in a serialized format, and whether to replicate the RDD partitions on multiple nodes.
- * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants for
- * commonly useful storage levels. To create your own storage level object, use the factor method
- * of the singleton object (`StorageLevel(...)`).
+ * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
+ * for commonly useful storage levels. To create your own storage level object, use the
+ * factory method of the singleton object (`StorageLevel(...)`).
*/
class StorageLevel private(
private var useDisk_ : Boolean,
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 48eb096063..f1d86c0221 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -84,6 +84,6 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
}
private[spark] object SparkUI {
- val DEFAULT_PORT = "3030"
+ val DEFAULT_PORT = "4040"
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
}
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 0ecb22d2f9..3ec9760ed0 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -43,13 +43,13 @@ private[spark] object UIWorkloadGenerator {
val appName = "Spark UI Tester"
if (schedulingMode == SchedulingMode.FAIR) {
- System.setProperty("spark.cluster.schedulingmode", "FAIR")
+ System.setProperty("spark.scheduler.mode", "FAIR")
}
val sc = new SparkContext(master, appName)
def setProperties(s: String) = {
if(schedulingMode == SchedulingMode.FAIR) {
- sc.setLocalProperty("spark.scheduler.cluster.fair.pool", s)
+ sc.setLocalProperty("spark.scheduler.pool", s)
}
sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index e2bcd98545..5d46f38a2a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -95,7 +95,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
activeStages += stage
val poolName = Option(stageSubmitted.properties).map {
- p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
+ p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
}.getOrElse(DEFAULT_POOL_NAME)
stageToPool(stage) = poolName
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d8d014de7d..be215fc127 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -464,12 +464,20 @@ private[spark] object Utils extends Logging {
def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor =
Executors.newFixedThreadPool(nThreads, daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
+ private def listFilesSafely(file: File): Seq[File] = {
+ val files = file.listFiles()
+ if (files == null) {
+ throw new IOException("Failed to list files for dir: " + file)
+ }
+ files
+ }
+
/**
* Delete a file or directory and its contents recursively.
*/
def deleteRecursively(file: File) {
if (file.isDirectory) {
- for (child <- file.listFiles()) {
+ for (child <- listFilesSafely(file)) {
deleteRecursively(child)
}
}
@@ -785,4 +793,18 @@ private[spark] object Utils extends Logging {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
+
+ // Handles idiosyncrasies with hash (add more as required)
+ def nonNegativeHash(obj: AnyRef): Int = {
+
+ // Required ?
+ if (obj eq null) return 0
+
+ val hash = obj.hashCode
+ // math.abs fails for Int.MinValue
+ val hashAbs = if (Int.MinValue != hash) math.abs(hash) else 0
+
+ // Nothing else to guard against ?
+ hashAbs
+ }
}
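
Why nonNegativeHash special-cases Int.MinValue: two's-complement has no positive counterpart for it, so math.abs overflows and returns the same negative number. A quick REPL check:

    assert(math.abs(Int.MinValue) == Int.MinValue)  // abs overflows...
    assert(math.abs(Int.MinValue) < 0)              // ...and stays negative
    // nonNegativeHash maps this one pathological hash code to 0 instead.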
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
new file mode 100644
index 0000000000..3a7171c488
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.mock.EasyMockSugar
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.{BlockManager, StorageLevel}
+
+// TODO: Test the CacheManager's thread-safety aspects
+class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar {
+ var sc : SparkContext = _
+ var blockManager: BlockManager = _
+ var cacheManager: CacheManager = _
+ var split: Partition = _
+ /** An RDD which returns the values [1, 2, 3, 4]. */
+ var rdd: RDD[Int] = _
+
+ before {
+ sc = new SparkContext("local", "test")
+ blockManager = mock[BlockManager]
+ cacheManager = new CacheManager(blockManager)
+ split = new Partition { override def index: Int = 0 }
+ rdd = new RDD[Int](sc, Nil) {
+ override def getPartitions: Array[Partition] = Array(split)
+ override val getDependencies = List[Dependency[_]]()
+ override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator
+ }
+ }
+
+ after {
+ sc.stop()
+ }
+
+ test("get uncached rdd") {
+ expecting {
+ blockManager.get("rdd_0_0").andReturn(None)
+ blockManager.put("rdd_0_0", ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY, true).
+ andReturn(0)
+ }
+
+ whenExecuting(blockManager) {
+ val context = new TaskContext(0, 0, 0, runningLocally = false, null)
+ val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
+ assert(value.toList === List(1, 2, 3, 4))
+ }
+ }
+
+ test("get cached rdd") {
+ expecting {
+ blockManager.get("rdd_0_0").andReturn(Some(ArrayBuffer(5, 6, 7).iterator))
+ }
+
+ whenExecuting(blockManager) {
+ val context = new TaskContext(0, 0, 0, runningLocally = false, null)
+ val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
+ assert(value.toList === List(5, 6, 7))
+ }
+ }
+
+ test("get uncached local rdd") {
+ expecting {
+ // Local computation should not persist the resulting value, so don't expect a put().
+ blockManager.get("rdd_0_0").andReturn(None)
+ }
+
+ whenExecuting(blockManager) {
+ val context = new TaskContext(0, 0, 0, runningLocally = true, null)
+ val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
+ assert(value.toList === List(1, 2, 3, 4))
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 8a869c9005..591c1d498d 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -495,7 +495,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void iterator() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
- TaskContext context = new TaskContext(0, 0, 0, null);
+ TaskContext context = new TaskContext(0, 0, 0, false, null);
Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue());
}
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
index 58c94a162d..1a9ce8c607 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
@@ -30,14 +30,13 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
val conf = new MetricsConfig(Option("dummy-file"))
conf.initialize()
- assert(conf.properties.size() === 5)
+ assert(conf.properties.size() === 4)
assert(conf.properties.getProperty("test-for-dummy") === null)
val property = conf.getInstance("random")
- assert(property.size() === 3)
+ assert(property.size() === 2)
assert(property.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
- assert(property.getProperty("sink.servlet.uri") === "/metrics/json")
- assert(property.getProperty("sink.servlet.sample") === "false")
+ assert(property.getProperty("sink.servlet.path") === "/metrics/json")
}
test("MetricsConfig with properties set") {
@@ -45,22 +44,20 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
conf.initialize()
val masterProp = conf.getInstance("master")
- assert(masterProp.size() === 6)
+ assert(masterProp.size() === 5)
assert(masterProp.getProperty("sink.console.period") === "20")
assert(masterProp.getProperty("sink.console.unit") === "minutes")
assert(masterProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
assert(masterProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
- assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json")
- assert(masterProp.getProperty("sink.servlet.sample") === "false")
+ assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json")
val workerProp = conf.getInstance("worker")
- assert(workerProp.size() === 6)
+ assert(workerProp.size() === 5)
assert(workerProp.getProperty("sink.console.period") === "10")
assert(workerProp.getProperty("sink.console.unit") === "seconds")
assert(workerProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
- assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json")
- assert(workerProp.getProperty("sink.servlet.sample") === "false")
+ assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
}
test("MetricsConfig with subProperties") {
@@ -84,6 +81,6 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
assert(consoleProps.size() === 2)
val servletProps = sinkProps("servlet")
- assert(servletProps.size() === 3)
+ assert(servletProps.size() === 2)
}
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index adc971050e..c1df5e151e 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -140,7 +140,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(rdd.union(emptyKv).collect().size === 2)
}
- test("cogrouped RDDs") {
+ test("coalesced RDDs") {
val data = sc.parallelize(1 to 10, 10)
val coalesced1 = data.coalesce(2)
@@ -175,8 +175,14 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val coalesced5 = data.coalesce(1, shuffle = true)
assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
null)
+
+ // when shuffling, we can increase the number of partitions
+ val coalesced6 = data.coalesce(20, shuffle = true)
+ assert(coalesced6.partitions.size === 20)
+ assert(coalesced6.collect().toSet === (1 to 10).toSet)
}
- test("cogrouped RDDs with locality") {
+
+ test("coalesced RDDs with locality") {
val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
val coal3 = data3.coalesce(3)
val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
@@ -197,11 +203,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val coalesced4 = data.coalesce(20)
val listOfLists = coalesced4.glom().collect().map(_.toList).toList
val sortedList = listOfLists.sortWith{ (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) }
- assert( sortedList === (1 to 9).
+ assert(sortedList === (1 to 9).
map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
}
- test("cogrouped RDDs with locality, large scale (10K partitions)") {
+ test("coalesced RDDs with locality, large scale (10K partitions)") {
// large scale experiment
import collection.mutable
val rnd = scala.util.Random
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
index 92ad9f09b2..1b50ce06b3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
@@ -166,7 +166,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
- System.setProperty("spark.fairscheduler.allocation.file", xmlPath)
+ System.setProperty("spark.scheduler.allocation.file", xmlPath)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
val schedulableBuilder = new FairSchedulableBuilder(rootPool)
schedulableBuilder.buildPools()
@@ -179,13 +179,13 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
assert(rootPool.getSchedulableByName("1").weight === 1)
assert(rootPool.getSchedulableByName("2").minShare === 3)
assert(rootPool.getSchedulableByName("2").weight === 1)
- assert(rootPool.getSchedulableByName("3").minShare === 2)
+ assert(rootPool.getSchedulableByName("3").minShare === 0)
assert(rootPool.getSchedulableByName("3").weight === 1)
val properties1 = new Properties()
- properties1.setProperty("spark.scheduler.cluster.fair.pool","1")
+ properties1.setProperty("spark.scheduler.pool","1")
val properties2 = new Properties()
- properties2.setProperty("spark.scheduler.cluster.fair.pool","2")
+ properties2.setProperty("spark.scheduler.pool","2")
val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet)
val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala
index 111340a65c..af76c843e8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala
@@ -73,7 +73,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
new Thread {
if (poolName != null) {
- sc.setLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
+ sc.setLocalProperty("spark.scheduler.pool", poolName)
}
override def run() {
val ans = nums.map(number => {
@@ -90,7 +90,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
}
test("Local FIFO scheduler end-to-end test") {
- System.setProperty("spark.cluster.schedulingmode", "FIFO")
+ System.setProperty("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local[4]", "test")
val sem = new Semaphore(0)
@@ -150,9 +150,9 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
test("Local fair scheduler end-to-end test") {
sc = new SparkContext("local[8]", "LocalSchedulerSuite")
val sem = new Semaphore(0)
- System.setProperty("spark.cluster.schedulingmode", "FAIR")
+ System.setProperty("spark.scheduler.mode", "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
- System.setProperty("spark.fairscheduler.allocation.file", xmlPath)
+ System.setProperty("spark.scheduler.allocation.file", xmlPath)
createThread(10,"1",sc,sem)
TaskThreadInfo.threadToStarted(10).await()
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 3321fb5eb7..07c9f2382b 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -24,7 +24,7 @@ import org.eclipse.jetty.server.Server
class UISuite extends FunSuite {
test("jetty port increases under contention") {
- val startPort = 3030
+ val startPort = 4040
val server = new Server(startPort)
server.start()
val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq())