 core/src/main/scala/org/apache/spark/MapOutputTracker.scala                      | 14
 core/src/main/scala/org/apache/spark/SecurityManager.scala                       | 18
 core/src/main/scala/org/apache/spark/SparkEnv.scala                              | 22
 core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala                      |  6
 core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala                   |  2
 core/src/main/scala/org/apache/spark/scheduler/Task.scala                        |  2
 core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala                |  4
 docs/building-spark.md                                                           | 16
 docs/configuration.md                                                            |  6
 docs/tuning.md                                                                   |  8
 pom.xml                                                                          |  2
 streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala |  2
 12 files changed, 65 insertions(+), 37 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 7d96962c4a..e45885338e 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -136,14 +136,12 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
var fetchedStatuses: Array[MapStatus] = null
fetching.synchronized {
- if (fetching.contains(shuffleId)) {
- // Someone else is fetching it; wait for them to be done
- while (fetching.contains(shuffleId)) {
- try {
- fetching.wait()
- } catch {
- case e: InterruptedException =>
- }
+ // Someone else is fetching it; wait for them to be done
+ while (fetching.contains(shuffleId)) {
+ try {
+ fetching.wait()
+ } catch {
+ case e: InterruptedException =>
}
}
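
The hunk above drops a redundant `if (fetching.contains(shuffleId))` guard: the `while` loop already re-checks the same condition after every wake-up. As a minimal, self-contained sketch of that wait/notify idiom (the `FetchCoordinator` object and its methods are hypothetical, not Spark code):

```scala
import scala.collection.mutable

object FetchCoordinator {
  // Hypothetical stand-in for MapOutputTracker's `fetching` set: shuffle ids
  // that some thread is currently fetching map output statuses for.
  private val fetching = new mutable.HashSet[Int]

  def waitForFetch(shuffleId: Int): Unit = fetching.synchronized {
    // The while loop alone is enough: it re-checks the condition after every
    // wake-up, so an enclosing `if` on the same condition adds nothing.
    while (fetching.contains(shuffleId)) {
      try {
        fetching.wait()
      } catch {
        case _: InterruptedException => // tolerate interrupts and spurious wake-ups
      }
    }
  }

  def finishFetch(shuffleId: Int): Unit = fetching.synchronized {
    fetching -= shuffleId
    fetching.notifyAll() // wake any threads blocked in waitForFetch
  }
}
```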
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index dbff9d12b5..49dae5231a 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -93,19 +93,19 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* Note that SASL is pluggable as to what mechanism it uses. We currently use
* DIGEST-MD5 but this could be changed to use Kerberos or other in the future.
* Spark currently supports "auth" for the quality of protection, which means
- * the connection is not supporting integrity or privacy protection (encryption)
+ * the connection does not support integrity or privacy protection (encryption)
* after authentication. SASL also supports "auth-int" and "auth-conf" which
- * SPARK could be support in the future to allow the user to specify the quality
+ * SPARK could support in the future to allow the user to specify the quality
* of protection they want. If we support those, the messages will also have to
* be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
*
* Since the NioBlockTransferService does asynchronous messages passing, the SASL
* authentication is a bit more complex. A ConnectionManager can be both a client
- * and a Server, so for a particular connection is has to determine what to do.
+ * and a Server, so for a particular connection it has to determine what to do.
* A ConnectionId was added to be able to track connections and is used to
* match up incoming messages with connections waiting for authentication.
- * The ConnectionManager tracks all the sendingConnections using the ConnectionId
- * and waits for the response from the server and does the handshake before sending
+ * The ConnectionManager tracks all the sendingConnections using the ConnectionId,
+ * waits for the response from the server, and does the handshake before sending
* the real message.
*
* The NettyBlockTransferService ensures that SASL authentication is performed
@@ -114,14 +114,14 @@ import org.apache.spark.network.sasl.SecretKeyHolder
*
* - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
* can be used. Yarn requires a specific AmIpFilter be installed for security to work
- * properly. For non-Yarn deployments, users can write a filter to go through a
- * companies normal login service. If an authentication filter is in place then the
+ * properly. For non-Yarn deployments, users can write a filter to go through their
+ * organization's normal login service. If an authentication filter is in place then the
* SparkUI can be configured to check the logged in user against the list of users who
* have view acls to see if that user is authorized.
* The filters can also be used for many different purposes. For instance filters
* could be used for logging, encryption, or compression.
*
- * The exact mechanisms used to generate/distributed the shared secret is deployment specific.
+ * The exact mechanisms used to generate/distribute the shared secret are deployment-specific.
*
* For Yarn deployments, the secret is automatically generated using the Akka remote
* Crypt.generateSecureCookie() API. The secret is placed in the Hadoop UGI which gets passed
@@ -138,7 +138,7 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* All the nodes (Master and Workers) and the applications need to have the same shared secret.
* This again is not ideal as one user could potentially affect another users application.
* This should be enhanced in the future to provide better protection.
- * If the UI needs to be secured the user needs to install a javax servlet filter to do the
+ * If the UI needs to be secure, the user needs to install a javax servlet filter to do the
* authentication. Spark will then use that user to compare against the view acls to do
* authorization. If not filter is in place the user is generally null and no authorization
* can take place.
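
The doc comment above describes shared-secret authentication and servlet-filter-based UI authorization. A hedged sketch of the related configuration for a non-YARN deployment (the secret value, filter class, and user names are placeholders, and the exact filter wiring is deployment-specific):

```scala
import org.apache.spark.SparkConf

// Minimal sketch only: enable shared-secret authentication and UI view ACLs.
// The filter class below is a hypothetical example, not something Spark ships.
val conf = new SparkConf()
  .setAppName("secured-app")
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "change-me")        // same value on all nodes
  .set("spark.ui.filters", "com.example.MyAuthFilter")  // hypothetical servlet filter
  .set("spark.ui.view.acls", "alice,bob")               // users allowed to view the UI
```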
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index e464b32e61..f4215f268a 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,7 +156,15 @@ object SparkEnv extends Logging {
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
val hostname = conf.get("spark.driver.host")
val port = conf.get("spark.driver.port").toInt
- create(conf, SparkContext.DRIVER_IDENTIFIER, hostname, port, true, isLocal, listenerBus)
+ create(
+ conf,
+ SparkContext.DRIVER_IDENTIFIER,
+ hostname,
+ port,
+ isDriver = true,
+ isLocal = isLocal,
+ listenerBus = listenerBus
+ )
}
/**
@@ -171,8 +179,16 @@ object SparkEnv extends Logging {
numCores: Int,
isLocal: Boolean,
actorSystem: ActorSystem = null): SparkEnv = {
- create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem,
- numUsableCores = numCores)
+ create(
+ conf,
+ executorId,
+ hostname,
+ port,
+ isDriver = false,
+ isLocal = isLocal,
+ defaultActorSystem = actorSystem,
+ numUsableCores = numCores
+ )
}
/**
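
The SparkEnv hunks replace positional boolean arguments with named ones. A tiny standalone illustration of why that reads better (the `createEnv` helper is made up for this example, not Spark's API):

```scala
// Hypothetical helper, not Spark's API: positional booleans are easy to
// misread at the call site, while named arguments make the intent explicit.
object NamedArgsExample extends App {
  def createEnv(host: String, port: Int, isDriver: Boolean, isLocal: Boolean): String =
    s"$host:$port driver=$isDriver local=$isLocal"

  // Hard to tell which flag is which:
  println(createEnv("localhost", 7077, true, false))

  // Self-documenting, as in the SparkEnv hunks above:
  println(createEnv("localhost", 7077, isDriver = true, isLocal = false))
}
```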
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index ffc0a8a6d6..70edf191d9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -60,7 +60,7 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
* A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
* tuple with the list of values for that key.
*
- * Note: This is an internal API. We recommend users use RDD.coGroup(...) instead of
+ * Note: This is an internal API. We recommend users use RDD.cogroup(...) instead of
* instantiating this directly.
* @param rdds parent RDDs.
@@ -70,8 +70,8 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
- // For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs).
- // Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner.
+ // For example, `(k, a) cogroup (k, b)` produces k -> Array(ArrayBuffer as, ArrayBuffer bs).
+ // Each ArrayBuffer is represented as a CoGroup, and the resulting Array as a CoGroupCombiner.
// CoGroupValue is the intermediate state of each value before being merged in compute.
private type CoGroup = CompactBuffer[Any]
private type CoGroupValue = (Any, Int) // Int is dependency number
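
The corrected comment notes that cogrouping produces an Array with one buffer per parent RDD. A short sketch of the public `RDD.cogroup` API that the Note recommends over instantiating CoGroupedRDD directly (the app name and sample data are arbitrary):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits, needed on older Spark versions

object CogroupExample extends App {
  val sc = new SparkContext(new SparkConf().setAppName("cogroup-example").setMaster("local[2]"))

  val a = sc.parallelize(Seq(("k", 1), ("k", 2), ("j", 3)))
  val b = sc.parallelize(Seq(("k", "x"), ("j", "y")))

  // For each key, one Iterable per parent RDD, e.g.
  //   ("k", (Iterable(1, 2), Iterable("x"))), ("j", (Iterable(3), Iterable("y")))
  val grouped = a.cogroup(b)
  grouped.collect().foreach(println)

  sc.stop()
}
```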
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 01d5943d77..1efce124c0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -122,7 +122,7 @@ private[spark] class CompressedMapStatus(
/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
- * plus a bitmap for tracking which blocks are non-empty. During serialization, this bitmap
+ * plus a bitmap for tracking which blocks are empty. During serialization, this bitmap
* is compressed.
*
* @param loc location where the task is being executed
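
The fixed comment says the bitmap tracks which blocks are empty; every non-empty block is then reported at the average size. A rough illustration of that idea using `java.util.BitSet` — this is not Spark's HighlyCompressedMapStatus, which uses a compressed bitmap, just a sketch of the scheme the comment describes:

```scala
import java.util.BitSet

// Illustrative sketch only: approximate per-block sizes by an average plus a
// bitmap of empty blocks, as the comment above describes.
class ApproxMapStatus(sizes: Array[Long]) {
  private val emptyBlocks = new BitSet(sizes.length)
  private val nonEmptySizes = sizes.filter(_ > 0)
  private val avgSize = if (nonEmptySizes.nonEmpty) nonEmptySizes.sum / nonEmptySizes.length else 0L

  sizes.zipWithIndex.foreach { case (s, i) => if (s == 0) emptyBlocks.set(i) }

  /** 0 for empty blocks, the average size for everything else. */
  def getSizeForBlock(reduceId: Int): Long =
    if (emptyBlocks.get(reduceId)) 0L else avgSize
}
```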
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 2552d03d18..d7dde4fe38 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.Utils
private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
final def run(attemptId: Long): T = {
- context = new TaskContextImpl(stageId, partitionId, attemptId, false)
+ context = new TaskContextImpl(stageId, partitionId, attemptId, runningLocally = false)
TaskContextHelper.setTaskContext(context)
context.taskMetrics.hostname = Utils.localHostName()
taskThread = Thread.currentThread()
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
index 801ae54086..a44a8e1249 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -20,8 +20,8 @@ package org.apache.spark.shuffle
import org.apache.spark.{TaskContext, ShuffleDependency}
/**
- * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on both the
- * driver and executors, based on the spark.shuffle.manager setting. The driver registers shuffles
+ * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver
+ * and on each executor, based on the spark.shuffle.manager setting. The driver registers shuffles
* with it, and executors (or tasks running locally in the driver) can ask to read and write data.
*
* NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
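
As the reworded comment states, the concrete ShuffleManager is picked from the `spark.shuffle.manager` setting when SparkEnv is created on the driver and on each executor. A minimal example of selecting one of the built-in implementations (a fully qualified class name of a custom manager can typically be supplied here as well):

```scala
import org.apache.spark.SparkConf

// Choose the shuffle implementation by short name ("sort" is the default
// from Spark 1.2; "hash" is the other built-in option).
val conf = new SparkConf()
  .setAppName("shuffle-config-example")
  .set("spark.shuffle.manager", "sort")
```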
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 4922e877e9..70165eabca 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -124,7 +124,21 @@ We use the scala-maven-plugin which supports incremental and continuous compilat
mvn scala:cc
-should run continuous compilation (i.e. wait for changes). However, this has not been tested extensively.
+should run continuous compilation (i.e. wait for changes). However, this has not been tested
+extensively. A couple of gotchas to note:
+* it only scans the paths `src/main` and `src/test` (see
+[docs](http://scala-tools.org/mvnsites/maven-scala-plugin/usage_cc.html)), so it will only work
+from within certain submodules that have that structure.
+* you'll typically need to run `mvn install` from the project root for compilation within
+specific submodules to work; this is because submodules that depend on other submodules do so via
+the `spark-parent` module).
+
+Thus, the full flow for running continuous-compilation of the `core` submodule may look more like:
+ ```
+ $ mvn install
+ $ cd core
+ $ mvn scala:cc
+```
# Using With IntelliJ IDEA
diff --git a/docs/configuration.md b/docs/configuration.md
index acee267883..64aa94f622 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -75,8 +75,8 @@ in the `spark-defaults.conf` file.
The application web UI at `http://<driver>:4040` lists Spark properties in the "Environment" tab.
This is a useful place to check to make sure that your properties have been set correctly. Note
-that only values explicitly specified through either `spark-defaults.conf` or SparkConf will
-appear. For all other configuration properties, you can assume the default value is used.
+that only values explicitly specified through `spark-defaults.conf`, `SparkConf`, or the command
+line will appear. For all other configuration properties, you can assume the default value is used.
## Available Properties
@@ -310,7 +310,7 @@ Apart from these, the following properties are also available, and may be useful
<td>(none)</td>
<td>
Add the environment variable specified by <code>EnvironmentVariableName</code> to the Executor
- process. The user can specify multiple of these and to set multiple environment variables.
+ process. The user can specify multiple of these to set multiple environment variables.
</td>
</tr>
<tr>
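
The `spark.executorEnv.[EnvironmentVariableName]` property documented above is set once per variable; a small sketch of doing so programmatically (the variable names and values are placeholders):

```scala
import org.apache.spark.SparkConf

// Each spark.executorEnv.* entry becomes one environment variable in the
// executor process; repeat the prefix to set several variables.
// SparkConf.setExecutorEnv(name, value) is an equivalent convenience method.
val conf = new SparkConf()
  .setAppName("executor-env-example")
  .set("spark.executorEnv.DATA_DIR", "/mnt/data")   // hypothetical variable
  .set("spark.executorEnv.LOG_LEVEL", "INFO")       // hypothetical variable
```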
diff --git a/docs/tuning.md b/docs/tuning.md
index c4ca766328..e2fdcfe6a3 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -111,7 +111,7 @@ pointer-based data structures and wrapper objects. There are several ways to do
3. Consider using numeric IDs or enumeration objects instead of strings for keys.
4. If you have less than 32 GB of RAM, set the JVM flag `-XX:+UseCompressedOops` to make pointers be
four bytes instead of eight. You can add these options in
- [`spark-env.sh`](configuration.html#environment-variables-in-spark-envsh).
+ [`spark-env.sh`](configuration.html#environment-variables).
## Serialized RDD Storage
@@ -154,7 +154,7 @@ By default, Spark uses 60% of the configured executor memory (`spark.executor.me
cache RDDs. This means that 40% of memory is available for any objects created during task execution.
In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of
-memory, lowering this value will help reduce the memory consumption. To change this to say 50%, you can call
+memory, lowering this value will help reduce the memory consumption. To change this to, say, 50%, you can call
`conf.set("spark.storage.memoryFraction", "0.5")` on your SparkConf. Combined with the use of serialized caching,
using a smaller cache should be sufficient to mitigate most of the garbage collection problems.
In case you are interested in further tuning the Java GC, continue reading below.
@@ -190,7 +190,7 @@ temporary objects created during task execution. Some steps which may be useful
* As an example, if your task is reading data from HDFS, the amount of memory used by the task can be estimated using
the size of the data block read from HDFS. Note that the size of a decompressed block is often 2 or 3 times the
- size of the block. So if we wish to have 3 or 4 tasks worth of working space, and the HDFS block size is 64 MB,
+ size of the block. So if we wish to have 3 or 4 tasks' worth of working space, and the HDFS block size is 64 MB,
we can estimate size of Eden to be `4*3*64MB`.
* Monitor how the frequency and time taken by garbage collection changes with the new settings.
@@ -219,7 +219,7 @@ working set of one of your tasks, such as one of the reduce tasks in `groupByKey
Spark's shuffle operations (`sortByKey`, `groupByKey`, `reduceByKey`, `join`, etc) build a hash table
within each task to perform the grouping, which can often be large. The simplest fix here is to
*increase the level of parallelism*, so that each task's input set is smaller. Spark can efficiently
-support tasks as short as 200 ms, because it reuses one worker JVMs across all tasks and it has
+support tasks as short as 200 ms, because it reuses one executor JVM across many tasks and it has
a low task launching cost, so you can safely increase the level of parallelism to more than the
number of cores in your clusters.
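
The tuning hunks above mention both lowering `spark.storage.memoryFraction` to 0.5 and estimating Eden as `4*3*64MB`; a short sketch that puts those two numbers together (the task count, decompression factor, and block size are just the assumptions stated in the text):

```scala
import org.apache.spark.SparkConf

// Lower the storage fraction to 50%, as the docs describe.
val conf = new SparkConf()
  .setAppName("tuning-example")
  .set("spark.storage.memoryFraction", "0.5")

// Eden estimate from the docs: 4 concurrent tasks * ~3x decompression factor
// * 64 MB HDFS blocks = 768 MB (e.g. -Xmn768m).
val edenBytes = 4L * 3L * 64L * 1024 * 1024
println(s"Suggested Eden size: ${edenBytes / (1024 * 1024)} MB")
```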
diff --git a/pom.xml b/pom.xml
index f42257265e..cdc2969edd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -267,7 +267,7 @@
<version>1.0.0</version>
</dependency>
<!--
- This depndency has been added to provided scope as it is needed for excuting build
+ This depndency has been added to provided scope as it is needed for executing build
specific groovy scripts using gmaven+ and not required for downstream project building
with spark.
-->
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
index 1868a1ebc7..a7d63bd4f2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
@@ -123,7 +123,7 @@ private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorRec
* As Actors can also be used to receive data from almost any stream source.
* A nice set of abstraction(s) for actors as receivers is already provided for
* a few general cases. It is thus exposed as an API where user may come with
- * his own Actor to run as receiver for Spark Streaming input source.
+ * their own Actor to run as receiver for Spark Streaming input source.
*
* This starts a supervisor actor which starts workers and also provides
* [http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance].
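
The fixed doc comment describes plugging a user-defined actor in as a receiver for a Spark Streaming input source. A minimal sketch of that pattern against the Spark 1.x actor-receiver API (the actor body, stream name, and batch interval are illustrative):

```scala
import akka.actor.{Actor, Props}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.ActorHelper

// Illustrative custom actor: whatever strings arrive are handed to Spark
// Streaming through ActorHelper.store().
class WordActor extends Actor with ActorHelper {
  def receive = {
    case s: String => store(s)
  }
}

object ActorReceiverExample extends App {
  val conf = new SparkConf().setAppName("actor-receiver-example").setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(1))

  // Starts a supervisor which runs WordActor as the receiver for this stream.
  val lines = ssc.actorStream[String](Props[WordActor], "WordActorReceiver")
  lines.print()

  ssc.start()
  ssc.awaitTermination()
}
```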