-rw-r--r--  core/src/main/scala/spark/RDD.scala  12
-rw-r--r--  core/src/main/scala/spark/SequenceFileRDDFunctions.scala  22
-rw-r--r--  core/src/main/scala/spark/api/java/JavaDoubleRDD.scala  6
-rw-r--r--  core/src/main/scala/spark/api/java/JavaPairRDD.scala  8
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDD.scala  6
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala  2
-rw-r--r--  core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala  5
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala  7
-rw-r--r--  docs/_config.yml  2
-rw-r--r--  docs/building-with-maven.md  4
-rw-r--r--  docs/index.md  2
-rw-r--r--  docs/quick-start.md  18
-rw-r--r--  docs/tuning.md  2
-rw-r--r--  examples/pom.xml  2
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala  2
-rw-r--r--  project/SparkBuild.scala  6
-rw-r--r--  python/examples/transitive_closure.py  2
-rw-r--r--  python/pyspark/rdd.py  2
-rwxr-xr-x  run  2
-rw-r--r--  run2.cmd  2
-rw-r--r--  streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala  6
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala  6
22 files changed, 82 insertions, 44 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index ed39732f13..ccd9d0364a 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -31,6 +31,7 @@ import spark.rdd.MapPartitionsRDD
import spark.rdd.MapPartitionsWithIndexRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
+import spark.rdd.ShuffledRDD
import spark.rdd.SubtractedRDD
import spark.rdd.UnionRDD
import spark.rdd.ZippedRDD
@@ -237,7 +238,14 @@ abstract class RDD[T: ClassManifest](
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*/
- def coalesce(numPartitions: Int): RDD[T] = new CoalescedRDD(this, numPartitions)
+ def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
+ if (shuffle) {
+ // include a shuffle step so that our upstream tasks are still distributed
+ new CoalescedRDD(new ShuffledRDD(map(x => (x, null)), new HashPartitioner(numPartitions)), numPartitions).keys
+ } else {
+ new CoalescedRDD(this, numPartitions)
+ }
+ }
/**
* Return a sampled subset of this RDD.
@@ -358,7 +366,7 @@ abstract class RDD[T: ClassManifest](
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
- @deprecated("use mapPartitionsWithIndex")
+ @deprecated("use mapPartitionsWithIndex", "0.7.0")
def mapPartitionsWithSplit[U: ClassManifest](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] =
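
The new optional `shuffle` flag matters when coalescing down to very few partitions: without it, the upstream computation also collapses into that many tasks. A minimal usage sketch (the RDD and partition counts are hypothetical; `sc` is an existing SparkContext):

    // Without a shuffle, the upstream map runs as a single task.
    val data = sc.parallelize(1 to 1000000, 100)
    val narrow = data.map(_ * 2).coalesce(1)

    // With shuffle = true, the map still runs as 100 tasks; only the stage
    // after the shuffle step is reduced to one partition.
    val balanced = data.map(_ * 2).coalesce(1, shuffle = true)
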
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 6b4a11d6d3..518034e07b 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -36,17 +36,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
self: RDD[(K, V)])
extends Logging
with Serializable {
-
+
private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
val c = {
- if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
+ if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
classManifest[T].erasure
} else {
// We get the type of the Writable class by looking at the apply method which converts
// from T to Writable. Since we have two apply methods we filter out the one which
- // is of the form "java.lang.Object apply(java.lang.Object)"
+ // is not of the form "java.lang.Object apply(java.lang.Object)"
implicitly[T => Writable].getClass.getDeclaredMethods().filter(
- m => m.getReturnType().toString != "java.lang.Object" &&
+ m => m.getReturnType().toString != "class java.lang.Object" &&
m.getName() == "apply")(0).getReturnType
}
@@ -69,17 +69,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
val valueClass = getWritableClass[V]
val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
-
- logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
+
+ logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
if (!convertKey && !convertValue) {
- self.saveAsHadoopFile(path, keyClass, valueClass, format)
+ self.saveAsHadoopFile(path, keyClass, valueClass, format)
} else if (!convertKey && convertValue) {
- self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
+ self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
} else if (convertKey && !convertValue) {
- self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format)
+ self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format)
} else if (convertKey && convertValue) {
- self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
- }
+ self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
+ }
}
}
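
The string in the filter is `Class.toString`, which, unlike `getName`, prefixes the kind of the class, so the old comparison never excluded the erased bridge method it was meant to skip. A quick check, independent of Spark:

    classOf[Object].getName   // "java.lang.Object"
    classOf[Object].toString  // "class java.lang.Object"
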
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index ba00b6a844..16692c0440 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -58,6 +58,12 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))
/**
+ * Return a new RDD that is reduced into `numPartitions` partitions.
+ */
+ def coalesce(numPartitions: Int, shuffle: Boolean): JavaDoubleRDD =
+ fromRDD(srdd.coalesce(numPartitions, shuffle))
+
+ /**
* Return an RDD with the elements from `this` that are not in `other`.
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 49aaabf835..30084df4e2 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -66,7 +66,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*/
- def coalesce(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numPartitions))
+ def coalesce(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.coalesce(numPartitions))
+
+ /**
+ * Return a new RDD that is reduced into `numPartitions` partitions.
+ */
+ def coalesce(numPartitions: Int, shuffle: Boolean): JavaPairRDD[K, V] =
+ fromRDD(rdd.coalesce(numPartitions, shuffle))
/**
* Return a sampled subset of this RDD.
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index 3016888898..e29f1e5899 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -44,6 +44,12 @@ JavaRDDLike[T, JavaRDD[T]] {
def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions)
/**
+ * Return a new RDD that is reduced into `numPartitions` partitions.
+ */
+ def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] =
+ rdd.coalesce(numPartitions, shuffle)
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] =
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index da3f4f636c..8919d1261c 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -81,7 +81,7 @@ private[spark] class Worker(
}
def startWebUi() {
- val webUi = new WorkerWebUI(context.system, self)
+ val webUi = new WorkerWebUI(context.system, self, workDir)
try {
AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler)
} catch {
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index 135cc2e86c..c834f87d50 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -12,12 +12,13 @@ import cc.spray.typeconversion.SprayJsonSupport._
import spark.deploy.{WorkerState, RequestWorkerState}
import spark.deploy.JsonProtocol._
+import java.io.File
/**
* Web UI server for the standalone worker.
*/
private[spark]
-class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
+class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File) extends Directives {
val RESOURCE_DIR = "spark/deploy/worker/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
@@ -43,7 +44,7 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Direct
path("log") {
parameters("appId", "executorId", "logType") { (appId, executorId, logType) =>
respondWithMediaType(cc.spray.http.MediaTypes.`text/plain`) {
- getFromFileName("work/" + appId + "/" + executorId + "/" + logType)
+ getFromFileName(workDir.getPath() + "/" + appId + "/" + executorId + "/" + logType)
}
}
} ~
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 53635b1de6..7fbdd44340 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -3,7 +3,7 @@ package spark
import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD}
+import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD}
class RDDSuite extends FunSuite with LocalSparkContext {
@@ -184,6 +184,11 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(coalesced4.collect().toList === (1 to 10).toList)
assert(coalesced4.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
+
+ // we can optionally shuffle to keep the upstream parallel
+ val coalesced5 = data.coalesce(1, shuffle = true)
+ assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _]] !=
+ null)
}
test("zipped RDDs") {
diff --git a/docs/_config.yml b/docs/_config.yml
index f99d5bb376..055ba77c5b 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -5,6 +5,6 @@ markdown: kramdown
# of Spark, Scala, and Mesos.
SPARK_VERSION: 0.7.1-SNAPSHOT
SPARK_VERSION_SHORT: 0.7.1
-SCALA_VERSION: 2.9.2
+SCALA_VERSION: 2.9.3
MESOS_VERSION: 0.9.0-incubating
SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net
diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md
index c2eeafd07a..04cd79d039 100644
--- a/docs/building-with-maven.md
+++ b/docs/building-with-maven.md
@@ -42,10 +42,10 @@ To run a specific test suite:
You might run into the following errors if you're using a vanilla installation of Maven:
- [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/andyk/Development/spark/core/target/scala-2.9.2/classes...
+ [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes...
[ERROR] PermGen space -> [Help 1]
- [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/andyk/Development/spark/core/target/scala-2.9.2/classes...
+ [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes...
[ERROR] Java heap space -> [Help 1]
To fix these, you can do the following:
diff --git a/docs/index.md b/docs/index.md
index 51d505e1fa..0c4add45dc 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -18,7 +18,7 @@ or you will need to set the `SCALA_HOME` environment variable to point
to where you've installed Scala. Scala must also be accessible through one
of these methods on slave nodes on your cluster.
-Spark uses [Simple Build Tool](https://github.com/harrah/xsbt/wiki), which is bundled with it. To compile the code, go into the top-level Spark directory and run
+Spark uses [Simple Build Tool](http://www.scala-sbt.org), which is bundled with it. To compile the code, go into the top-level Spark directory and run
sbt/sbt package
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 216f7c9cc5..5c80d2ed3a 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -111,14 +111,16 @@ We'll create a very simple Spark job in Scala. So simple, in fact, that it's nam
import spark.SparkContext
import SparkContext._
-object SimpleJob extends Application {
- val logFile = "/var/log/syslog" // Should be some file on your system
- val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
- List("target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar"))
- val logData = sc.textFile(logFile, 2).cache()
- val numAs = logData.filter(line => line.contains("a")).count()
- val numBs = logData.filter(line => line.contains("b")).count()
- println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
+object SimpleJob {
+ def main(args: Array[String]) {
+ val logFile = "/var/log/syslog" // Should be some file on your system
+ val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
+ List("target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar"))
+ val logData = sc.textFile(logFile, 2).cache()
+ val numAs = logData.filter(line => line.contains("a")).count()
+ val numBs = logData.filter(line => line.contains("b")).count()
+ println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
+ }
}
{% endhighlight %}
diff --git a/docs/tuning.md b/docs/tuning.md
index 843380b9a2..32c7ab86e9 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -49,7 +49,7 @@ Finally, to register your classes with Kryo, create a public class that extends
{% highlight scala %}
import com.esotericsoftware.kryo.Kryo
-class MyRegistrator extends KryoRegistrator {
+class MyRegistrator extends spark.KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[MyClass1])
kryo.register(classOf[MyClass2])
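
Qualifying the trait as `spark.KryoRegistrator` keeps the snippet compiling without an extra import. For context, a registrator like this only takes effect once it is named through system properties before the SparkContext is created; a minimal sketch, assuming the `spark.serializer` and `spark.kryo.registrator` property names used elsewhere in this tuning guide and a hypothetical `mypackage.MyRegistrator`:

    // Set before constructing the SparkContext.
    System.setProperty("spark.serializer", "spark.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator")
    val sc = new SparkContext("local", "KryoExample")
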
diff --git a/examples/pom.xml b/examples/pom.xml
index 39cc47c709..0537404040 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -22,7 +22,7 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>algebird-core_2.9.2</artifactId>
- <version>0.1.8</version>
+ <version>0.1.11</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 483aae452b..a9642100e3 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -49,7 +49,7 @@ object TwitterAlgebirdCMS {
val users = stream.map(status => status.getUser.getId)
- val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC)
+ val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
var globalCMS = cms.zero
val mm = new MapMonoid[Long, Int]()
var globalExact = Map[Long, Int]()
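
The argument swap matters because it no longer interchanges the error bound with the failure probability. A hedged sketch, assuming Algebird 0.1.x declares `CountMinSketchMonoid(eps: Double, delta: Double, seed: Int, heavyHittersPct: Double)`; the constant values below are placeholders, not the ones defined in the example:

    import com.twitter.algebird.CountMinSketchMonoid

    val EPS = 0.001    // relative error bound on estimated counts
    val DELTA = 1E-3   // probability of exceeding that bound
    val SEED = 1
    val PERC = 0.001   // heavy-hitter threshold as a fraction of the total count
    val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
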
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index ea146b7b0b..08a6c1866e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -36,7 +36,7 @@ object SparkBuild extends Build {
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.spark-project",
version := "0.7.1-SNAPSHOT",
- scalaVersion := "2.9.2",
+ scalaVersion := "2.9.3",
scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"),
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
retrieveManaged := true,
@@ -140,7 +140,7 @@ object SparkBuild extends Build {
"colt" % "colt" % "1.2.0",
"cc.spray" % "spray-can" % "1.0-M2.1",
"cc.spray" % "spray-server" % "1.0-M2.1",
- "cc.spray" %% "spray-json" % "1.1.1",
+ "cc.spray" % "spray-json_2.9.2" % "1.1.1",
"org.apache.mesos" % "mesos" % "0.9.0-incubating"
) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq,
unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") }
@@ -157,7 +157,7 @@ object SparkBuild extends Build {
def examplesSettings = sharedSettings ++ Seq(
name := "spark-examples",
- libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.8")
+ libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.11")
)
def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
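
The spray-json line drops `%%` in favor of an explicit `_2.9.2` artifact, presumably because `%%` appends the build's `scalaVersion` (now 2.9.3) and no spray-json 1.1.1 build exists for it, while the 2.9.2 artifact is assumed to remain usable. The difference in sbt:

    // "%%" appends scalaVersion to the artifact id; "%" uses the id verbatim.
    "cc.spray" %% "spray-json"       % "1.1.1"   // would resolve to spray-json_2.9.3
    "cc.spray" %  "spray-json_2.9.2" % "1.1.1"   // pins the existing 2.9.2 artifact
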
diff --git a/python/examples/transitive_closure.py b/python/examples/transitive_closure.py
index 73f7f8fbaf..7f85a1008e 100644
--- a/python/examples/transitive_closure.py
+++ b/python/examples/transitive_closure.py
@@ -24,7 +24,7 @@ if __name__ == "__main__":
"Usage: PythonTC <master> [<slices>]"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonTC")
- slices = sys.argv[2] if len(sys.argv) > 2 else 2
+ slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
tc = sc.parallelize(generateGraph(), slices).cache()
# Linear transitive closure: each round grows paths by one edge,
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 172ed85fab..a9fec17a9d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -399,7 +399,7 @@ class RDD(object):
>>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
>>> from fileinput import input
>>> from glob import glob
- >>> ''.join(input(glob(tempFile.name + "/part-0000*")))
+ >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
'0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
"""
def func(split, iterator):
diff --git a/run b/run
index 2c29cc4a66..ca25003708 100755
--- a/run
+++ b/run
@@ -1,6 +1,6 @@
#!/bin/bash
-SCALA_VERSION=2.9.2
+SCALA_VERSION=2.9.3
# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`; pwd)"
diff --git a/run2.cmd b/run2.cmd
index cb20a4b7a2..e8972690b8 100644
--- a/run2.cmd
+++ b/run2.cmd
@@ -1,6 +1,6 @@
@echo off
-set SCALA_VERSION=2.9.2
+set SCALA_VERSION=2.9.3
rem Figure out where the Spark framework is installed
set FWDIR=%~dp0
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index 8fce91853c..cf2ed8b1d4 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -6,10 +6,12 @@ import util.ManualClock
class BasicOperationsSuite extends TestSuiteBase {
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
override def framework() = "BasicOperationsSuite"
+ before {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ }
+
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 4d33857b25..67dca2ac31 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -30,12 +30,14 @@ import com.google.common.io.Files
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
val testPort = 9999
override def checkpointDir = "checkpoint"
+ before {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ }
+
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")