aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-06-29 18:47:12 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-06-29 18:47:12 -0700
commitc53670b9bf709ab583cbc75e952026bc4abb6c5f (patch)
tree0d1248182923bec89560a87ae0a07ef680724fdb
parent697b0bee2c6d7e7b6b02c627c84975066fc67b91 (diff)
downloadspark-c53670b9bf709ab583cbc75e952026bc4abb6c5f.tar.gz
spark-c53670b9bf709ab583cbc75e952026bc4abb6c5f.tar.bz2
spark-c53670b9bf709ab583cbc75e952026bc4abb6c5f.zip
Various code style fixes, mostly from IntelliJ IDEA
-rw-r--r--core/src/main/scala/spark/Accumulators.scala32
-rw-r--r--core/src/main/scala/spark/BlockStoreShuffleFetcher.scala2
-rw-r--r--core/src/main/scala/spark/CartesianRDD.scala4
-rw-r--r--core/src/main/scala/spark/ClosureCleaner.scala10
-rw-r--r--core/src/main/scala/spark/CoGroupedRDD.scala2
-rw-r--r--core/src/main/scala/spark/DaemonThreadFactory.scala2
-rw-r--r--core/src/main/scala/spark/Executor.scala2
-rw-r--r--core/src/main/scala/spark/HadoopRDD.scala2
-rw-r--r--core/src/main/scala/spark/HadoopWriter.scala12
-rw-r--r--core/src/main/scala/spark/JavaSerializer.scala2
-rw-r--r--core/src/main/scala/spark/Logging.scala37
-rw-r--r--core/src/main/scala/spark/NewHadoopRDD.scala4
-rw-r--r--core/src/main/scala/spark/ParallelCollection.scala2
-rw-r--r--core/src/main/scala/spark/RDD.scala6
-rw-r--r--core/src/main/scala/spark/SampledRDD.scala6
-rw-r--r--core/src/main/scala/spark/ShuffledRDD.scala2
-rw-r--r--core/src/main/scala/spark/SparkContext.scala12
-rw-r--r--core/src/main/scala/spark/Split.scala2
-rw-r--r--core/src/main/scala/spark/UnionRDD.scala2
-rw-r--r--core/src/main/scala/spark/Utils.scala4
-rw-r--r--core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala69
-rw-r--r--core/src/main/scala/spark/broadcast/Broadcast.scala48
-rw-r--r--core/src/main/scala/spark/broadcast/ChainedBroadcast.scala55
-rw-r--r--core/src/main/scala/spark/broadcast/DfsBroadcast.scala16
-rw-r--r--core/src/main/scala/spark/broadcast/HttpBroadcast.scala8
-rw-r--r--core/src/main/scala/spark/broadcast/SourceInfo.scala10
-rw-r--r--core/src/main/scala/spark/broadcast/TreeBroadcast.scala62
-rw-r--r--core/src/main/scala/spark/network/Connection.scala2
-rw-r--r--core/src/main/scala/spark/network/ConnectionManager.scala10
-rw-r--r--core/src/main/scala/spark/partial/ApproximateActionListener.scala26
-rw-r--r--core/src/main/scala/spark/partial/PartialResult.scala54
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/JobWaiter.scala36
-rw-r--r--core/src/main/scala/spark/scheduler/ShuffleMapTask.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala10
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala10
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerWorker.scala2
-rw-r--r--core/src/test/scala/spark/FailureSuite.scala8
-rw-r--r--core/src/test/scala/spark/KryoSerializerSuite.scala12
-rw-r--r--core/src/test/scala/spark/ThreadingSuite.scala22
-rw-r--r--project/plugins.sbt2
41 files changed, 328 insertions, 287 deletions
diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index 86e2061b9f..a2003d8049 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -26,7 +26,7 @@ class Accumulator[T] (
// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream) {
- in.defaultReadObject
+ in.defaultReadObject()
value_ = zero
deserialized = true
Accumulators.register(this, false)
@@ -53,18 +53,22 @@ private object Accumulators {
return lastId
}
- def register(a: Accumulator[_], original: Boolean): Unit = synchronized {
- if (original) {
- originals(a.id) = a
- } else {
- val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
- accums(a.id) = a
+ def register(a: Accumulator[_], original: Boolean) {
+ synchronized {
+ if (original) {
+ originals(a.id) = a
+ } else {
+ val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
+ accums(a.id) = a
+ }
}
}
// Clear the local (non-original) accumulators for the current thread
- def clear: Unit = synchronized {
- localAccums.remove(Thread.currentThread)
+ def clear() {
+ synchronized {
+ localAccums.remove(Thread.currentThread)
+ }
}
// Get the values of the local accumulators for the current thread (by ID)
@@ -77,10 +81,12 @@ private object Accumulators {
}
// Add values to the original accumulators with some given IDs
- def add(values: Map[Long, Any]): Unit = synchronized {
- for ((id, value) <- values) {
- if (originals.contains(id)) {
- originals(id).asInstanceOf[Accumulator[Any]] += value
+ def add(values: Map[Long, Any]) {
+ synchronized {
+ for ((id, value) <- values) {
+ if (originals.contains(id)) {
+ originals(id).asInstanceOf[Accumulator[Any]] += value
+ }
}
}
}
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index e00a0d80fa..010203d1ca 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -41,7 +41,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
val (blockId, blockOption) = x
blockOption match {
case Some(block) => {
- val values = block.asInstanceOf[Iterator[Any]]
+ val values = block
for(value <- values) {
val v = value.asInstanceOf[(K, V)]
func(v._1, v._2)
diff --git a/core/src/main/scala/spark/CartesianRDD.scala b/core/src/main/scala/spark/CartesianRDD.scala
index 38afa59b29..e26041555a 100644
--- a/core/src/main/scala/spark/CartesianRDD.scala
+++ b/core/src/main/scala/spark/CartesianRDD.scala
@@ -1,7 +1,7 @@
package spark
class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
- override val index = idx
+ override val index: Int = idx
}
class CartesianRDD[T: ClassManifest, U:ClassManifest](
@@ -24,7 +24,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
array
}
- override def splits = splits_.asInstanceOf[Array[Split]]
+ override def splits = splits_
override def preferredLocations(split: Split) = {
val currSplit = split.asInstanceOf[CartesianSplit]
diff --git a/core/src/main/scala/spark/ClosureCleaner.scala b/core/src/main/scala/spark/ClosureCleaner.scala
index 699fdc2982..3b83d23a13 100644
--- a/core/src/main/scala/spark/ClosureCleaner.scala
+++ b/core/src/main/scala/spark/ClosureCleaner.scala
@@ -76,7 +76,7 @@ object ClosureCleaner extends Logging {
}
}
- def clean(func: AnyRef): Unit = {
+ def clean(func: AnyRef) {
// TODO: cache outerClasses / innerClasses / accessedFields
val outerClasses = getOuterClasses(func)
val innerClasses = getInnerClasses(func)
@@ -109,7 +109,7 @@ object ClosureCleaner extends Logging {
// Clone the closure objects themselves, nulling out any fields that are not
// used in the closure we're working on or any of its inner closures.
for ((cls, obj) <- outerPairs) {
- outer = instantiateClass(cls, outer, inInterpreter);
+ outer = instantiateClass(cls, outer, inInterpreter)
for (fieldName <- accessedFields(cls)) {
val field = cls.getDeclaredField(fieldName)
field.setAccessible(true)
@@ -139,10 +139,10 @@ object ClosureCleaner extends Logging {
return cons.newInstance(params: _*).asInstanceOf[AnyRef]
} else {
// Use reflection to instantiate object without calling constructor
- val rf = sun.reflect.ReflectionFactory.getReflectionFactory();
- val parentCtor = classOf[java.lang.Object].getDeclaredConstructor();
+ val rf = sun.reflect.ReflectionFactory.getReflectionFactory()
+ val parentCtor = classOf[java.lang.Object].getDeclaredConstructor()
val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
- val obj = newCtor.newInstance().asInstanceOf[AnyRef];
+ val obj = newCtor.newInstance().asInstanceOf[AnyRef]
if (outer != null) {
//logInfo("3: Setting $outer on " + cls + " to " + outer);
val field = cls.getDeclaredField("$outer")
diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala
index 3543c8afa8..6959917d14 100644
--- a/core/src/main/scala/spark/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/CoGroupedRDD.scala
@@ -11,7 +11,7 @@ case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplit
case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable {
- override val index = idx
+ override val index: Int = idx
override def hashCode(): Int = idx
}
diff --git a/core/src/main/scala/spark/DaemonThreadFactory.scala b/core/src/main/scala/spark/DaemonThreadFactory.scala
index cb30cb2ac8..003880c5e8 100644
--- a/core/src/main/scala/spark/DaemonThreadFactory.scala
+++ b/core/src/main/scala/spark/DaemonThreadFactory.scala
@@ -7,7 +7,7 @@ import java.util.concurrent.ThreadFactory
*/
private object DaemonThreadFactory extends ThreadFactory {
override def newThread(r: Runnable): Thread = {
- val t = new Thread(r);
+ val t = new Thread(r)
t.setDaemon(true)
return t
}
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index 3d70cf1737..9ead0d2870 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -67,7 +67,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
class TaskRunner(info: MTaskInfo, d: ExecutorDriver)
extends Runnable {
- override def run() = {
+ override def run() {
val tid = info.getTaskId.getValue
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(classLoader)
diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala
index 598a18fe72..f282a4023b 100644
--- a/core/src/main/scala/spark/HadoopRDD.scala
+++ b/core/src/main/scala/spark/HadoopRDD.scala
@@ -26,7 +26,7 @@ class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
- override val index = idx
+ override val index: Int = idx
}
/**
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala
index 790603581f..12b6a0954c 100644
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/HadoopWriter.scala
@@ -119,7 +119,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl
private def getOutputCommitter(): OutputCommitter = {
if (committer == null) {
- committer = conf.value.getOutputCommitter().asInstanceOf[OutputCommitter]
+ committer = conf.value.getOutputCommitter
}
return committer
}
@@ -149,11 +149,11 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl
}
private def setConfParams() {
- conf.value.set("mapred.job.id", jID.value.toString);
- conf.value.set("mapred.tip.id", taID.value.getTaskID.toString);
- conf.value.set("mapred.task.id", taID.value.toString);
- conf.value.setBoolean("mapred.task.is.map", true);
- conf.value.setInt("mapred.task.partition", splitID);
+ conf.value.set("mapred.job.id", jID.value.toString)
+ conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
+ conf.value.set("mapred.task.id", taID.value.toString)
+ conf.value.setBoolean("mapred.task.is.map", true)
+ conf.value.setInt("mapred.task.partition", splitID)
}
}
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala
index c17ec995d4..d11ba5167d 100644
--- a/core/src/main/scala/spark/JavaSerializer.scala
+++ b/core/src/main/scala/spark/JavaSerializer.scala
@@ -49,7 +49,7 @@ class JavaSerializerInstance extends SerializerInstance {
}
def deserializeStream(s: InputStream): DeserializationStream = {
- new JavaDeserializationStream(s, currentThread.getContextClassLoader)
+ new JavaDeserializationStream(s, Thread.currentThread.getContextClassLoader)
}
def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
diff --git a/core/src/main/scala/spark/Logging.scala b/core/src/main/scala/spark/Logging.scala
index 54bd57f6d3..5ff61af72c 100644
--- a/core/src/main/scala/spark/Logging.scala
+++ b/core/src/main/scala/spark/Logging.scala
@@ -17,7 +17,7 @@ trait Logging {
// Method to get or create the logger for this object
def log: Logger = {
if (log_ == null) {
- var className = this.getClass().getName()
+ var className = this.getClass.getName
// Ignore trailing $'s in the class names for Scala objects
if (className.endsWith("$")) {
className = className.substring(0, className.length - 1)
@@ -28,31 +28,46 @@ trait Logging {
}
// Log methods that take only a String
- def logInfo(msg: => String) = if (log.isInfoEnabled /*&& msg.contains("job finished in")*/) log.info(msg)
+ def logInfo(msg: => String) {
+ if (log.isInfoEnabled) log.info(msg)
+ }
- def logDebug(msg: => String) = if (log.isDebugEnabled) log.debug(msg)
+ def logDebug(msg: => String) {
+ if (log.isDebugEnabled) log.debug(msg)
+ }
- def logTrace(msg: => String) = if (log.isTraceEnabled) log.trace(msg)
+ def logTrace(msg: => String) {
+ if (log.isTraceEnabled) log.trace(msg)
+ }
- def logWarning(msg: => String) = if (log.isWarnEnabled) log.warn(msg)
+ def logWarning(msg: => String) {
+ if (log.isWarnEnabled) log.warn(msg)
+ }
- def logError(msg: => String) = if (log.isErrorEnabled) log.error(msg)
+ def logError(msg: => String) {
+ if (log.isErrorEnabled) log.error(msg)
+ }
// Log methods that take Throwables (Exceptions/Errors) too
- def logInfo(msg: => String, throwable: Throwable) =
+ def logInfo(msg: => String, throwable: Throwable) {
if (log.isInfoEnabled) log.info(msg)
+ }
- def logDebug(msg: => String, throwable: Throwable) =
+ def logDebug(msg: => String, throwable: Throwable) {
if (log.isDebugEnabled) log.debug(msg)
+ }
- def logTrace(msg: => String, throwable: Throwable) =
+ def logTrace(msg: => String, throwable: Throwable) {
if (log.isTraceEnabled) log.trace(msg)
+ }
- def logWarning(msg: => String, throwable: Throwable) =
+ def logWarning(msg: => String, throwable: Throwable) {
if (log.isWarnEnabled) log.warn(msg, throwable)
+ }
- def logError(msg: => String, throwable: Throwable) =
+ def logError(msg: => String, throwable: Throwable) {
if (log.isErrorEnabled) log.error(msg, throwable)
+ }
// Method for ensuring that logging is initialized, to avoid having multiple
// threads do it concurrently (as SLF4J initialization is not thread safe).
diff --git a/core/src/main/scala/spark/NewHadoopRDD.scala b/core/src/main/scala/spark/NewHadoopRDD.scala
index cd42586aa6..d024d38aa9 100644
--- a/core/src/main/scala/spark/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/NewHadoopRDD.scala
@@ -18,7 +18,7 @@ class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit
val serializableHadoopSplit = new SerializableWritable(rawSplit)
- override def hashCode(): Int = (41 * (41 + rddId) + index).toInt
+ override def hashCode(): Int = (41 * (41 + rddId) + index)
}
class NewHadoopRDD[K, V](
@@ -69,7 +69,7 @@ class NewHadoopRDD[K, V](
finished = !reader.nextKeyValue
havePair = !finished
if (finished) {
- reader.close
+ reader.close()
}
}
!finished
diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala
index 21f68f21c2..d79007ab40 100644
--- a/core/src/main/scala/spark/ParallelCollection.scala
+++ b/core/src/main/scala/spark/ParallelCollection.scala
@@ -18,7 +18,7 @@ class ParallelCollectionSplit[T: ClassManifest](
case _ => false
}
- override val index = slice
+ override val index: Int = slice
}
class ParallelCollection[T: ClassManifest](
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 1191523ccc..1710ff58b3 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -130,12 +130,12 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
if (num > initialCount) {
total = maxSelected
- fraction = Math.min(multiplier * (maxSelected + 1) / initialCount, 1.0)
+ fraction = math.min(multiplier * (maxSelected + 1) / initialCount, 1.0)
} else if (num < 0) {
throw(new IllegalArgumentException("Negative number of elements requested"))
} else {
- fraction = Math.min(multiplier * (num + 1) / initialCount, 1.0)
- total = num.toInt
+ fraction = math.min(multiplier * (num + 1) / initialCount, 1.0)
+ total = num
}
var samples = this.sample(withReplacement, fraction, seed).collect()
diff --git a/core/src/main/scala/spark/SampledRDD.scala b/core/src/main/scala/spark/SampledRDD.scala
index c9a9e53d18..8ef40d8d9e 100644
--- a/core/src/main/scala/spark/SampledRDD.scala
+++ b/core/src/main/scala/spark/SampledRDD.scala
@@ -3,7 +3,7 @@ package spark
import java.util.Random
class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
- override val index = prev.index
+ override val index: Int = prev.index
}
class SampledRDD[T: ClassManifest](
@@ -15,7 +15,7 @@ class SampledRDD[T: ClassManifest](
@transient
val splits_ = {
- val rg = new Random(seed);
+ val rg = new Random(seed)
prev.splits.map(x => new SampledRDDSplit(x, rg.nextInt))
}
@@ -28,7 +28,7 @@ class SampledRDD[T: ClassManifest](
override def compute(splitIn: Split) = {
val split = splitIn.asInstanceOf[SampledRDDSplit]
- val rg = new Random(split.seed);
+ val rg = new Random(split.seed)
// Sampling with replacement (TODO: use reservoir sampling to make this more efficient?)
if (withReplacement) {
val oldData = prev.iterator(split.prev).toArray
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala
index 5434197eca..594dbd235f 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/ShuffledRDD.scala
@@ -40,7 +40,7 @@ class ShuffledRDD[K, V, C](
return new Iterator[(K, C)] {
var iter = combiners.entrySet().iterator()
- def hasNext(): Boolean = iter.hasNext()
+ def hasNext: Boolean = iter.hasNext()
def next(): (K, C) = {
val entry = iter.next()
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index fc364b5307..cba70794e7 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -424,18 +424,6 @@ object SparkContext {
implicit def stringToText(s: String) = new Text(s)
private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = {
- def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
- val c = {
- if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
- classManifest[T].erasure
- } else {
- implicitly[T => Writable].getClass.getMethods()(0).getReturnType
- }
- // TODO: use something like WritableConverter to avoid reflection
- }
- c.asInstanceOf[Class[ _ <: Writable]]
- }
-
def anyToWritable[U <% Writable](u: U): Writable = u
new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]],
diff --git a/core/src/main/scala/spark/Split.scala b/core/src/main/scala/spark/Split.scala
index 831f7672e6..90d4b47c55 100644
--- a/core/src/main/scala/spark/Split.scala
+++ b/core/src/main/scala/spark/Split.scala
@@ -7,7 +7,7 @@ trait Split extends Serializable {
/**
* Get the split's index within its parent RDD
*/
- val index: Int
+ def index: Int
// A better default implementation of HashCode
override def hashCode(): Int = index
diff --git a/core/src/main/scala/spark/UnionRDD.scala b/core/src/main/scala/spark/UnionRDD.scala
index 17522e2bbb..0e8164d6ab 100644
--- a/core/src/main/scala/spark/UnionRDD.scala
+++ b/core/src/main/scala/spark/UnionRDD.scala
@@ -11,7 +11,7 @@ class UnionSplit[T: ClassManifest](
def iterator() = rdd.iterator(split)
def preferredLocations() = rdd.preferredLocations(split)
- override val index = idx
+ override val index: Int = idx
}
class UnionRDD[T: ClassManifest](
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 89624eb370..17670e077a 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -16,7 +16,7 @@ object Utils {
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(o)
- oos.close
+ oos.close()
return bos.toByteArray
}
@@ -48,7 +48,7 @@ object Utils {
j += 1
}
if (j > i) {
- buf += s.substring(i, j);
+ buf += s.substring(i, j)
}
i = j
while (i < s.length && !isAlpha(s.charAt(i))) {
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index 5a873dca3d..e009d4e7db 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -60,7 +60,7 @@ extends Broadcast[T] with Logging with Serializable {
sendBroadcast
}
- def sendBroadcast(): Unit = {
+ def sendBroadcast() {
logInfo("Local host address: " + hostAddress)
// Store a persistent copy in HDFS
@@ -96,7 +96,7 @@ extends Broadcast[T] with Logging with Serializable {
// Must always come AFTER guideMR is created
while (guidePort == -1) {
guidePortLock.synchronized {
- guidePortLock.wait
+ guidePortLock.wait()
}
}
@@ -108,7 +108,7 @@ extends Broadcast[T] with Logging with Serializable {
// Must always come AFTER serveMR is created
while (listenPort == -1) {
listenPortLock.synchronized {
- listenPortLock.wait
+ listenPortLock.wait()
}
}
@@ -127,8 +127,8 @@ extends Broadcast[T] with Logging with Serializable {
SourceInfo(hostAddress, guidePort, totalBlocks, totalBytes, blockSize))
}
- private def readObject(in: ObjectInputStream): Unit = {
- in.defaultReadObject
+ private def readObject(in: ObjectInputStream) {
+ in.defaultReadObject()
BitTorrentBroadcast.synchronized {
val cachedVal = BitTorrentBroadcast.values.get(uuid, 0)
@@ -168,7 +168,7 @@ extends Broadcast[T] with Logging with Serializable {
}
// Initialize variables in the worker node. Master sends everything as 0/null
- private def initializeWorkerVariables: Unit = {
+ private def initializeWorkerVariables() {
arrayOfBlocks = null
hasBlocksBitVector = null
numCopiesSent = null
@@ -194,7 +194,7 @@ extends Broadcast[T] with Logging with Serializable {
stopBroadcast = false
}
- private def registerBroadcast(uuid: UUID, gInfo: SourceInfo): Unit = {
+ private def registerBroadcast(uuid: UUID, gInfo: SourceInfo) {
val socket = new Socket(Broadcast.MasterHostAddress,
Broadcast.MasterTrackerPort)
val oosST = new ObjectOutputStream(socket.getOutputStream)
@@ -222,7 +222,7 @@ extends Broadcast[T] with Logging with Serializable {
socket.close()
}
- private def unregisterBroadcast(uuid: UUID): Unit = {
+ private def unregisterBroadcast(uuid: UUID) {
val socket = new Socket(Broadcast.MasterHostAddress,
Broadcast.MasterTrackerPort)
val oosST = new ObjectOutputStream(socket.getOutputStream)
@@ -250,14 +250,14 @@ extends Broadcast[T] with Logging with Serializable {
// Wait till hostName and listenPort are OK
while (listenPort == -1) {
listenPortLock.synchronized {
- listenPortLock.wait
+ listenPortLock.wait()
}
}
// Wait till totalBlocks and totalBytes are OK
while (totalBlocks == -1) {
totalBlocksLock.synchronized {
- totalBlocksLock.wait
+ totalBlocksLock.wait()
}
}
@@ -275,7 +275,7 @@ extends Broadcast[T] with Logging with Serializable {
// Add new SourceInfo to the listOfSources. Update if it exists already.
// TODO: Optimizing just by OR-ing the BitVectors was BAD for performance
- private def addToListOfSources(newSourceInfo: SourceInfo): Unit = {
+ private def addToListOfSources(newSourceInfo: SourceInfo) {
listOfSources.synchronized {
if (listOfSources.contains(newSourceInfo)) {
listOfSources = listOfSources - newSourceInfo
@@ -284,7 +284,7 @@ extends Broadcast[T] with Logging with Serializable {
}
}
- private def addToListOfSources(newSourceInfos: ListBuffer[SourceInfo]): Unit = {
+ private def addToListOfSources(newSourceInfos: ListBuffer[SourceInfo]) {
newSourceInfos.foreach { newSourceInfo =>
addToListOfSources(newSourceInfo)
}
@@ -292,7 +292,7 @@ extends Broadcast[T] with Logging with Serializable {
class TalkToGuide(gInfo: SourceInfo)
extends Thread with Logging {
- override def run: Unit = {
+ override def run() {
// Keep exchaning information until all blocks have been received
while (hasBlocks.get < totalBlocks) {
@@ -307,7 +307,7 @@ extends Broadcast[T] with Logging with Serializable {
}
// Connect to Guide and send this worker's information
- private def talkOnce: Unit = {
+ private def talkOnce {
var clientSocketToGuide: Socket = null
var oosGuide: ObjectOutputStream = null
var oisGuide: ObjectInputStream = null
@@ -402,7 +402,7 @@ extends Broadcast[T] with Logging with Serializable {
// ServeMultipleRequests thread
while (listenPort == -1) {
listenPortLock.synchronized {
- listenPortLock.wait
+ listenPortLock.wait()
}
}
@@ -412,7 +412,7 @@ extends Broadcast[T] with Logging with Serializable {
hasBlocksBitVector = new BitSet(totalBlocks)
numCopiesSent = new Array[Int](totalBlocks)
totalBlocksLock.synchronized {
- totalBlocksLock.notifyAll
+ totalBlocksLock.notifyAll()
}
totalBytes = gInfo.totalBytes
blockSize = gInfo.blockSize
@@ -445,7 +445,7 @@ extends Broadcast[T] with Logging with Serializable {
// certain bit is NOT unset upon failure resulting in an infinite loop.
private var blocksInRequestBitVector = new BitSet(totalBlocks)
- override def run: Unit = {
+ override def run() {
var threadPool = Utils.newDaemonFixedThreadPool(Broadcast.MaxRxSlots)
while (hasBlocks.get < totalBlocks) {
@@ -613,13 +613,13 @@ extends Broadcast[T] with Logging with Serializable {
private var oosSource: ObjectOutputStream = null
private var oisSource: ObjectInputStream = null
- override def run: Unit = {
+ override def run() {
// TODO: There is a possible bug here regarding blocksInRequestBitVector
var blockToAskFor = -1
// Setup the timeout mechanism
var timeOutTask = new TimerTask {
- override def run: Unit = {
+ override def run() {
cleanUpConnections()
}
}
@@ -645,7 +645,7 @@ extends Broadcast[T] with Logging with Serializable {
addToListOfSources(newPeerToTalkTo)
// Turn the timer OFF, if the sender responds before timeout
- timeOutTimer.cancel
+ timeOutTimer.cancel()
// Send the latest SourceInfo
oosSource.writeObject(getLocalSourceInfo)
@@ -836,7 +836,7 @@ extends Broadcast[T] with Logging with Serializable {
}
}
- private def cleanUpConnections(): Unit = {
+ private def cleanUpConnections() {
if (oisSource != null) {
oisSource.close()
}
@@ -860,7 +860,7 @@ extends Broadcast[T] with Logging with Serializable {
// Keep track of sources that have completed reception
private var setOfCompletedSources = Set[SourceInfo]()
- override def run: Unit = {
+ override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
@@ -869,7 +869,7 @@ extends Broadcast[T] with Logging with Serializable {
logInfo("GuideMultipleRequests => " + serverSocket + " " + guidePort)
guidePortLock.synchronized {
- guidePortLock.notifyAll
+ guidePortLock.notifyAll()
}
try {
@@ -920,7 +920,7 @@ extends Broadcast[T] with Logging with Serializable {
}
}
- private def sendStopBroadcastNotifications: Unit = {
+ private def sendStopBroadcastNotifications() {
listOfSources.synchronized {
listOfSources.foreach { sourceInfo =>
@@ -972,7 +972,7 @@ extends Broadcast[T] with Logging with Serializable {
private var sourceInfo: SourceInfo = null
private var selectedSources: ListBuffer[SourceInfo] = null
- override def run: Unit = {
+ override def run() {
try {
logInfo("new GuideSingleRequest is running")
// Connecting worker is sending in its information
@@ -1060,14 +1060,14 @@ extends Broadcast[T] with Logging with Serializable {
// Server at most Broadcast.MaxTxSlots peers
var threadPool = Utils.newDaemonFixedThreadPool(Broadcast.MaxTxSlots)
- override def run: Unit = {
+ override def run() {
var serverSocket = new ServerSocket(0)
listenPort = serverSocket.getLocalPort
logInfo("ServeMultipleRequests started with " + serverSocket)
listenPortLock.synchronized {
- listenPortLock.notifyAll
+ listenPortLock.notifyAll()
}
try {
@@ -1111,7 +1111,7 @@ extends Broadcast[T] with Logging with Serializable {
logInfo("new ServeSingleRequest is running")
- override def run: Unit = {
+ override def run() {
try {
// Send latest local SourceInfo to the receiver
// In the case of receiver timeout and connection close, this will
@@ -1178,7 +1178,7 @@ extends Broadcast[T] with Logging with Serializable {
}
}
- private def sendBlock(blockToSend: Int): Unit = {
+ private def sendBlock(blockToSend: Int) {
try {
oos.writeObject(arrayOfBlocks(blockToSend))
oos.flush()
@@ -1195,12 +1195,13 @@ extends Broadcast[T] with Logging with Serializable {
class BitTorrentBroadcastFactory
extends BroadcastFactory {
- def initialize(isMaster: Boolean) = {
+ def initialize(isMaster: Boolean) {
BitTorrentBroadcast.initialize(isMaster)
}
- def newBroadcast[T](value_ : T, isLocal: Boolean) =
+ def newBroadcast[T](value_ : T, isLocal: Boolean) = {
new BitTorrentBroadcast[T](value_, isLocal)
+ }
}
private object BitTorrentBroadcast
@@ -1217,7 +1218,7 @@ extends Logging {
private var trackMV: TrackMultipleValues = null
- def initialize(isMaster__ : Boolean): Unit = {
+ def initialize(isMaster__ : Boolean) {
synchronized {
if (!initialized) {
@@ -1245,7 +1246,7 @@ extends Logging {
class TrackMultipleValues
extends Thread with Logging {
- override def run: Unit = {
+ override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
@@ -1267,7 +1268,7 @@ extends Logging {
if (clientSocket != null) {
try {
threadPool.execute(new Thread {
- override def run: Unit = {
+ override def run() {
val oos = new ObjectOutputStream(clientSocket.getOutputStream)
oos.flush()
val ois = new ObjectInputStream(clientSocket.getInputStream)
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index 06049749a9..4c7ab5f8ec 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -30,26 +30,28 @@ object Broadcast extends Logging with Serializable {
private var broadcastFactory: BroadcastFactory = null
// Called by SparkContext or Executor before using Broadcast
- def initialize (isMaster__ : Boolean): Unit = synchronized {
- if (!initialized) {
- val broadcastFactoryClass = System.getProperty(
- "spark.broadcast.factory", "spark.broadcast.HttpBroadcastFactory")
-
- broadcastFactory =
- Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
-
- // Setup isMaster before using it
- isMaster_ = isMaster__
-
- // Set masterHostAddress to the master's IP address for the slaves to read
- if (isMaster) {
- System.setProperty("spark.broadcast.masterHostAddress", Utils.localIpAddress)
- }
+ def initialize (isMaster__ : Boolean) {
+ synchronized {
+ if (!initialized) {
+ val broadcastFactoryClass = System.getProperty(
+ "spark.broadcast.factory", "spark.broadcast.HttpBroadcastFactory")
+
+ broadcastFactory =
+ Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
+
+ // Setup isMaster before using it
+ isMaster_ = isMaster__
- // Initialize appropriate BroadcastFactory and BroadcastObject
- broadcastFactory.initialize(isMaster)
+ // Set masterHostAddress to the master's IP address for the slaves to read
+ if (isMaster) {
+ System.setProperty("spark.broadcast.masterHostAddress", Utils.localIpAddress)
+ }
- initialized = true
+ // Initialize appropriate BroadcastFactory and BroadcastObject
+ broadcastFactory.initialize(isMaster)
+
+ initialized = true
+ }
}
}
@@ -185,11 +187,11 @@ object Broadcast extends Logging with Serializable {
}
}
-case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) extends Serializable
+case class BroadcastBlock (blockID: Int, byteArray: Array[Byte]) extends Serializable
-case class VariableInfo (@transient val arrayOfBlocks : Array[BroadcastBlock],
- val totalBlocks: Int,
- val totalBytes: Int)
+case class VariableInfo (@transient arrayOfBlocks : Array[BroadcastBlock],
+ totalBlocks: Int,
+ totalBytes: Int)
extends Serializable {
@transient
@@ -200,7 +202,7 @@ class SpeedTracker extends Serializable {
// Mapping 'source' to '(totalTime, numBlocks)'
private var sourceToSpeedMap = Map[SourceInfo, (Long, Int)] ()
- def addDataPoint (srcInfo: SourceInfo, timeInMillis: Long): Unit = {
+ def addDataPoint (srcInfo: SourceInfo, timeInMillis: Long) {
sourceToSpeedMap.synchronized {
if (!sourceToSpeedMap.contains(srcInfo)) {
sourceToSpeedMap += (srcInfo -> (timeInMillis, 1))
diff --git a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala b/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala
index 64da650142..43290c241f 100644
--- a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala
@@ -47,7 +47,7 @@ extends Broadcast[T] with Logging with Serializable {
sendBroadcast
}
- def sendBroadcast(): Unit = {
+ def sendBroadcast() {
logInfo("Local host address: " + hostAddress)
// Store a persistent copy in HDFS
@@ -80,7 +80,7 @@ extends Broadcast[T] with Logging with Serializable {
while (listenPort == -1) {
listenPortLock.synchronized {
- listenPortLock.wait
+ listenPortLock.wait()
}
}
@@ -92,14 +92,14 @@ extends Broadcast[T] with Logging with Serializable {
// Register with the Tracker
while (guidePort == -1) {
guidePortLock.synchronized {
- guidePortLock.wait
+ guidePortLock.wait()
}
}
ChainedBroadcast.registerValue(uuid, guidePort)
}
- private def readObject(in: ObjectInputStream): Unit = {
- in.defaultReadObject
+ private def readObject(in: ObjectInputStream) {
+ in.defaultReadObject()
ChainedBroadcast.synchronized {
val cachedVal = ChainedBroadcast.values.get(uuid, 0)
if (cachedVal != null) {
@@ -135,7 +135,7 @@ extends Broadcast[T] with Logging with Serializable {
}
}
- private def initializeSlaveVariables: Unit = {
+ private def initializeSlaveVariables() {
arrayOfBlocks = null
totalBytes = -1
totalBlocks = -1
@@ -218,7 +218,7 @@ extends Broadcast[T] with Logging with Serializable {
// ServeMultipleRequests thread
while (listenPort == -1) {
listenPortLock.synchronized {
- listenPortLock.wait
+ listenPortLock.wait()
}
}
@@ -251,7 +251,7 @@ extends Broadcast[T] with Logging with Serializable {
totalBlocks = sourceInfo.totalBlocks
arrayOfBlocks = new Array[BroadcastBlock](totalBlocks)
totalBlocksLock.synchronized {
- totalBlocksLock.notifyAll
+ totalBlocksLock.notifyAll()
}
totalBytes = sourceInfo.totalBytes
@@ -322,7 +322,7 @@ extends Broadcast[T] with Logging with Serializable {
// Set to true if at least one block is received
receptionSucceeded = true
hasBlocksLock.synchronized {
- hasBlocksLock.notifyAll
+ hasBlocksLock.notifyAll()
}
}
} catch {
@@ -349,7 +349,7 @@ extends Broadcast[T] with Logging with Serializable {
// Keep track of sources that have completed reception
private var setOfCompletedSources = Set[SourceInfo]()
- override def run: Unit = {
+ override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
@@ -358,7 +358,7 @@ extends Broadcast[T] with Logging with Serializable {
logInfo("GuideMultipleRequests => " + serverSocket + " " + guidePort)
guidePortLock.synchronized {
- guidePortLock.notifyAll
+ guidePortLock.notifyAll()
}
try {
@@ -407,7 +407,7 @@ extends Broadcast[T] with Logging with Serializable {
threadPool.shutdown()
}
- private def sendStopBroadcastNotifications: Unit = {
+ private def sendStopBroadcastNotifications() {
pqOfSources.synchronized {
var pqIter = pqOfSources.iterator
while (pqIter.hasNext) {
@@ -459,7 +459,7 @@ extends Broadcast[T] with Logging with Serializable {
private var selectedSourceInfo: SourceInfo = null
private var thisWorkerInfo:SourceInfo = null
- override def run: Unit = {
+ override def run() {
try {
logInfo("new GuideSingleRequest is running")
// Connecting worker is sending in its hostAddress and listenPort it will
@@ -556,7 +556,7 @@ extends Broadcast[T] with Logging with Serializable {
class ServeMultipleRequests
extends Thread with Logging {
- override def run: Unit = {
+ override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
@@ -565,7 +565,7 @@ extends Broadcast[T] with Logging with Serializable {
logInfo("ServeMultipleRequests started with " + serverSocket)
listenPortLock.synchronized {
- listenPortLock.notifyAll
+ listenPortLock.notifyAll()
}
try {
@@ -609,7 +609,7 @@ extends Broadcast[T] with Logging with Serializable {
private var sendFrom = 0
private var sendUntil = totalBlocks
- override def run: Unit = {
+ override def run() {
try {
logInfo("new ServeSingleRequest is running")
@@ -639,18 +639,18 @@ extends Broadcast[T] with Logging with Serializable {
}
}
- private def sendObject: Unit = {
+ private def sendObject() {
// Wait till receiving the SourceInfo from Master
while (totalBlocks == -1) {
totalBlocksLock.synchronized {
- totalBlocksLock.wait
+ totalBlocksLock.wait()
}
}
for (i <- sendFrom until sendUntil) {
while (i == hasBlocks) {
hasBlocksLock.synchronized {
- hasBlocksLock.wait
+ hasBlocksLock.wait()
}
}
try {
@@ -670,9 +670,12 @@ extends Broadcast[T] with Logging with Serializable {
class ChainedBroadcastFactory
extends BroadcastFactory {
- def initialize(isMaster: Boolean) = ChainedBroadcast.initialize(isMaster)
- def newBroadcast[T](value_ : T, isLocal: Boolean) =
+ def initialize(isMaster: Boolean) {
+ ChainedBroadcast.initialize(isMaster)
+ }
+ def newBroadcast[T](value_ : T, isLocal: Boolean) = {
new ChainedBroadcast[T](value_, isLocal)
+ }
}
private object ChainedBroadcast
@@ -689,7 +692,7 @@ extends Logging {
private var trackMV: TrackMultipleValues = null
- def initialize(isMaster__ : Boolean): Unit = {
+ def initialize(isMaster__ : Boolean) {
synchronized {
if (!initialized) {
isMaster_ = isMaster__
@@ -713,14 +716,14 @@ extends Logging {
def isMaster = isMaster_
- def registerValue(uuid: UUID, guidePort: Int): Unit = {
+ def registerValue(uuid: UUID, guidePort: Int) {
valueToGuidePortMap.synchronized {
valueToGuidePortMap +=(uuid -> guidePort)
logInfo("New value registered with the Tracker " + valueToGuidePortMap)
}
}
- def unregisterValue(uuid: UUID): Unit = {
+ def unregisterValue(uuid: UUID) {
valueToGuidePortMap.synchronized {
valueToGuidePortMap(uuid) = SourceInfo.TxOverGoToHDFS
logInfo("Value unregistered from the Tracker " + valueToGuidePortMap)
@@ -729,7 +732,7 @@ extends Logging {
class TrackMultipleValues
extends Thread with Logging {
- override def run: Unit = {
+ override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
@@ -751,7 +754,7 @@ extends Logging {
if (clientSocket != null) {
try {
threadPool.execute(new Thread {
- override def run: Unit = {
+ override def run() {
val oos = new ObjectOutputStream(clientSocket.getOutputStream)
oos.flush()
val ois = new ObjectInputStream(clientSocket.getInputStream)
diff --git a/core/src/main/scala/spark/broadcast/DfsBroadcast.scala b/core/src/main/scala/spark/broadcast/DfsBroadcast.scala
index b053e2b62e..d18dfb8963 100644
--- a/core/src/main/scala/spark/broadcast/DfsBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/DfsBroadcast.scala
@@ -24,15 +24,15 @@ extends Broadcast[T] with Logging with Serializable {
sendBroadcast
}
- def sendBroadcast (): Unit = {
+ def sendBroadcast () {
val out = new ObjectOutputStream (DfsBroadcast.openFileForWriting(uuid))
out.writeObject (value_)
- out.close
+ out.close()
}
// Called by JVM when deserializing an object
- private def readObject(in: ObjectInputStream): Unit = {
- in.defaultReadObject
+ private def readObject(in: ObjectInputStream) {
+ in.defaultReadObject()
DfsBroadcast.synchronized {
val cachedVal = DfsBroadcast.values.get(uuid, 0)
if (cachedVal != null) {
@@ -44,7 +44,7 @@ extends Broadcast[T] with Logging with Serializable {
val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid))
value_ = fileIn.readObject.asInstanceOf[T]
DfsBroadcast.values.put(uuid, 0, value_)
- fileIn.close
+ fileIn.close()
val time = (System.nanoTime - start) / 1e9
logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s")
@@ -55,7 +55,9 @@ extends Broadcast[T] with Logging with Serializable {
class DfsBroadcastFactory
extends BroadcastFactory {
- def initialize (isMaster: Boolean) = DfsBroadcast.initialize
+ def initialize (isMaster: Boolean) {
+ DfsBroadcast.initialize
+ }
def newBroadcast[T] (value_ : T, isLocal: Boolean) =
new DfsBroadcast[T] (value_, isLocal)
}
@@ -71,7 +73,7 @@ extends Logging {
private var compress: Boolean = false
private var bufferSize: Int = 65536
- def initialize (): Unit = {
+ def initialize() {
synchronized {
if (!initialized) {
bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index d0853eadf9..6e3dde76bd 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -25,7 +25,7 @@ extends Broadcast[T] with Logging with Serializable {
}
// Called by JVM when deserializing an object
- private def readObject(in: ObjectInputStream): Unit = {
+ private def readObject(in: ObjectInputStream) {
in.defaultReadObject()
HttpBroadcast.synchronized {
val cachedVal = HttpBroadcast.values.get(uuid, 0)
@@ -44,7 +44,9 @@ extends Broadcast[T] with Logging with Serializable {
}
class HttpBroadcastFactory extends BroadcastFactory {
- def initialize(isMaster: Boolean): Unit = HttpBroadcast.initialize(isMaster)
+ def initialize(isMaster: Boolean) {
+ HttpBroadcast.initialize(isMaster)
+ }
def newBroadcast[T](value_ : T, isLocal: Boolean) = new HttpBroadcast[T](value_, isLocal)
}
@@ -59,7 +61,7 @@ private object HttpBroadcast extends Logging {
private var serverUri: String = null
private var server: HttpServer = null
- def initialize(isMaster: Boolean): Unit = {
+ def initialize(isMaster: Boolean) {
synchronized {
if (!initialized) {
bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
diff --git a/core/src/main/scala/spark/broadcast/SourceInfo.scala b/core/src/main/scala/spark/broadcast/SourceInfo.scala
index 03f928953d..09907f4ee7 100644
--- a/core/src/main/scala/spark/broadcast/SourceInfo.scala
+++ b/core/src/main/scala/spark/broadcast/SourceInfo.scala
@@ -10,11 +10,11 @@ import spark._
* CHANGED: Keep track of the blockSize for THIS broadcast variable.
* Broadcast.BlockSize is expected to be updated across different broadcasts
*/
-case class SourceInfo (val hostAddress: String,
- val listenPort: Int,
- val totalBlocks: Int = SourceInfo.UnusedParam,
- val totalBytes: Int = SourceInfo.UnusedParam,
- val blockSize: Int = Broadcast.BlockSize)
+case class SourceInfo (hostAddress: String,
+ listenPort: Int,
+ totalBlocks: Int = SourceInfo.UnusedParam,
+ totalBytes: Int = SourceInfo.UnusedParam,
+ blockSize: Int = Broadcast.BlockSize)
extends Comparable[SourceInfo] with Logging {
var currentLeechers = 0
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index 374389def5..f5527b6ec9 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -47,7 +47,7 @@ extends Broadcast[T] with Logging with Serializable {
sendBroadcast
}
- def sendBroadcast(): Unit = {
+ def sendBroadcast() {
logInfo("Local host address: " + hostAddress)
// Store a persistent copy in HDFS
@@ -70,25 +70,25 @@ extends Broadcast[T] with Logging with Serializable {
guideMR = new GuideMultipleRequests
guideMR.setDaemon(true)
- guideMR.start
+ guideMR.start()
logInfo("GuideMultipleRequests started...")
// Must always come AFTER guideMR is created
while (guidePort == -1) {
guidePortLock.synchronized {
- guidePortLock.wait
+ guidePortLock.wait()
}
}
serveMR = new ServeMultipleRequests
serveMR.setDaemon(true)
- serveMR.start
+ serveMR.start()
logInfo("ServeMultipleRequests started...")
// Must always come AFTER serveMR is created
while (listenPort == -1) {
listenPortLock.synchronized {
- listenPortLock.wait
+ listenPortLock.wait()
}
}
@@ -101,8 +101,8 @@ extends Broadcast[T] with Logging with Serializable {
TreeBroadcast.registerValue(uuid, guidePort)
}
- private def readObject(in: ObjectInputStream): Unit = {
- in.defaultReadObject
+ private def readObject(in: ObjectInputStream) {
+ in.defaultReadObject()
TreeBroadcast.synchronized {
val cachedVal = TreeBroadcast.values.get(uuid, 0)
if (cachedVal != null) {
@@ -115,7 +115,7 @@ extends Broadcast[T] with Logging with Serializable {
serveMR = new ServeMultipleRequests
serveMR.setDaemon(true)
- serveMR.start
+ serveMR.start()
logInfo("ServeMultipleRequests started...")
val start = System.nanoTime
@@ -138,7 +138,7 @@ extends Broadcast[T] with Logging with Serializable {
}
}
- private def initializeSlaveVariables: Unit = {
+ private def initializeSlaveVariables() {
arrayOfBlocks = null
totalBytes = -1
totalBlocks = -1
@@ -221,7 +221,7 @@ extends Broadcast[T] with Logging with Serializable {
// ServeMultipleRequests thread
while (listenPort == -1) {
listenPortLock.synchronized {
- listenPortLock.wait
+ listenPortLock.wait()
}
}
@@ -254,7 +254,7 @@ extends Broadcast[T] with Logging with Serializable {
totalBlocks = sourceInfo.totalBlocks
arrayOfBlocks = new Array[BroadcastBlock](totalBlocks)
totalBlocksLock.synchronized {
- totalBlocksLock.notifyAll
+ totalBlocksLock.notifyAll()
}
totalBytes = sourceInfo.totalBytes
blockSize = sourceInfo.blockSize
@@ -326,7 +326,7 @@ extends Broadcast[T] with Logging with Serializable {
// Set to true if at least one block is received
receptionSucceeded = true
hasBlocksLock.synchronized {
- hasBlocksLock.notifyAll
+ hasBlocksLock.notifyAll()
}
}
} catch {
@@ -353,7 +353,7 @@ extends Broadcast[T] with Logging with Serializable {
// Keep track of sources that have completed reception
private var setOfCompletedSources = Set[SourceInfo]()
- override def run: Unit = {
+ override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
@@ -362,7 +362,7 @@ extends Broadcast[T] with Logging with Serializable {
logInfo("GuideMultipleRequests => " + serverSocket + " " + guidePort)
guidePortLock.synchronized {
- guidePortLock.notifyAll
+ guidePortLock.notifyAll()
}
try {
@@ -408,10 +408,10 @@ extends Broadcast[T] with Logging with Serializable {
}
// Shutdown the thread pool
- threadPool.shutdown
+ threadPool.shutdown()
}
- private def sendStopBroadcastNotifications: Unit = {
+ private def sendStopBroadcastNotifications() {
listOfSources.synchronized {
var listIter = listOfSources.iterator
while (listIter.hasNext) {
@@ -463,7 +463,7 @@ extends Broadcast[T] with Logging with Serializable {
private var selectedSourceInfo: SourceInfo = null
private var thisWorkerInfo:SourceInfo = null
- override def run: Unit = {
+ override def run() {
try {
logInfo("new GuideSingleRequest is running")
// Connecting worker is sending in its hostAddress and listenPort it will
@@ -569,7 +569,7 @@ extends Broadcast[T] with Logging with Serializable {
class ServeMultipleRequests
extends Thread with Logging {
- override def run: Unit = {
+ override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
@@ -578,7 +578,7 @@ extends Broadcast[T] with Logging with Serializable {
logInfo("ServeMultipleRequests started with " + serverSocket)
listenPortLock.synchronized {
- listenPortLock.notifyAll
+ listenPortLock.notifyAll()
}
try {
@@ -610,7 +610,7 @@ extends Broadcast[T] with Logging with Serializable {
}
// Shutdown the thread pool
- threadPool.shutdown
+ threadPool.shutdown()
}
class ServeSingleRequest(val clientSocket: Socket)
@@ -622,7 +622,7 @@ extends Broadcast[T] with Logging with Serializable {
private var sendFrom = 0
private var sendUntil = totalBlocks
- override def run: Unit = {
+ override def run() {
try {
logInfo("new ServeSingleRequest is running")
@@ -652,18 +652,18 @@ extends Broadcast[T] with Logging with Serializable {
}
}
- private def sendObject: Unit = {
+ private def sendObject() {
// Wait till receiving the SourceInfo from Master
while (totalBlocks == -1) {
totalBlocksLock.synchronized {
- totalBlocksLock.wait
+ totalBlocksLock.wait()
}
}
for (i <- sendFrom until sendUntil) {
while (i == hasBlocks) {
hasBlocksLock.synchronized {
- hasBlocksLock.wait
+ hasBlocksLock.wait()
}
}
try {
@@ -704,7 +704,7 @@ extends Logging {
private var MaxDegree_ : Int = 2
- def initialize(isMaster__ : Boolean): Unit = {
+ def initialize(isMaster__ : Boolean) {
synchronized {
if (!initialized) {
isMaster_ = isMaster__
@@ -712,7 +712,7 @@ extends Logging {
if (isMaster) {
trackMV = new TrackMultipleValues
trackMV.setDaemon(true)
- trackMV.start
+ trackMV.start()
// TODO: Logging the following line makes the Spark framework ID not
// getting logged, cause it calls logInfo before log4j is initialized
logInfo("TrackMultipleValues started...")
@@ -728,14 +728,14 @@ extends Logging {
def isMaster = isMaster_
- def registerValue(uuid: UUID, guidePort: Int): Unit = {
+ def registerValue(uuid: UUID, guidePort: Int) {
valueToGuidePortMap.synchronized {
valueToGuidePortMap += (uuid -> guidePort)
logInfo("New value registered with the Tracker " + valueToGuidePortMap)
}
}
- def unregisterValue(uuid: UUID): Unit = {
+ def unregisterValue(uuid: UUID) {
valueToGuidePortMap.synchronized {
valueToGuidePortMap(uuid) = SourceInfo.TxOverGoToHDFS
logInfo("Value unregistered from the Tracker " + valueToGuidePortMap)
@@ -744,7 +744,7 @@ extends Logging {
class TrackMultipleValues
extends Thread with Logging {
- override def run: Unit = {
+ override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
@@ -766,7 +766,7 @@ extends Logging {
if (clientSocket != null) {
try {
threadPool.execute(new Thread {
- override def run: Unit = {
+ override def run() {
val oos = new ObjectOutputStream(clientSocket.getOutputStream)
oos.flush()
val ois = new ObjectInputStream(clientSocket.getInputStream)
@@ -800,7 +800,7 @@ extends Logging {
}
// Shutdown the thread pool
- threadPool.shutdown
+ threadPool.shutdown()
}
}
}
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index 4546dfa0fa..451faee66e 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -107,7 +107,7 @@ extends Connection(SocketChannel.open, selector_) {
val defaultChunkSize = 65536 //32768 //16384
var nextMessageToBeUsed = 0
- def addMessage(message: Message): Unit = {
+ def addMessage(message: Message) {
messages.synchronized{
/*messages += message*/
messages.enqueue(message)
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index a5a707a57d..f680a6419b 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -17,7 +17,7 @@ import java.nio.channels.spi._
import java.net._
import java.util.concurrent.Executors
-case class ConnectionManagerId(val host: String, val port: Int) {
+case class ConnectionManagerId(host: String, port: Int) {
def toSocketAddress() = new InetSocketAddress(host, port)
}
@@ -112,7 +112,7 @@ class ConnectionManager(port: Int) extends Logging {
val selectedKeys = selector.selectedKeys().iterator()
while (selectedKeys.hasNext()) {
- val key = selectedKeys.next.asInstanceOf[SelectionKey]
+ val key = selectedKeys.next
selectedKeys.remove()
if (key.isValid) {
if (key.isAcceptable) {
@@ -173,7 +173,7 @@ class ConnectionManager(port: Int) extends Logging {
status.synchronized {
status.attempted = true
status.acked = false
- status.notifyAll
+ status.notifyAll()
}
})
@@ -204,7 +204,7 @@ class ConnectionManager(port: Int) extends Logging {
status.synchronized {
status.attempted = true
status.acked = false
- status.notifyAll
+ status.notifyAll()
}
})
@@ -260,7 +260,7 @@ class ConnectionManager(port: Int) extends Logging {
sentMessageStatus.ackMessage = Some(message)
sentMessageStatus.attempted = true
sentMessageStatus.acked = true
- sentMessageStatus.notifyAll
+ sentMessageStatus.notifyAll()
}
} else {
val ackMessage = if (onReceiveCallback != null) {
diff --git a/core/src/main/scala/spark/partial/ApproximateActionListener.scala b/core/src/main/scala/spark/partial/ApproximateActionListener.scala
index 260547902b..e6535836ab 100644
--- a/core/src/main/scala/spark/partial/ApproximateActionListener.scala
+++ b/core/src/main/scala/spark/partial/ApproximateActionListener.scala
@@ -25,20 +25,24 @@ class ApproximateActionListener[T, U, R](
var failure: Option[Exception] = None // Set if the job has failed (permanently)
var resultObject: Option[PartialResult[R]] = None // Set if we've already returned a PartialResult
- override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
- evaluator.merge(index, result.asInstanceOf[U])
- finishedTasks += 1
- if (finishedTasks == totalTasks) {
- // If we had already returned a PartialResult, set its final value
- resultObject.foreach(r => r.setFinalValue(evaluator.currentResult()))
- // Notify any waiting thread that may have called getResult
- this.notifyAll()
+ override def taskSucceeded(index: Int, result: Any) {
+ synchronized {
+ evaluator.merge(index, result.asInstanceOf[U])
+ finishedTasks += 1
+ if (finishedTasks == totalTasks) {
+ // If we had already returned a PartialResult, set its final value
+ resultObject.foreach(r => r.setFinalValue(evaluator.currentResult()))
+ // Notify any waiting thread that may have called getResult
+ this.notifyAll()
+ }
}
}
- override def jobFailed(exception: Exception): Unit = synchronized {
- failure = Some(exception)
- this.notifyAll()
+ override def jobFailed(exception: Exception) {
+ synchronized {
+ failure = Some(exception)
+ this.notifyAll()
+ }
}
/**
diff --git a/core/src/main/scala/spark/partial/PartialResult.scala b/core/src/main/scala/spark/partial/PartialResult.scala
index 7095bc8ca1..e7d2d4e8cc 100644
--- a/core/src/main/scala/spark/partial/PartialResult.scala
+++ b/core/src/main/scala/spark/partial/PartialResult.scala
@@ -44,37 +44,43 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
* Set a handler to be called if this PartialResult's job fails. Only one failure handler
* is supported per PartialResult.
*/
- def onFail(handler: Exception => Unit): Unit = synchronized {
- if (failureHandler != None) {
- throw new UnsupportedOperationException("onFail cannot be called twice")
- }
- failureHandler = Some(handler)
- if (failure != None) {
- // We already have a failure, so let's call the handler
- handler(failure.get)
+ def onFail(handler: Exception => Unit) {
+ synchronized {
+ if (failureHandler != None) {
+ throw new UnsupportedOperationException("onFail cannot be called twice")
+ }
+ failureHandler = Some(handler)
+ if (failure != None) {
+ // We already have a failure, so let's call the handler
+ handler(failure.get)
+ }
}
}
- private[spark] def setFinalValue(value: R): Unit = synchronized {
- if (finalValue != None) {
- throw new UnsupportedOperationException("setFinalValue called twice on a PartialResult")
+ private[spark] def setFinalValue(value: R) {
+ synchronized {
+ if (finalValue != None) {
+ throw new UnsupportedOperationException("setFinalValue called twice on a PartialResult")
+ }
+ finalValue = Some(value)
+ // Call the completion handler if it was set
+ completionHandler.foreach(h => h(value))
+ // Notify any threads that may be calling getFinalValue()
+ this.notifyAll()
}
- finalValue = Some(value)
- // Call the completion handler if it was set
- completionHandler.foreach(h => h(value))
- // Notify any threads that may be calling getFinalValue()
- this.notifyAll()
}
- private[spark] def setFailure(exception: Exception): Unit = synchronized {
- if (failure != None) {
- throw new UnsupportedOperationException("setFailure called twice on a PartialResult")
+ private[spark] def setFailure(exception: Exception) {
+ synchronized {
+ if (failure != None) {
+ throw new UnsupportedOperationException("setFailure called twice on a PartialResult")
+ }
+ failure = Some(exception)
+ // Call the failure handler if it was set
+ failureHandler.foreach(h => h(exception))
+ // Notify any threads that may be calling getFinalValue()
+ this.notifyAll()
}
- failure = Some(exception)
- // Call the failure handler if it was set
- failureHandler.foreach(h => h(exception))
- // Notify any threads that may be calling getFinalValue()
- this.notifyAll()
}
override def toString: String = synchronized {
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index fc8adbc517..436c16cddd 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -529,7 +529,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocs(n.rdd, inPart)
if (locs != Nil)
- return locs;
+ return locs
}
case _ =>
})
diff --git a/core/src/main/scala/spark/scheduler/JobWaiter.scala b/core/src/main/scala/spark/scheduler/JobWaiter.scala
index be8ec9bd7b..4c2ae23051 100644
--- a/core/src/main/scala/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/spark/scheduler/JobWaiter.scala
@@ -12,26 +12,30 @@ class JobWaiter(totalTasks: Int) extends JobListener {
private var jobFinished = false // Is the job as a whole finished (succeeded or failed)?
private var jobResult: JobResult = null // If the job is finished, this will be its result
- override def taskSucceeded(index: Int, result: Any) = synchronized {
- if (jobFinished) {
- throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
- }
- taskResults(index) = result
- finishedTasks += 1
- if (finishedTasks == totalTasks) {
- jobFinished = true
- jobResult = JobSucceeded(taskResults)
- this.notifyAll()
+ override def taskSucceeded(index: Int, result: Any) {
+ synchronized {
+ if (jobFinished) {
+ throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
+ }
+ taskResults(index) = result
+ finishedTasks += 1
+ if (finishedTasks == totalTasks) {
+ jobFinished = true
+ jobResult = JobSucceeded(taskResults)
+ this.notifyAll()
+ }
}
}
- override def jobFailed(exception: Exception) = synchronized {
- if (jobFinished) {
- throw new UnsupportedOperationException("jobFailed() called on a finished JobWaiter")
+ override def jobFailed(exception: Exception) {
+ synchronized {
+ if (jobFinished) {
+ throw new UnsupportedOperationException("jobFailed() called on a finished JobWaiter")
+ }
+ jobFinished = true
+ jobResult = JobFailed(exception)
+ this.notifyAll()
}
- jobFinished = true
- jobResult = JobFailed(exception)
- this.notifyAll()
}
def getResult(): JobResult = synchronized {
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 79cca0f294..8c0e06f020 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -42,7 +42,7 @@ object ShuffleMapTask {
if (old != null) {
return old
} else {
- val loader = currentThread.getContextClassLoader
+ val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val objIn = new ObjectInputStream(in) {
override def resolveClass(desc: ObjectStreamClass) =
@@ -107,7 +107,7 @@ class ShuffleMapTask(
override def run(attemptId: Int): BlockManagerId = {
val numOutputSplits = dep.partitioner.numPartitions
val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]]
- val partitioner = dep.partitioner.asInstanceOf[Partitioner]
+ val partitioner = dep.partitioner
val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[Any, Any])
for (elem <- rdd.iterator(split)) {
val (k, v) = elem.asInstanceOf[(Any, Any)]
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
index 95ad6c5b59..2eee36264a 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
@@ -210,7 +210,7 @@ class CoarseMesosScheduler(
}
// Also report the loss to the DAGScheduler
listener.hostLost(failedHost.get)
- reviveOffers();
+ reviveOffers()
}
}
@@ -283,9 +283,9 @@ class CoarseMesosScheduler(
class WorkerTask(slaveId: String, host: String) extends Task[Unit](-1) {
generation = 0
- def run(id: Int): Unit = {
+ def run(id: Int) {
val env = SparkEnv.get
- val classLoader = currentThread.getContextClassLoader
+ val classLoader = Thread.currentThread.getContextClassLoader
val actor = env.actorSystem.actorOf(
Props(new WorkerActor(slaveId, host, env, classLoader)),
name = "WorkerActor")
@@ -309,7 +309,7 @@ class WorkerActor(slaveId: String, host: String, env: SparkEnv, classLoader: Cla
class TaskRunner(desc: MTaskInfo)
extends Runnable {
- override def run() = {
+ override def run() {
val tid = desc.getTaskId.getValue
logInfo("Running task ID " + tid)
try {
@@ -360,7 +360,7 @@ class WorkerActor(slaveId: String, host: String, env: SparkEnv, classLoader: Cla
}
override def receive = {
- case LaunchTask(slaveId, task) =>
+ case LaunchTask(slaveId_, task) =>
threadPool.execute(new TaskRunner(task))
}
}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
index f72618c03f..8e34537674 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
@@ -107,7 +107,7 @@ class MesosScheduler(
override def start() {
new Thread("MesosScheduler driver") {
setDaemon(true)
- override def run {
+ override def run() {
val sched = MesosScheduler.this
val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
driver = new MesosSchedulerDriver(sched, fwInfo, master)
@@ -122,7 +122,7 @@ class MesosScheduler(
if (System.getProperty("spark.speculation", "false") == "true") {
new Thread("MesosScheduler speculation check") {
setDaemon(true)
- override def run {
+ override def run() {
waitForRegister()
while (true) {
try {
@@ -184,7 +184,7 @@ class MesosScheduler(
activeTaskSetsQueue += manager
taskSetTaskIds(taskSet.id) = new HashSet()
}
- reviveOffers();
+ reviveOffers()
}
def taskSetFinished(manager: TaskSetManager) {
@@ -331,7 +331,7 @@ class MesosScheduler(
}
if (failedHost != None) {
listener.hostLost(failedHost.get)
- reviveOffers();
+ reviveOffers()
}
if (taskFailed) {
// Also revive offers if a task had failed for some reason other than host lost
@@ -439,7 +439,7 @@ class MesosScheduler(
}
if (failedHost != None) {
listener.hostLost(failedHost.get)
- reviveOffers();
+ reviveOffers()
}
}
diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
index 3a8574a815..501183ab1f 100644
--- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
@@ -82,7 +82,7 @@ class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
val block = blockManager.getLocal(id)
val buffer = block match {
case Some(tValues) => {
- val values = tValues.asInstanceOf[Iterator[Any]]
+ val values = tValues
val buffer = blockManager.dataSerialize(values)
buffer
}
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala
index 75df4bee09..816411debe 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/spark/FailureSuite.scala
@@ -14,9 +14,11 @@ object FailureSuiteState {
var tasksRun = 0
var tasksFailed = 0
- def clear(): Unit = synchronized {
- tasksRun = 0
- tasksFailed = 0
+ def clear() {
+ synchronized {
+ tasksRun = 0
+ tasksFailed = 0
+ }
}
}
diff --git a/core/src/test/scala/spark/KryoSerializerSuite.scala b/core/src/test/scala/spark/KryoSerializerSuite.scala
index 078071209a..06d446ea24 100644
--- a/core/src/test/scala/spark/KryoSerializerSuite.scala
+++ b/core/src/test/scala/spark/KryoSerializerSuite.scala
@@ -11,8 +11,9 @@ import SparkContext._
class KryoSerializerSuite extends FunSuite {
test("basic types") {
val ser = (new KryoSerializer).newInstance()
- def check[T](t: T): Unit =
+ def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
+ }
check(1)
check(1L)
check(1.0f)
@@ -39,8 +40,9 @@ class KryoSerializerSuite extends FunSuite {
test("pairs") {
val ser = (new KryoSerializer).newInstance()
- def check[T](t: T): Unit =
+ def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
+ }
check((1, 1))
check((1, 1L))
check((1L, 1))
@@ -62,8 +64,9 @@ class KryoSerializerSuite extends FunSuite {
test("Scala data structures") {
val ser = (new KryoSerializer).newInstance()
- def check[T](t: T): Unit =
+ def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
+ }
check(List[Int]())
check(List[Int](1, 2, 3))
check(List[String]())
@@ -86,8 +89,9 @@ class KryoSerializerSuite extends FunSuite {
System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
val ser = (new KryoSerializer).newInstance()
- def check[T](t: T): Unit =
+ def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
+ }
check(CaseClass(17, "hello"))
diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala
index cadf01432f..d38e72d8b8 100644
--- a/core/src/test/scala/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/spark/ThreadingSuite.scala
@@ -31,7 +31,7 @@ class ThreadingSuite extends FunSuite {
new Thread {
override def run() {
answer1 = nums.reduce(_ + _)
- answer2 = nums.first // This will run "locally" in the current thread
+ answer2 = nums.first() // This will run "locally" in the current thread
sem.release()
}
}.start()
@@ -51,13 +51,13 @@ class ThreadingSuite extends FunSuite {
override def run() {
val answer1 = nums.reduce(_ + _)
if (answer1 != 55) {
- printf("In thread %d: answer1 was %d\n", i, answer1);
- ok = false;
+ printf("In thread %d: answer1 was %d\n", i, answer1)
+ ok = false
}
- val answer2 = nums.first // This will run "locally" in the current thread
+ val answer2 = nums.first() // This will run "locally" in the current thread
if (answer2 != 1) {
- printf("In thread %d: answer2 was %d\n", i, answer2);
- ok = false;
+ printf("In thread %d: answer2 was %d\n", i, answer2)
+ ok = false
}
sem.release()
}
@@ -80,13 +80,13 @@ class ThreadingSuite extends FunSuite {
override def run() {
val answer1 = nums.reduce(_ + _)
if (answer1 != 55) {
- printf("In thread %d: answer1 was %d\n", i, answer1);
- ok = false;
+ printf("In thread %d: answer1 was %d\n", i, answer1)
+ ok = false
}
- val answer2 = nums.first // This will run "locally" in the current thread
+ val answer2 = nums.first() // This will run "locally" in the current thread
if (answer2 != 1) {
- printf("In thread %d: answer2 was %d\n", i, answer2);
- ok = false;
+ printf("In thread %d: answer2 was %d\n", i, answer2)
+ ok = false
}
sem.release()
}
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 313a877ed2..0e2b6d4902 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -5,3 +5,5 @@ resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.3")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.0-RC1")
+
+addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.0.0")