aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md9
-rw-r--r--core/src/main/scala/spark/CacheManager.scala2
-rw-r--r--core/src/main/scala/spark/RDD.scala76
-rw-r--r--core/src/main/scala/spark/api/java/JavaDoubleRDD.scala6
-rw-r--r--core/src/main/scala/spark/api/java/JavaPairRDD.scala8
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDD.scala6
-rw-r--r--core/src/main/scala/spark/executor/Executor.scala87
-rw-r--r--core/src/main/scala/spark/executor/MesosExecutorBackend.scala16
-rw-r--r--core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala13
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala67
-rw-r--r--docs/quick-start.md18
-rw-r--r--docs/tuning.md2
-rw-r--r--project/SparkBuild.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala2
-rw-r--r--streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala6
-rw-r--r--streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala6
16 files changed, 247 insertions, 79 deletions
diff --git a/README.md b/README.md
index 1f8f7b6876..ba24ab43b1 100644
--- a/README.md
+++ b/README.md
@@ -12,12 +12,15 @@ This README file only contains basic setup instructions.
## Building
-Spark requires Scala 2.9.2. The project is built using Simple Build Tool (SBT),
-which is packaged with it. To build Spark and its example programs, run:
+Spark requires Scala 2.9.2 (Scala 2.10 is not yet supported). The project is
+built using Simple Build Tool (SBT), which is packaged with it. To build
+Spark and its example programs, run:
sbt/sbt package
-Spark also supports building using Maven. If you would like to build using Maven, see the [instructions for building Spark with Maven](http://spark-project.org/docs/latest/building-with-maven.html) in the spark documentation..
+Spark also supports building using Maven. If you would like to build using Maven,
+see the [instructions for building Spark with Maven](http://spark-project.org/docs/latest/building-with-maven.html)
+in the Spark documentation.
To run Spark, you will need to have Scala's bin directory in your `PATH`, or
you will need to set the `SCALA_HOME` environment variable to point to where
diff --git a/core/src/main/scala/spark/CacheManager.scala b/core/src/main/scala/spark/CacheManager.scala
index c7b379a3fb..f7a2b7e802 100644
--- a/core/src/main/scala/spark/CacheManager.scala
+++ b/core/src/main/scala/spark/CacheManager.scala
@@ -27,7 +27,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
if (loading.contains(key)) {
logInfo("Loading contains " + key + ", waiting...")
while (loading.contains(key)) {
- try {loading.wait()} catch {case _ =>}
+ try {loading.wait()} catch {case _ : Throwable =>}
}
logInfo("Loading no longer contains " + key + ", so returning cached result")
// See whether someone else has successfully loaded it. The main way this would fail
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 9bd8a0f98d..33dc7627a3 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -31,6 +31,7 @@ import spark.rdd.MapPartitionsRDD
import spark.rdd.MapPartitionsWithIndexRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
+import spark.rdd.ShuffledRDD
import spark.rdd.SubtractedRDD
import spark.rdd.UnionRDD
import spark.rdd.ZippedRDD
@@ -237,7 +238,14 @@ abstract class RDD[T: ClassManifest](
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*/
- def coalesce(numPartitions: Int): RDD[T] = new CoalescedRDD(this, numPartitions)
+ def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
+ if (shuffle) {
+ // include a shuffle step so that our upstream tasks are still distributed
+ new CoalescedRDD(new ShuffledRDD(map(x => (x, null)), new HashPartitioner(numPartitions)), numPartitions).keys
+ } else {
+ new CoalescedRDD(this, numPartitions)
+ }
+ }
/**
* Return a sampled subset of this RDD.
@@ -365,6 +373,62 @@ abstract class RDD[T: ClassManifest](
new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
/**
+ * Maps f over this RDD, where f takes an additional parameter of type A. This
+ * additional parameter is produced by constructA, which is called in each
+ * partition with the index of that partition.
+ */
+ def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+ (f:(T, A) => U): RDD[U] = {
+ def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+ val a = constructA(index)
+ iter.map(t => f(t, a))
+ }
+ new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+ }
+
+ /**
+ * FlatMaps f over this RDD, where f takes an additional parameter of type A. This
+ * additional parameter is produced by constructA, which is called in each
+ * partition with the index of that partition.
+ */
+ def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+ (f:(T, A) => Seq[U]): RDD[U] = {
+ def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+ val a = constructA(index)
+ iter.flatMap(t => f(t, a))
+ }
+ new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+ }
+
+ /**
+ * Applies f to each element of this RDD, where f takes an additional parameter of type A.
+ * This additional parameter is produced by constructA, which is called in each
+ * partition with the index of that partition.
+ */
+ def foreachWith[A: ClassManifest](constructA: Int => A)
+ (f:(T, A) => Unit) {
+ def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
+ val a = constructA(index)
+ iter.map(t => {f(t, a); t})
+ }
+ (new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {})
+ }
+
+ /**
+ * Filters this RDD with p, where p takes an additional parameter of type A. This
+ * additional parameter is produced by constructA, which is called in each
+ * partition with the index of that partition.
+ */
+ def filterWith[A: ClassManifest](constructA: Int => A)
+ (p:(T, A) => Boolean): RDD[T] = {
+ def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
+ val a = constructA(index)
+ iter.filter(t => p(t, a))
+ }
+ new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)
+ }
+
+ /**
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
* second element in each RDD, etc. Assumes that the two RDDs have the *same number of
* partitions* and the *same number of elements in each partition* (e.g. one was made through
@@ -383,6 +447,14 @@ abstract class RDD[T: ClassManifest](
}
/**
+ * Applies a function f to each partition of this RDD; f is handed that partition's iterator.
+ */
+ def foreachPartition(f: Iterator[T] => Unit) {
+ val cleanF = sc.clean(f) // the job below must run this cleaned closure, not the raw f
+ sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
+ }
+
+ /**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = {
@@ -404,7 +476,7 @@ abstract class RDD[T: ClassManifest](
/**
* Return an RDD with the elements from `this` that are not in `other`.
- *
+ *
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index ba00b6a844..16692c0440 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -58,6 +58,12 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))
/**
+ * Return a new RDD that is reduced into `numPartitions` partitions, with a shuffle step first if `shuffle` is true.
+ */
+ def coalesce(numPartitions: Int, shuffle: Boolean): JavaDoubleRDD =
+ fromRDD(srdd.coalesce(numPartitions, shuffle))
+
+ /**
* Return an RDD with the elements from `this` that are not in `other`.
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 49aaabf835..30084df4e2 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -66,7 +66,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*/
- def coalesce(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numPartitions))
+ def coalesce(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.coalesce(numPartitions))
+
+ /**
+ * Return a new RDD that is reduced into `numPartitions` partitions, with a shuffle step first if `shuffle` is true.
+ */
+ def coalesce(numPartitions: Int, shuffle: Boolean): JavaPairRDD[K, V] =
+ fromRDD(rdd.coalesce(numPartitions, shuffle))
/**
* Return a sampled subset of this RDD.
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index 3016888898..e29f1e5899 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -44,6 +44,12 @@ JavaRDDLike[T, JavaRDD[T]] {
def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions)
/**
+ * Return a new RDD that is reduced into `numPartitions` partitions, with a shuffle step first if `shuffle` is true.
+ */
+ def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] =
+ rdd.coalesce(numPartitions, shuffle)
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] =
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 4474ef4593..3e7407b58d 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -16,66 +16,61 @@ import java.nio.ByteBuffer
/**
* The Mesos executor for Spark.
*/
-private[spark] class Executor extends Logging {
- var urlClassLoader : ExecutorURLClassLoader = null
- var threadPool: ExecutorService = null
- var env: SparkEnv = null
-
+private[spark] class Executor(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) extends Logging {
+
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
// Each map holds the master's timestamp for the version of that file or JAR we got.
- val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
- val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
+ private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
+ private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
- val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
+ private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
initLogging()
- def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) {
- // Make sure the local hostname we report matches the cluster scheduler's name for this host
- Utils.setCustomHostname(slaveHostname)
+ // Make sure the local hostname we report matches the cluster scheduler's name for this host
+ Utils.setCustomHostname(slaveHostname)
- // Set spark.* system properties from executor arg
- for ((key, value) <- properties) {
- System.setProperty(key, value)
- }
+ // Set spark.* system properties from executor arg
+ for ((key, value) <- properties) {
+ System.setProperty(key, value)
+ }
+
+ // Create our ClassLoader and set it on this thread
+ private val urlClassLoader = createClassLoader()
+ Thread.currentThread.setContextClassLoader(urlClassLoader)
- // Create our ClassLoader and set it on this thread
- urlClassLoader = createClassLoader()
- Thread.currentThread.setContextClassLoader(urlClassLoader)
-
- // Make any thread terminations due to uncaught exceptions kill the entire
- // executor process to avoid surprising stalls.
- Thread.setDefaultUncaughtExceptionHandler(
- new Thread.UncaughtExceptionHandler {
- override def uncaughtException(thread: Thread, exception: Throwable) {
- try {
- logError("Uncaught exception in thread " + thread, exception)
-
- // We may have been called from a shutdown hook. If so, we must not call System.exit().
- // (If we do, we will deadlock.)
- if (!Utils.inShutdown()) {
- if (exception.isInstanceOf[OutOfMemoryError]) {
- System.exit(ExecutorExitCode.OOM)
- } else {
- System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
- }
+ // Make any thread terminations due to uncaught exceptions kill the entire
+ // executor process to avoid surprising stalls.
+ Thread.setDefaultUncaughtExceptionHandler(
+ new Thread.UncaughtExceptionHandler {
+ override def uncaughtException(thread: Thread, exception: Throwable) {
+ try {
+ logError("Uncaught exception in thread " + thread, exception)
+
+ // We may have been called from a shutdown hook. If so, we must not call System.exit().
+ // (If we do, we will deadlock.)
+ if (!Utils.inShutdown()) {
+ if (exception.isInstanceOf[OutOfMemoryError]) {
+ System.exit(ExecutorExitCode.OOM)
+ } else {
+ System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
}
- } catch {
- case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
- case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
}
+ } catch {
+ case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+ case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
}
}
- )
+ }
+ )
- // Initialize Spark environment (using system properties read above)
- env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
- SparkEnv.set(env)
+ // Initialize Spark environment (using system properties read above)
+ val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
+ SparkEnv.set(env)
- // Start worker thread pool
- threadPool = new ThreadPoolExecutor(
- 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
- }
+ // Start worker thread pool
+ val threadPool = new ThreadPoolExecutor(
+ 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
threadPool.execute(new TaskRunner(context, taskId, serializedTask))
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index 818d6d1dda..10f3531df0 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -8,11 +8,12 @@ import com.google.protobuf.ByteString
import spark.{Utils, Logging}
import spark.TaskState
-private[spark] class MesosExecutorBackend(executor: Executor)
+private[spark] class MesosExecutorBackend
extends MesosExecutor
with ExecutorBackend
with Logging {
+ var executor: Executor = null
var driver: ExecutorDriver = null
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
@@ -32,16 +33,19 @@ private[spark] class MesosExecutorBackend(executor: Executor)
logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
this.driver = driver
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
- executor.initialize(
+ executor = new Executor(
executorInfo.getExecutorId.getValue,
slaveInfo.getHostname,
- properties
- )
+ properties)
}
override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
val taskId = taskInfo.getTaskId.getValue.toLong
- executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+ if (executor == null) {
+ logError("Received launchTask but executor was null")
+ } else {
+ executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+ }
}
override def error(d: ExecutorDriver, message: String) {
@@ -68,7 +72,7 @@ private[spark] object MesosExecutorBackend {
def main(args: Array[String]) {
MesosNativeLibrary.load()
// Create a new Executor and start it running
- val runner = new MesosExecutorBackend(new Executor)
+ val runner = new MesosExecutorBackend()
new MesosExecutorDriver(runner).run()
}
}
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 9a82c3054c..1047f71c6a 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -14,7 +14,6 @@ import spark.scheduler.cluster.RegisterExecutorFailed
import spark.scheduler.cluster.RegisterExecutor
private[spark] class StandaloneExecutorBackend(
- executor: Executor,
driverUrl: String,
executorId: String,
hostname: String,
@@ -23,6 +22,7 @@ private[spark] class StandaloneExecutorBackend(
with ExecutorBackend
with Logging {
+ var executor: Executor = null
var driver: ActorRef = null
override def preStart() {
@@ -36,7 +36,7 @@ private[spark] class StandaloneExecutorBackend(
override def receive = {
case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver")
- executor.initialize(executorId, hostname, sparkProperties)
+ executor = new Executor(executorId, hostname, sparkProperties)
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
@@ -44,7 +44,12 @@ private[spark] class StandaloneExecutorBackend(
case LaunchTask(taskDesc) =>
logInfo("Got assigned task " + taskDesc.taskId)
- executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+ if (executor == null) {
+ logError("Received launchTask but executor was null")
+ System.exit(1)
+ } else {
+ executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+ }
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
logError("Driver terminated or disconnected! Shutting down.")
@@ -62,7 +67,7 @@ private[spark] object StandaloneExecutorBackend {
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
val actor = actorSystem.actorOf(
- Props(new StandaloneExecutorBackend(new Executor, driverUrl, executorId, hostname, cores)),
+ Props(new StandaloneExecutorBackend(driverUrl, executorId, hostname, cores)),
name = "Executor")
actorSystem.awaitTermination()
}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 0ff4a47998..7fbdd44340 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -3,7 +3,7 @@ package spark
import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD}
+import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD}
class RDDSuite extends FunSuite with LocalSparkContext {
@@ -184,6 +184,11 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(coalesced4.collect().toList === (1 to 10).toList)
assert(coalesced4.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
+
+ // we can optionally shuffle to keep the upstream parallel
+ val coalesced5 = data.coalesce(1, shuffle = true)
+ assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _]] !=
+ null)
}
test("zipped RDDs") {
@@ -208,4 +213,64 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(prunedData.size === 1)
assert(prunedData(0) === 10)
}
+
+ test("mapWith") {
+ import java.util.Random
+ sc = new SparkContext("local", "test")
+ val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
+ val randoms = ones.mapWith(
+ (index: Int) => new Random(index + 42))
+ {(t: Int, prng: Random) => prng.nextDouble * t}.collect()
+ val prn42_3 = {
+ val prng42 = new Random(42)
+ prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
+ }
+ val prn43_3 = {
+ val prng43 = new Random(43)
+ prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
+ }
+ assert(randoms(2) === prn42_3)
+ assert(randoms(5) === prn43_3)
+ }
+
+ test("flatMapWith") {
+ import java.util.Random
+ sc = new SparkContext("local", "test")
+ val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
+ val randoms = ones.flatMapWith(
+ (index: Int) => new Random(index + 42))
+ {(t: Int, prng: Random) =>
+ val random = prng.nextDouble()
+ Seq(random * t, random * t * 10)}.
+ collect()
+ val prn42_3 = {
+ val prng42 = new Random(42)
+ prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
+ }
+ val prn43_3 = {
+ val prng43 = new Random(43)
+ prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
+ }
+ assert(randoms(5) === prn42_3 * 10)
+ assert(randoms(11) === prn43_3 * 10)
+ }
+
+ test("filterWith") {
+ import java.util.Random
+ sc = new SparkContext("local", "test")
+ val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
+ val sample = ints.filterWith(
+ (index: Int) => new Random(index + 42))
+ {(t: Int, prng: Random) => prng.nextInt(3) == 0}.
+ collect()
+ val checkSample = {
+ val prng42 = new Random(42)
+ val prng43 = new Random(43)
+ Array(1, 2, 3, 4, 5, 6).filter{i =>
+ if (i < 4) 0 == prng42.nextInt(3)
+ else 0 == prng43.nextInt(3)}
+ }
+ assert(sample.size === checkSample.size)
+ for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
+ }
}
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 216f7c9cc5..5c80d2ed3a 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -111,14 +111,16 @@ We'll create a very simple Spark job in Scala. So simple, in fact, that it's nam
import spark.SparkContext
import SparkContext._
-object SimpleJob extends Application {
- val logFile = "/var/log/syslog" // Should be some file on your system
- val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
- List("target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar"))
- val logData = sc.textFile(logFile, 2).cache()
- val numAs = logData.filter(line => line.contains("a")).count()
- val numBs = logData.filter(line => line.contains("b")).count()
- println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
+object SimpleJob {
+ def main(args: Array[String]) {
+ val logFile = "/var/log/syslog" // Should be some file on your system
+ val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
+ List("target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar"))
+ val logData = sc.textFile(logFile, 2).cache()
+ val numAs = logData.filter(line => line.contains("a")).count()
+ val numBs = logData.filter(line => line.contains("b")).count()
+ println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
+ }
}
{% endhighlight %}
diff --git a/docs/tuning.md b/docs/tuning.md
index 843380b9a2..32c7ab86e9 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -49,7 +49,7 @@ Finally, to register your classes with Kryo, create a public class that extends
{% highlight scala %}
import com.esotericsoftware.kryo.Kryo
-class MyRegistrator extends KryoRegistrator {
+class MyRegistrator extends spark.KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[MyClass1])
kryo.register(classOf[MyClass2])
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index d44bf3b5e3..5f378b2398 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -37,7 +37,7 @@ object SparkBuild extends Build {
organization := "org.spark-project",
version := "0.7.1-SNAPSHOT",
scalaVersion := "2.9.2",
- scalacOptions := Seq(/*"-deprecation",*/ "-unchecked", "-optimize"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue
+ scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"),
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
retrieveManaged := true,
retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
diff --git a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
index 6b310bc0b6..da224ad6f7 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
@@ -28,7 +28,7 @@ class QueueInputDStream[T: ClassManifest](
}
if (buffer.size > 0) {
if (oneAtATime) {
- Some(buffer.first)
+ Some(buffer.head)
} else {
Some(new UnionRDD(ssc.sc, buffer.toSeq))
}
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index 8fce91853c..cf2ed8b1d4 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -6,10 +6,12 @@ import util.ManualClock
class BasicOperationsSuite extends TestSuiteBase {
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
override def framework() = "BasicOperationsSuite"
+ before {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ }
+
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 4d33857b25..67dca2ac31 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -30,12 +30,14 @@ import com.google.common.io.Files
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
val testPort = 9999
override def checkpointDir = "checkpoint"
+ before {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ }
+
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")