-rw-r--r--  conf/metrics.properties.template | 50
-rw-r--r--  core/src/main/scala/org/apache/spark/CacheManager.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContext.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala | 82
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageLevel.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/CacheManagerSuite.scala | 91
-rw-r--r--  core/src/test/scala/org/apache/spark/JavaAPISuite.java | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala | 92
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala | 19
-rwxr-xr-x  docs/_layouts/global.html | 2
-rw-r--r--  docs/hadoop-third-party-distributions.md | 76
-rw-r--r--  docs/index.md | 5
-rw-r--r--  docs/monitoring.md | 58
-rw-r--r--  docs/running-on-yarn.md | 31
-rw-r--r--  docs/spark-standalone.md | 20
-rw-r--r--  ec2/deploy.generic/root/spark-ec2/ec2-variables.sh | 17
-rw-r--r--  examples/pom.xml | 14
-rw-r--r--  pom.xml | 218
-rw-r--r--  project/SparkBuild.scala | 13
-rw-r--r--  project/project/SparkPluginBuild.scala | 19
-rw-r--r--  python/pyspark/__init__.py | 5
-rw-r--r--  python/pyspark/context.py | 11
-rw-r--r--  python/pyspark/rdd.py | 19
-rw-r--r--  python/pyspark/shell.py | 1
-rw-r--r--  python/pyspark/storagelevel.py | 43
-rw-r--r--  repl-bin/pom.xml | 10
-rw-r--r--  repl/pom.xml | 12
-rw-r--r--  yarn/pom.xml | 61
38 files changed, 786 insertions, 261 deletions
diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index 6c36f3cca4..ae10f615d1 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -31,7 +31,7 @@
# 1. To add a new sink, set the "class" option to a fully qualified class
# name (see examples below).
# 2. Some sinks involve a polling period. The minimum allowed polling period
-# is 1 second.
+# is 1 second.
# 3. Wild card properties can be overridden by more specific properties.
# For example, master.sink.console.period takes precedence over
# *.sink.console.period.
@@ -47,11 +47,45 @@
# instance master and applications. MetricsServlet may not be configured by self.
#
+## List of available sinks and their properties.
+
+# org.apache.spark.metrics.sink.ConsoleSink
+# Name: Default: Description:
+# period 10 Poll period
+# unit seconds Units of poll period
+
+# org.apache.spark.metrics.sink.CSVSink
+# Name: Default: Description:
+# period 10 Poll period
+# unit seconds Units of poll period
+# directory /tmp Where to store CSV files
+
+# org.apache.spark.metrics.sink.GangliaSink
+# Name: Default: Description:
+# host NONE Hostname or multicast group of Ganglia server
+# port NONE Port of Ganglia server(s)
+# period 10 Poll period
+# unit seconds Units of poll period
+# ttl 1 TTL of messages sent by Ganglia
+# mode multicast Ganglia network mode ('unicast' or 'multicast')
+
+# org.apache.spark.metrics.sink.JmxSink
+
+# org.apache.spark.metrics.sink.MetricsServlet
+# Name: Default: Description:
+# path VARIES* Path prefix from the web server root
+# sample false Whether to show entire set of samples for histograms ('false' or 'true')
+#
+# * Default path is /metrics/json for all instances except the master. The master has two paths:
+# /metrics/applications/json # App information
+# /metrics/master/json # Master information
+
+## Examples
# Enable JmxSink for all instances by class name
-#*.sink.jmx.class=spark.metrics.sink.JmxSink
+#*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
# Enable ConsoleSink for all instances by class name
-#*.sink.console.class=spark.metrics.sink.ConsoleSink
+#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
# Polling period for ConsoleSink
#*.sink.console.period=10
@@ -64,7 +98,7 @@
#master.sink.console.unit=seconds
# Enable CsvSink for all instances
-#*.sink.csv.class=spark.metrics.sink.CsvSink
+#*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
# Polling period for CsvSink
#*.sink.csv.period=1
@@ -80,11 +114,11 @@
#worker.sink.csv.unit=minutes
# Enable jvm source for instance master, worker, driver and executor
-#master.source.jvm.class=spark.metrics.source.JvmSource
+#master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
-#worker.source.jvm.class=spark.metrics.source.JvmSource
+#worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
-#driver.source.jvm.class=spark.metrics.source.JvmSource
+#driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
-#executor.source.jvm.class=spark.metrics.source.JvmSource
+#executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index e299a106ee..68b99ca125 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -66,10 +66,12 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
try {
// If we got here, we have to load the split
- val elements = new ArrayBuffer[Any]
logInfo("Computing partition " + split)
- elements ++= rdd.computeOrReadCheckpoint(split, context)
- // Try to put this block in the blockManager
+ val computedValues = rdd.computeOrReadCheckpoint(split, context)
+ // Persist the result, so long as the task is not running locally
+ if (context.runningLocally) { return computedValues }
+ val elements = new ArrayBuffer[Any]
+ elements ++= computedValues
blockManager.put(key, elements, storageLevel, true)
return elements.iterator.asInstanceOf[Iterator[T]]
} finally {
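The new branch above means that a partition computed for a locally-run job (the DAGScheduler constructs such tasks with `runningLocally = true`, as shown further down in this diff) is handed back directly and never materialized into the BlockManager. A minimal sketch of the calling pattern, borrowing the `rdd`, `split` and `cacheManager` fixtures from the new CacheManagerSuite below (CacheManager is a private[spark] class, so this illustrates internal usage rather than public API):

    // Sketch only: a context flagged as running locally bypasses blockManager.put(...)
    val localContext = new TaskContext(stageId = 0, splitId = 0, attemptId = 0,
      runningLocally = true)
    val values = cacheManager.getOrCompute(rdd, split, localContext, StorageLevel.MEMORY_ONLY)
    // `values` is the freshly computed iterator; nothing was cached as a side effect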
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 89318712a5..4f711a5ea6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -282,8 +282,8 @@ class SparkContext(
// Post init
taskScheduler.postStartHook()
- val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
- val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
+ val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
+ val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index b2dd668330..c2c358c7ad 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -24,6 +24,7 @@ class TaskContext(
val stageId: Int,
val splitId: Int,
val attemptId: Long,
+ val runningLocally: Boolean = false,
val taskMetrics: TaskMetrics = TaskMetrics.empty()
) extends Serializable {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d365804994..ceae3b8289 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -98,7 +98,7 @@ private[spark] class Executor(
}
)
- val executorSource = new ExecutorSource(this)
+ val executorSource = new ExecutorSource(this, executorId)
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index bf8fb4fd21..18c9dc1c0a 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.metrics.source.Source
-class ExecutorSource(val executor: Executor) extends Source {
+class ExecutorSource(val executor: Executor, executorId: String) extends Source {
private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption
@@ -39,7 +39,8 @@ class ExecutorSource(val executor: Executor) extends Source {
}
val metricRegistry = new MetricRegistry()
- val sourceName = "executor"
+ // TODO: It would be nice to pass the application name here
+ val sourceName = "executor.%s".format(executorId)
// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 0f9c4e00b1..caab748d60 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -37,10 +37,9 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
private def setDefaultProperties(prop: Properties) {
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
- prop.setProperty("*.sink.servlet.uri", "/metrics/json")
- prop.setProperty("*.sink.servlet.sample", "false")
- prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")
- prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json")
+ prop.setProperty("*.sink.servlet.path", "/metrics/json")
+ prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
+ prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
}
def initialize() {
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
new file mode 100644
index 0000000000..b924907070
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.ganglia.GangliaReporter
+import com.codahale.metrics.MetricRegistry
+import info.ganglia.gmetric4j.gmetric.GMetric
+
+import org.apache.spark.metrics.MetricsSystem
+
+class GangliaSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+ val GANGLIA_KEY_PERIOD = "period"
+ val GANGLIA_DEFAULT_PERIOD = 10
+
+ val GANGLIA_KEY_UNIT = "unit"
+ val GANGLIA_DEFAULT_UNIT = TimeUnit.SECONDS
+
+ val GANGLIA_KEY_MODE = "mode"
+ val GANGLIA_DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST
+
+ // TTL for multicast messages. If listeners are X hops away in network, must be at least X.
+ val GANGLIA_KEY_TTL = "ttl"
+ val GANGLIA_DEFAULT_TTL = 1
+
+ val GANGLIA_KEY_HOST = "host"
+ val GANGLIA_KEY_PORT = "port"
+
+ def propertyToOption(prop: String) = Option(property.getProperty(prop))
+
+ if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) {
+ throw new Exception("Ganglia sink requires 'host' property.")
+ }
+
+ if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) {
+ throw new Exception("Ganglia sink requires 'port' property.")
+ }
+
+ val host = propertyToOption(GANGLIA_KEY_HOST).get
+ val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt
+ val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
+ val mode = propertyToOption(GANGLIA_KEY_MODE)
+ .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE)
+ val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt)
+ .getOrElse(GANGLIA_DEFAULT_PERIOD)
+ val pollUnit = propertyToOption(GANGLIA_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase))
+ .getOrElse(GANGLIA_DEFAULT_UNIT)
+
+ MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+ val ganglia = new GMetric(host, port, mode, ttl)
+ val reporter: GangliaReporter = GangliaReporter.forRegistry(registry)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .build(ganglia)
+
+ override def start() {
+ reporter.start(pollPeriod, pollUnit)
+ }
+
+ override def stop() {
+ reporter.stop()
+ }
+}
+
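For reference, a hedged sketch of how the sink above consumes its configuration. In practice the MetricsSystem instantiates sinks reflectively through the `(Properties, MetricRegistry)` constructor based on entries in `conf/metrics.properties`; the Scala below mirrors that wiring by hand, and the host address is a made-up multicast group:

    import java.util.Properties
    import com.codahale.metrics.MetricRegistry
    import org.apache.spark.metrics.sink.GangliaSink

    // Roughly equivalent to configuring, in metrics.properties:
    //   *.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
    //   *.sink.ganglia.host=239.2.11.71
    //   *.sink.ganglia.port=8649
    val props = new Properties()
    props.setProperty("host", "239.2.11.71")  // required, no default
    props.setProperty("port", "8649")         // required, no default
    props.setProperty("mode", "multicast")    // default: multicast
    props.setProperty("ttl", "1")             // default: 1
    props.setProperty("period", "10")         // default: 10
    props.setProperty("unit", "seconds")      // default: seconds

    val sink = new GangliaSink(props, new MetricRegistry())
    sink.start()  // GangliaReporter begins reporting every 10 seconds
    // ...
    sink.stop()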
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
index 4e90dd4323..99357fede6 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
@@ -31,18 +31,21 @@ import org.eclipse.jetty.server.Handler
import org.apache.spark.ui.JettyUtils
class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink {
- val SERVLET_KEY_URI = "uri"
+ val SERVLET_KEY_PATH = "path"
val SERVLET_KEY_SAMPLE = "sample"
- val servletURI = property.getProperty(SERVLET_KEY_URI)
+ val SERVLET_DEFAULT_SAMPLE = false
- val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean
+ val servletPath = property.getProperty(SERVLET_KEY_PATH)
+
+ val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)).map(_.toBoolean)
+ .getOrElse(SERVLET_DEFAULT_SAMPLE)
val mapper = new ObjectMapper().registerModule(
new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
def getHandlers = Array[(String, Handler)](
- (servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
+ (servletPath, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
)
def getMetricsSnapshot(request: HttpServletRequest): String = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 92add5b073..3e3f04f087 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -478,7 +478,8 @@ class DAGScheduler(
SparkEnv.set(env)
val rdd = job.finalStage.rdd
val split = rdd.partitions(job.partitions(0))
- val taskContext = new TaskContext(job.finalStage.id, job.partitions(0), 0)
+ val taskContext =
+ new TaskContext(job.finalStage.id, job.partitions(0), 0, runningLocally = true)
try {
val result = job.func(taskContext, rdd.iterator(split, taskContext))
job.listener.taskSucceeded(0, result)
@@ -531,9 +532,16 @@ class DAGScheduler(
tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
}
}
+
+ val properties = if (idToActiveJob.contains(stage.jobId)) {
+ idToActiveJob(stage.jobId).properties
+ } else {
+ //this stage will be assigned to "default" pool
+ null
+ }
+
// must be run listener before possible NotSerializableException
// should be "StageSubmitted" first and then "JobEnded"
- val properties = idToActiveJob(stage.jobId).properties
listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties))
if (tasks.size > 0) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 22e3723ac8..446d490cc9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -20,10 +20,12 @@ package org.apache.spark.scheduler
import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.metrics.source.Source
+import org.apache.spark.SparkContext
-private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
+ extends Source {
val metricRegistry = new MetricRegistry()
- val sourceName = "DAGScheduler"
+ val sourceName = "%s.DAGScheduler".format(sc.appName)
metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failed.size
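The motivation for this sourceName change (and the matching ones in ExecutorSource above and BlockManagerSource below) is that the metrics system prefixes every registered metric with its source's name, so reported names now identify the application or executor. A hedged illustration, assuming an application named "MyApp" and an executor id of "1" (both made up):

    import com.codahale.metrics.MetricRegistry

    // Codahale builds dotted metric names; Spark's MetricsSystem then prepends the sourceName.
    val metric = MetricRegistry.name("stage", "failedStages", "number")  // "stage.failedStages.number"
    // Reported names therefore look roughly like:
    //   MyApp.DAGScheduler.stage.failedStages.number   (previously DAGScheduler.stage.failedStages.number)
    //   MyApp.BlockManager.memory.maxMem.MBytes        (previously BlockManager.memory.maxMem.MBytes)
    //   executor.1.threadpool.activeTask.count         (previously executor.threadpool.activeTask.count)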
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 2b007cbe82..07e8317e3a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -77,7 +77,7 @@ private[spark] class ResultTask[T, U](
var func: (TaskContext, Iterator[T]) => U,
var partition: Int,
@transient locs: Seq[TaskLocation],
- val outputId: Int)
+ var outputId: Int)
extends Task[U](stageId) with Externalizable {
def this() = this(0, null, null, 0, null, 0)
@@ -93,7 +93,7 @@ private[spark] class ResultTask[T, U](
}
override def run(attemptId: Long): U = {
- val context = new TaskContext(stageId, partition, attemptId)
+ val context = new TaskContext(stageId, partition, attemptId, runningLocally = false)
metrics = Some(context.taskMetrics)
try {
func(context, rdd.iterator(split, context))
@@ -130,7 +130,7 @@ private[spark] class ResultTask[T, U](
rdd = rdd_.asInstanceOf[RDD[T]]
func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U]
partition = in.readInt()
- val outputId = in.readInt()
+ outputId = in.readInt()
epoch = in.readLong()
split = in.readObject().asInstanceOf[Partition]
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 764775fede..d23df0dd2b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -132,7 +132,7 @@ private[spark] class ShuffleMapTask(
override def run(attemptId: Long): MapStatus = {
val numOutputSplits = dep.partitioner.numPartitions
- val taskContext = new TaskContext(stageId, partition, attemptId)
+ val taskContext = new TaskContext(stageId, partition, attemptId, runningLocally = false)
metrics = Some(taskContext.taskMetrics)
val blockManager = SparkEnv.get.blockManager
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 3d709cfde4..acc3951088 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -20,11 +20,13 @@ package org.apache.spark.storage
import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.metrics.source.Source
+import org.apache.spark.SparkContext
-private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
+private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
+ extends Source {
val metricRegistry = new MetricRegistry()
- val sourceName = "BlockManager"
+ val sourceName = "%s.BlockManager".format(sc.appName)
metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
override def getValue: Long = {
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 755f1a760e..632ff047d1 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -23,9 +23,9 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
* whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
* in a serialized format, and whether to replicate the RDD partitions on multiple nodes.
- * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants for
- * commonly useful storage levels. To create your own storage level object, use the factor method
- * of the singleton object (`StorageLevel(...)`).
+ * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
+ * for commonly useful storage levels. To create your own storage level object, use the
+ * factory method of the singleton object (`StorageLevel(...)`).
*/
class StorageLevel private(
private var useDisk_ : Boolean,
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
new file mode 100644
index 0000000000..3a7171c488
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.mock.EasyMockSugar
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.{BlockManager, StorageLevel}
+
+// TODO: Test the CacheManager's thread-safety aspects
+class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar {
+ var sc : SparkContext = _
+ var blockManager: BlockManager = _
+ var cacheManager: CacheManager = _
+ var split: Partition = _
+ /** An RDD which returns the values [1, 2, 3, 4]. */
+ var rdd: RDD[Int] = _
+
+ before {
+ sc = new SparkContext("local", "test")
+ blockManager = mock[BlockManager]
+ cacheManager = new CacheManager(blockManager)
+ split = new Partition { override def index: Int = 0 }
+ rdd = new RDD[Int](sc, Nil) {
+ override def getPartitions: Array[Partition] = Array(split)
+ override val getDependencies = List[Dependency[_]]()
+ override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator
+ }
+ }
+
+ after {
+ sc.stop()
+ }
+
+ test("get uncached rdd") {
+ expecting {
+ blockManager.get("rdd_0_0").andReturn(None)
+ blockManager.put("rdd_0_0", ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY, true).
+ andReturn(0)
+ }
+
+ whenExecuting(blockManager) {
+ val context = new TaskContext(0, 0, 0, runningLocally = false, null)
+ val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
+ assert(value.toList === List(1, 2, 3, 4))
+ }
+ }
+
+ test("get cached rdd") {
+ expecting {
+ blockManager.get("rdd_0_0").andReturn(Some(ArrayBuffer(5, 6, 7).iterator))
+ }
+
+ whenExecuting(blockManager) {
+ val context = new TaskContext(0, 0, 0, runningLocally = false, null)
+ val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
+ assert(value.toList === List(5, 6, 7))
+ }
+ }
+
+ test("get uncached local rdd") {
+ expecting {
+ // Local computation should not persist the resulting value, so don't expect a put().
+ blockManager.get("rdd_0_0").andReturn(None)
+ }
+
+ whenExecuting(blockManager) {
+ val context = new TaskContext(0, 0, 0, runningLocally = true, null)
+ val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
+ assert(value.toList === List(1, 2, 3, 4))
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 8a869c9005..591c1d498d 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -495,7 +495,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void iterator() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
- TaskContext context = new TaskContext(0, 0, 0, null);
+ TaskContext context = new TaskContext(0, 0, 0, false, null);
Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue());
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
new file mode 100644
index 0000000000..05f8545c7b
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.File
+import java.util.Date
+
+import net.liftweb.json.{JsonAST, JsonParser}
+import net.liftweb.json.JsonAST.JValue
+import org.scalatest.FunSuite
+
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
+import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import org.apache.spark.deploy.worker.ExecutorRunner
+
+class JsonProtocolSuite extends FunSuite {
+ test("writeApplicationInfo") {
+ val output = JsonProtocol.writeApplicationInfo(createAppInfo())
+ assertValidJson(output)
+ }
+
+ test("writeWorkerInfo") {
+ val output = JsonProtocol.writeWorkerInfo(createWorkerInfo())
+ assertValidJson(output)
+ }
+
+ test("writeApplicationDescription") {
+ val output = JsonProtocol.writeApplicationDescription(createAppDesc())
+ assertValidJson(output)
+ }
+
+ test("writeExecutorRunner") {
+ val output = JsonProtocol.writeExecutorRunner(createExecutorRunner())
+ assertValidJson(output)
+ }
+
+ test("writeMasterState") {
+ val workers = Array[WorkerInfo](createWorkerInfo(), createWorkerInfo())
+ val activeApps = Array[ApplicationInfo](createAppInfo())
+ val completedApps = Array[ApplicationInfo]()
+ val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps)
+ val output = JsonProtocol.writeMasterState(stateResponse)
+ assertValidJson(output)
+ }
+
+ test("writeWorkerState") {
+ val executors = List[ExecutorRunner]()
+ val finishedExecutors = List[ExecutorRunner](createExecutorRunner(), createExecutorRunner())
+ val stateResponse = new WorkerStateResponse("host", 8080, "workerId", executors,
+ finishedExecutors, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl")
+ val output = JsonProtocol.writeWorkerState(stateResponse)
+ assertValidJson(output)
+ }
+
+ def createAppDesc() : ApplicationDescription = {
+ val cmd = new Command("mainClass", List("arg1", "arg2"), Map())
+ new ApplicationDescription("name", 4, 1234, cmd, "sparkHome", "appUiUrl")
+ }
+ def createAppInfo() : ApplicationInfo = {
+ new ApplicationInfo(3, "id", createAppDesc(), new Date(123456789), null, "appUriStr")
+ }
+ def createWorkerInfo() : WorkerInfo = {
+ new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
+ }
+ def createExecutorRunner() : ExecutorRunner = {
+ new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
+ new File("sparkHome"), new File("workDir"))
+ }
+
+ def assertValidJson(json: JValue) {
+ try {
+ JsonParser.parse(JsonAST.compactRender(json))
+ } catch {
+ case e: JsonParser.ParseException => fail("Invalid Json detected", e)
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
index 58c94a162d..1a9ce8c607 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
@@ -30,14 +30,13 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
val conf = new MetricsConfig(Option("dummy-file"))
conf.initialize()
- assert(conf.properties.size() === 5)
+ assert(conf.properties.size() === 4)
assert(conf.properties.getProperty("test-for-dummy") === null)
val property = conf.getInstance("random")
- assert(property.size() === 3)
+ assert(property.size() === 2)
assert(property.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
- assert(property.getProperty("sink.servlet.uri") === "/metrics/json")
- assert(property.getProperty("sink.servlet.sample") === "false")
+ assert(property.getProperty("sink.servlet.path") === "/metrics/json")
}
test("MetricsConfig with properties set") {
@@ -45,22 +44,20 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
conf.initialize()
val masterProp = conf.getInstance("master")
- assert(masterProp.size() === 6)
+ assert(masterProp.size() === 5)
assert(masterProp.getProperty("sink.console.period") === "20")
assert(masterProp.getProperty("sink.console.unit") === "minutes")
assert(masterProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
assert(masterProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
- assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json")
- assert(masterProp.getProperty("sink.servlet.sample") === "false")
+ assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json")
val workerProp = conf.getInstance("worker")
- assert(workerProp.size() === 6)
+ assert(workerProp.size() === 5)
assert(workerProp.getProperty("sink.console.period") === "10")
assert(workerProp.getProperty("sink.console.unit") === "seconds")
assert(workerProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
- assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json")
- assert(workerProp.getProperty("sink.servlet.sample") === "false")
+ assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
}
test("MetricsConfig with subProperties") {
@@ -84,6 +81,6 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
assert(consoleProps.size() === 2)
val servletProps = sinkProps("servlet")
- assert(servletProps.size() === 3)
+ assert(servletProps.size() === 2)
}
}
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index 84749fda4e..90928c8021 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -97,7 +97,9 @@
<a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="configuration.html">Configuration</a></li>
+ <li><a href="monitoring.html">Monitoring</a></li>
<li><a href="tuning.html">Tuning Guide</a></li>
+ <li><a href="hadoop-third-party-distributions.html">Running with CDH/HDP</a></li>
<li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
<li><a href="building-with-maven.html">Building Spark with Maven</a></li>
<li><a href="contributing-to-spark.html">Contributing to Spark</a></li>
diff --git a/docs/hadoop-third-party-distributions.md b/docs/hadoop-third-party-distributions.md
new file mode 100644
index 0000000000..9f4f354525
--- /dev/null
+++ b/docs/hadoop-third-party-distributions.md
@@ -0,0 +1,76 @@
+---
+layout: global
+title: Running with Cloudera and Hortonworks Distributions
+---
+
+Spark can run against all versions of Cloudera's Distribution Including Hadoop (CDH) and
+the Hortonworks Data Platform (HDP). There are a few things to keep in mind when using Spark with
+these distributions:
+
+# Compile-time Hadoop Version
+When compiling Spark, you'll need to
+[set the HADOOP_VERSION flag](index.html#a-note-about-hadoop-versions):
+
+ HADOOP_VERSION=1.0.4 sbt/sbt assembly
+
+The table below lists the corresponding HADOOP_VERSION for each CDH/HDP release. Note that
+some Hadoop releases are binary compatible across client versions. This means the pre-built Spark
+distribution may "just work" without you needing to compile. That said, we recommend compiling with
+the _exact_ Hadoop version you are running to avoid any compatibility errors.
+
+<table>
+ <tr valign="top">
+ <td>
+ <h3>CDH Releases</h3>
+ <table class="table" style="width:350px;">
+ <tr><th>Version</th><th>HADOOP_VERSION</th></tr>
+ <tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-cdh4.X.X</td></tr>
+ <tr><td>CDH 4.X.X</td><td>2.0.0-mr1-cdh4.X.X</td></tr>
+ <tr><td>CDH 3u6</td><td>0.20.2-cdh3u6</td></tr>
+ <tr><td>CDH 3u5</td><td>0.20.2-cdh3u5</td></tr>
+ <tr><td>CDH 3u4</td><td>0.20.2-cdh3u4</td></tr>
+ </table>
+ </td>
+ <td>
+ <h3>HDP Releases</h3>
+ <table class="table" style="width:350px;">
+ <tr><th>Version</th><th>HADOOP_VERSION</th></tr>
+ <tr><td>HDP 1.3</td><td>1.2.0</td></tr>
+ <tr><td>HDP 1.2</td><td>1.1.2</td></tr>
+ <tr><td>HDP 1.1</td><td>1.0.3</td></tr>
+ <tr><td>HDP 1.0</td><td>1.0.3</td></tr>
+ </table>
+ </td>
+ </tr>
+</table>
+
+# Where to Run Spark
+As described in the [Hardware Provisioning](hardware-provisioning.html#storage-systems) guide,
+Spark can run in a variety of deployment modes:
+
+* Using a dedicated set of Spark nodes in your cluster. These nodes should be co-located with your
+ Hadoop installation.
+* Running on the same nodes as an existing Hadoop installation, with a fixed amount of memory and
+ number of cores dedicated to Spark on each node.
+* Running Spark alongside Hadoop using a cluster resource manager, such as YARN or Mesos.
+
+These options are identical for those using CDH and HDP.
+
+# Inheriting Cluster Configuration
+If you plan to read and write from HDFS using Spark, there are two Hadoop configuration files that
+should be included on Spark's classpath:
+
+* `hdfs-site.xml`, which provides default behaviors for the HDFS client.
+* `core-site.xml`, which sets the default filesystem name.
+
+The location of these configuration files varies across CDH and HDP versions, but
+a common location is inside of `/etc/hadoop/conf`. Some tools, such as Cloudera Manager, create
+configurations on-the-fly, but offer a mechanism to download copies of them.
+
+There are a few ways to make these files visible to Spark:
+
+* You can copy these files into `$SPARK_HOME/conf` and they will be included in Spark's
+classpath automatically.
+* If you are running Spark on the same nodes as Hadoop _and_ your distribution includes both
+`hdfs-site.xml` and `core-site.xml` in the same directory, you can set `HADOOP_CONF_DIR`
+in `$SPARK_HOME/spark-env.sh` to that directory.
diff --git a/docs/index.md b/docs/index.md
index 7d73929940..d3aacc629f 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -46,6 +46,11 @@ Spark supports several options for deployment:
* [Apache Mesos](running-on-mesos.html)
* [Hadoop YARN](running-on-yarn.html)
+There is a script, `./make-distribution.sh`, which will create a binary distribution of Spark for deployment
+to any machine with only the Java runtime as a necessary dependency.
+Running the script creates a distribution directory in `dist/`; pass the `-tgz` option to create a .tgz file.
+Check the script for additional options.
+
# A Note About Hadoop Versions
Spark uses the Hadoop-client library to talk to HDFS and other Hadoop-supported
diff --git a/docs/monitoring.md b/docs/monitoring.md
new file mode 100644
index 0000000000..4c4f174503
--- /dev/null
+++ b/docs/monitoring.md
@@ -0,0 +1,58 @@
+---
+layout: global
+title: Monitoring and Instrumentation
+---
+
+There are several ways to monitor the progress of Spark jobs.
+
+# Web Interfaces
+When a SparkContext is initialized, it launches a web server (by default at port 3030) which
+displays useful information. This includes a list of active and completed scheduler stages,
+a summary of RDD blocks and partitions, and environmental information. If multiple SparkContexts
+are running on the same host, they will bind to successive ports beginning with 3030 (3031, 3032,
+etc).
+
+Spark's Standalone Mode scheduler also has its own
+[web interface](spark-standalone.html#monitoring-and-logging).
+
+# Spark Metrics
+Spark has a configurable metrics system based on the
+[Coda Hale Metrics Library](http://metrics.codahale.com/).
+This allows users to report Spark metrics to a variety of sinks including HTTP, JMX, and CSV
+files. The metrics system is configured via a configuration file that Spark expects to be present
+at `$SPARK_HOME/conf/metrics.properties`. A custom file location can be specified via the
+`spark.metrics.conf` Java system property. Spark's metrics are decoupled into different
+_instances_ corresponding to Spark components. Within each instance, you can configure a
+set of sinks to which metrics are reported. The following instances are currently supported:
+
+* `master`: The Spark standalone master process.
+* `applications`: A component within the master which reports on various applications.
+* `worker`: A Spark standalone worker process.
+* `executor`: A Spark executor.
+* `driver`: The Spark driver process (the process in which your SparkContext is created).
+
+Each instance can report to zero or more _sinks_. Sinks are contained in the
+`org.apache.spark.metrics.sink` package:
+
+* `ConsoleSink`: Logs metrics information to the console.
+* `CSVSink`: Exports metrics data to CSV files at regular intervals.
+* `GangliaSink`: Sends metrics to a Ganglia node or multicast group.
+* `JmxSink`: Registers metrics for viewing in a JMX console.
+* `MetricsServlet`: Adds a servlet within the existing Spark UI to serve metrics data as JSON.
+
+The syntax of the metrics configuration file is defined in an example configuration file,
+`$SPARK_HOME/conf/metrics.properties.template`.
+
+# Advanced Instrumentation
+Several external tools can be used to help profile the performance of Spark jobs:
+
+* Cluster-wide monitoring tools, such as [Ganglia](http://ganglia.sourceforge.net/), can provide
+insight into overall cluster utilization and resource bottlenecks. For instance, a Ganglia
+dashboard can quickly reveal whether a particular workload is disk bound, network bound, or
+CPU bound.
+* OS profiling tools such as [dstat](http://dag.wieers.com/home-made/dstat/),
+[iostat](http://linux.die.net/man/1/iostat), and [iotop](http://linux.die.net/man/1/iotop)
+can provide fine-grained profiling on individual nodes.
+* JVM utilities such as `jstack` for providing stack traces, `jmap` for creating heap-dumps,
+`jstat` for reporting time-series statistics and `jconsole` for visually exploring various JVM
+properties are useful for those comfortable with JVM internals.
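As a concrete illustration of the `spark.metrics.conf` system property described under "Spark Metrics" in the file above, a minimal sketch (the configuration file path is hypothetical):

    // Point the metrics system at a custom configuration file before the SparkContext starts,
    // since the property is read when the metrics system is initialized.
    System.setProperty("spark.metrics.conf", "/opt/spark-conf/metrics.properties")
    val sc = new org.apache.spark.SparkContext("local", "MetricsDemo")
    // The driver instance's metrics are now reported to whatever sinks that file configures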
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 93421efcbc..c611db0af4 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -42,7 +42,7 @@ This would be used to connect to the cluster, write to the dfs and submit jobs t
The command to launch the YARN Client is as follows:
- SPARK_JAR=<SPARK_YARN_JAR_FILE> ./spark-class org.apache.spark.deploy.yarn.Client \
+ SPARK_JAR=<SPARK_ASSEMBLY_JAR_FILE> ./spark-class org.apache.spark.deploy.yarn.Client \
--jar <YOUR_APP_JAR_FILE> \
--class <APP_MAIN_CLASS> \
--args <APP_MAIN_ARGUMENTS> \
@@ -54,14 +54,27 @@ The command to launch the YARN Client is as follows:
For example:
- SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./spark-class org.apache.spark.deploy.yarn.Client \
- --jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar \
- --class org.apache.spark.examples.SparkPi \
- --args yarn-standalone \
- --num-workers 3 \
- --master-memory 4g \
- --worker-memory 2g \
- --worker-cores 1
+ # Build the Spark assembly JAR and the Spark examples JAR
+ $ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true ./sbt/sbt assembly
+
+ # Configure logging
+ $ cp conf/log4j.properties.template conf/log4j.properties
+
+ # Submit Spark's ApplicationMaster to YARN's ResourceManager, and instruct Spark to run the SparkPi example
+ $ SPARK_JAR=./assembly/target/scala-{{site.SCALA_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop2.0.5-alpha.jar \
+ ./spark-class org.apache.spark.deploy.yarn.Client \
+ --jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
+ --class org.apache.spark.examples.SparkPi \
+ --args yarn-standalone \
+ --num-workers 3 \
+ --master-memory 4g \
+ --worker-memory 2g \
+ --worker-cores 1
+
+ # Examine the output (replace $YARN_APP_ID in the following with the "application identifier" output by the previous command)
+ # (Note: YARN_APP_LOGS_DIR is usually /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version.)
+ $ cat $YARN_APP_LOGS_DIR/$YARN_APP_ID/container*_000001/stdout
+ Pi is roughly 3.13794
The above starts a YARN client program which periodically polls the Application Master for status updates and displays them in the console. The client will exit once your application has finished running.
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 994a96f2c9..69e1291580 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -3,13 +3,21 @@ layout: global
title: Spark Standalone Mode
---
-In addition to running on the Mesos or YARN cluster managers, Spark also provides a simple standalone deploy mode. You can launch a standalone cluster either manually, by starting a master and workers by hand, or use our provided [deploy scripts](#cluster-launch-scripts). It is also possible to run these daemons on a single machine for testing.
+In addition to running on the Mesos or YARN cluster managers, Spark also provides a simple standalone deploy mode. You can launch a standalone cluster either manually, by starting a master and workers by hand, or use our provided [launch scripts](#cluster-launch-scripts). It is also possible to run these daemons on a single machine for testing.
+
+# Installing Spark Standalone to a Cluster
+
+The easiest way to deploy Spark is by running the `./make-distribution.sh` script to create a binary distribution.
+This distribution can be deployed to any machine with the Java runtime installed; there is no need to install Scala.
+
+The recommended procedure is to deploy and start the master on one node first, get the master's Spark URL,
+then modify `conf/spark-env.sh` in the `dist/` directory before deploying to all the other nodes.
# Starting a Cluster Manually
You can start a standalone master server by executing:
- ./spark-class org.apache.spark.deploy.master.Master
+ ./bin/start-master.sh
Once started, the master will print out a `spark://HOST:PORT` URL for itself, which you can use to connect workers to it,
or pass as the "master" argument to `SparkContext`. You can also find this URL on
@@ -22,7 +30,7 @@ Similarly, you can start one or more workers and connect them to the master via:
Once you have started a worker, look at the master's web UI ([http://localhost:8080](http://localhost:8080) by default).
You should see the new node listed there, along with its number of CPUs and memory (minus one gigabyte left for the OS).
-Finally, the following configuration options can be passed to the master and worker:
+Finally, the following configuration options can be passed to the master and worker:
<table class="table">
<tr><th style="width:21%">Argument</th><th>Meaning</th></tr>
@@ -55,7 +63,7 @@ Finally, the following configuration options can be passed to the master and wor
# Cluster Launch Scripts
-To launch a Spark standalone cluster with the deploy scripts, you need to create a file called `conf/slaves` in your Spark directory, which should contain the hostnames of all the machines where you would like to start Spark workers, one per line. The master machine must be able to access each of the slave machines via password-less `ssh` (using a private key). For testing, you can just put `localhost` in this file.
+To launch a Spark standalone cluster with the launch scripts, you need to create a file called `conf/slaves` in your Spark directory, which should contain the hostnames of all the machines where you would like to start Spark workers, one per line. The master machine must be able to access each of the slave machines via password-less `ssh` (using a private key). For testing, you can just put `localhost` in this file.
Once you've set up this file, you can launch or stop your cluster with the following shell scripts, based on Hadoop's deploy scripts, and available in `SPARK_HOME/bin`:
@@ -134,6 +142,10 @@ To run an interactive Spark shell against the cluster, run the following command
MASTER=spark://IP:PORT ./spark-shell
+Note that if you are running spark-shell from one of the Spark cluster machines, the `spark-shell` script will
+automatically set MASTER from the `SPARK_MASTER_IP` and `SPARK_MASTER_PORT` variables in `conf/spark-env.sh`.
+
+You can also pass an option `-c <numCores>` to control the number of cores that spark-shell uses on the cluster.
# Job Scheduling
diff --git a/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh b/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh
index 675429c57e..42e8faa26e 100644
--- a/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh
+++ b/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh
@@ -1,5 +1,22 @@
#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
# These variables are automatically filled in by the spark-ec2 script.
export MASTERS="{{master_list}}"
export SLAVES="{{slave_list}}"
diff --git a/examples/pom.xml b/examples/pom.xml
index 224cf6c96c..e48f5b50ab 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -127,20 +127,6 @@
</dependency>
</dependencies>
- <profiles>
- <profile>
- <id>hadoop2-yarn</id>
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- </profile>
- </profiles>
-
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
diff --git a/pom.xml b/pom.xml
index c561b099ab..5c1f9f0324 100644
--- a/pom.xml
+++ b/pom.xml
@@ -368,6 +368,99 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-jaxrs</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-xc</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-jaxrs</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-xc</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-jaxrs</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-xc</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<!-- Specify Avro version because Kafka also has it as a dependency -->
<dependency>
<groupId>org.apache.avro</groupId>
@@ -620,131 +713,6 @@
<dependencyManagement>
<dependencies>
- <!-- TODO: check versions, bringover from yarn branch ! -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>asm</groupId>
- <artifactId>asm</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-jaxrs</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-xc</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>asm</groupId>
- <artifactId>asm</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-jaxrs</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-xc</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>asm</groupId>
- <artifactId>asm</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-jaxrs</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-xc</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-client</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>asm</groupId>
- <artifactId>asm</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-jaxrs</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-xc</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
</dependencies>
</dependencyManagement>
</profile>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index d038a4f479..a60b553b5a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -33,15 +33,19 @@ object SparkBuild extends Build {
// HBase version; set as appropriate.
val HBASE_VERSION = "0.94.6"
+ // Target JVM version
+ val SCALAC_JVM_VERSION = "jvm-1.5"
+ val JAVAC_JVM_VERSION = "1.5"
+
lazy val root = Project("root", file("."), settings = rootSettings) aggregate(allProjects: _*)
lazy val core = Project("core", file("core"), settings = coreSettings)
lazy val repl = Project("repl", file("repl"), settings = replSettings)
- .dependsOn(core, bagel, mllib) dependsOn(maybeYarn: _*)
+ .dependsOn(core, bagel, mllib)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
- .dependsOn(core, mllib, bagel, streaming) dependsOn(maybeYarn: _*)
+ .dependsOn(core, mllib, bagel, streaming)
lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
@@ -77,7 +81,9 @@ object SparkBuild extends Build {
organization := "org.apache.spark",
version := "0.8.0-SNAPSHOT",
scalaVersion := "2.9.3",
- scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"),
+ scalacOptions := Seq("-unchecked", "-optimize", "-deprecation",
+ "-target:" + SCALAC_JVM_VERSION),
+ javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION),
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
retrieveManaged := true,
retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
@@ -207,6 +213,7 @@ object SparkBuild extends Build {
"com.codahale.metrics" % "metrics-core" % "3.0.0",
"com.codahale.metrics" % "metrics-jvm" % "3.0.0",
"com.codahale.metrics" % "metrics-json" % "3.0.0",
+ "com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
"com.twitter" % "chill_2.9.3" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1"
)
diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala
index 999611982a..6a66bd1d06 100644
--- a/project/project/SparkPluginBuild.scala
+++ b/project/project/SparkPluginBuild.scala
@@ -1,7 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
import sbt._
object SparkPluginDef extends Build {
lazy val root = Project("plugins", file(".")) dependsOn(junitXmlListener)
/* This is not published in a Maven repository, so we get it from GitHub directly */
lazy val junitXmlListener = uri("git://github.com/ijuma/junit_xml_listener.git#fe434773255b451a38e8d889536ebc260f4225ce")
-}
\ No newline at end of file
+}
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index fd5972d381..1f35f6f939 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -30,6 +30,8 @@ Public classes:
An "add-only" shared variable that tasks can only add values to.
- L{SparkFiles<pyspark.files.SparkFiles>}
Access files shipped with jobs.
+ - L{StorageLevel<pyspark.storagelevel.StorageLevel>}
+ Finer-grained cache persistence levels.
"""
import sys
import os
@@ -39,6 +41,7 @@ sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.eg
from pyspark.context import SparkContext
from pyspark.rdd import RDD
from pyspark.files import SparkFiles
+from pyspark.storagelevel import StorageLevel
-__all__ = ["SparkContext", "RDD", "SparkFiles"]
+__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel"]
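
For reference, a minimal sketch of the re-exported StorageLevel in use (assuming SPARK_HOME is set and py4j is on the Python path); adding it to __all__ also makes it visible to star imports:

from pyspark import StorageLevel              # now re-exported from the package root
from pyspark import *                         # StorageLevel is included in __all__
print(StorageLevel.MEMORY_ONLY.useMemory)     # True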
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 8fbf296509..597110321a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -27,6 +27,7 @@ from pyspark.broadcast import Broadcast
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import dump_pickle, write_with_length, batched
+from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
from py4j.java_collections import ListConverter
@@ -279,6 +280,16 @@ class SparkContext(object):
"""
self._jsc.sc().setCheckpointDir(dirName, useExisting)
+ def _getJavaStorageLevel(self, storageLevel):
+ """
+ Returns a Java StorageLevel based on a pyspark.StorageLevel.
+ """
+ if not isinstance(storageLevel, StorageLevel):
+ raise Exception("storageLevel must be of type pyspark.StorageLevel")
+
+ newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
+ return newStorageLevel(storageLevel.useDisk, storageLevel.useMemory,
+ storageLevel.deserialized, storageLevel.replication)
def _test():
import atexit
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 914118ccdd..58e1849cad 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -70,6 +70,25 @@ class RDD(object):
self._jrdd.cache()
return self
+ def persist(self, storageLevel):
+ """
+ Set this RDD's storage level to persist its values across operations after the first time
+ it is computed. This can only be used to assign a new storage level if the RDD does not
+ have a storage level set yet.
+ """
+ self.is_cached = True
+ javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
+ self._jrdd.persist(javaStorageLevel)
+ return self
+
+ def unpersist(self):
+ """
+ Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ """
+ self.is_cached = False
+ self._jrdd.unpersist()
+ return self
+
def checkpoint(self):
"""
Mark this RDD for checkpointing. It will be saved to a file inside the
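
For reference, a minimal sketch of the new persist()/unpersist() methods in use (assumptions: a local Spark build with SPARK_HOME set; the app name and sample data are illustrative only):

from pyspark import SparkContext
from pyspark.storagelevel import StorageLevel

# A throwaway local context; "persist-example" is an arbitrary app name.
sc = SparkContext("local", "persist-example")
squares = sc.parallelize(range(1000)).map(lambda x: x * x)

# Cache in memory, spilling to disk when partitions do not fit.
squares.persist(StorageLevel.MEMORY_AND_DISK)

print(squares.count())   # first action computes and caches the partitions
print(squares.sum())     # later actions reuse the cached blocks

squares.unpersist()      # drop the cached blocks from memory and disk
sc.stop()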
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 54823f8037..dc205b306f 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -24,6 +24,7 @@ import os
import platform
import pyspark
from pyspark.context import SparkContext
+from pyspark.storagelevel import StorageLevel
# this is the equivalent of ADD_JARS
add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") != None else None
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
new file mode 100644
index 0000000000..b31f4762e6
--- /dev/null
+++ b/python/pyspark/storagelevel.py
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ["StorageLevel"]
+
+class StorageLevel:
+ """
+ Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
+ whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
+ in a serialized format, and whether to replicate the RDD partitions on multiple nodes.
+ Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY.
+ """
+
+ def __init__(self, useDisk, useMemory, deserialized, replication = 1):
+ self.useDisk = useDisk
+ self.useMemory = useMemory
+ self.deserialized = deserialized
+ self.replication = replication
+
+StorageLevel.DISK_ONLY = StorageLevel(True, False, False)
+StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2)
+StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True)
+StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, True, 2)
+StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False)
+StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, 2)
+StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, True)
+StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, True, 2)
+StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False)
+StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, 2)
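
As a sanity check on how the constructor flags map onto the named constants above, a small self-contained sketch (assuming the pyspark package is importable, i.e. SPARK_HOME set and py4j on the path; the helper name flags() is made up for illustration):

from pyspark.storagelevel import StorageLevel

def flags(level):
    # Constructor arguments in declaration order: useDisk, useMemory, deserialized, replication.
    return (level.useDisk, level.useMemory, level.deserialized, level.replication)

# The *_SER variants differ only in 'deserialized'; the *_2 variants only in replication.
assert flags(StorageLevel.MEMORY_ONLY)       == (False, True, True, 1)
assert flags(StorageLevel.MEMORY_ONLY_SER)   == (False, True, False, 1)
assert flags(StorageLevel.MEMORY_AND_DISK_2) == (True, True, True, 2)

# A custom level is just another instance, e.g. serialized, disk-backed, replicated twice:
custom = StorageLevel(True, True, False, 2)
assert flags(custom) == flags(StorageLevel.MEMORY_AND_DISK_SER_2)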
diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml
index d61b36a61a..3685561501 100644
--- a/repl-bin/pom.xml
+++ b/repl-bin/pom.xml
@@ -106,16 +106,6 @@
<profiles>
<profile>
- <id>hadoop2-yarn</id>
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
- </profile>
- <profile>
<id>deb</id>
<build>
<plugins>
diff --git a/repl/pom.xml b/repl/pom.xml
index a1c87d7618..3123b37780 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -131,16 +131,4 @@
</plugin>
</plugins>
</build>
- <profiles>
- <profile>
- <id>hadoop2-yarn</id>
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
- </profile>
- </profiles>
</project>
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 654b5bcd2d..27b2002095 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -30,6 +30,34 @@
<name>Spark Project YARN Support</name>
<url>http://spark.incubator.apache.org/</url>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ </dependency>
+ </dependencies>
+
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
@@ -75,37 +103,4 @@
</plugin>
</plugins>
</build>
-
- <profiles>
- <profile>
- <id>hadoop2-yarn</id>
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-ipc</artifactId>
- </dependency>
- </dependencies>
- </profile>
- </profiles>
</project>