aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorDenny <dennybritz@gmail.com>2012-10-02 19:25:26 -0700
committerDenny <dennybritz@gmail.com>2012-10-02 19:28:37 -0700
commit18a1faedf64ea712348367e8d8bc0c9db0e0591a (patch)
treeebd3379549dbbeb234db170ad2481d7bb546afd8 /core
parentb7a913e1facae6ee24b63bd1e6b3379956d1c8c4 (diff)
downloadspark-18a1faedf64ea712348367e8d8bc0c9db0e0591a.tar.gz
spark-18a1faedf64ea712348367e8d8bc0c9db0e0591a.tar.bz2
spark-18a1faedf64ea712348367e8d8bc0c9db0e0591a.zip
Stylistic changes and Public Accumulable and Broadcast
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/Accumulators.scala10
-rw-r--r--core/src/main/scala/spark/CartesianRDD.scala3
-rw-r--r--core/src/main/scala/spark/CoGroupedRDD.scala3
-rw-r--r--core/src/main/scala/spark/DoubleRDDFunctions.scala2
-rw-r--r--core/src/main/scala/spark/KryoSerializer.scala6
-rw-r--r--core/src/main/scala/spark/NewHadoopRDD.scala3
-rw-r--r--core/src/main/scala/spark/PairRDDFunctions.scala4
-rw-r--r--core/src/main/scala/spark/SampledRDD.scala3
-rw-r--r--core/src/main/scala/spark/SequenceFileRDDFunctions.scala2
-rw-r--r--core/src/main/scala/spark/TaskEndReason.scala10
-rw-r--r--core/src/main/scala/spark/broadcast/Broadcast.scala5
-rw-r--r--core/src/main/scala/spark/deploy/DeployMessage.scala27
-rw-r--r--core/src/main/scala/spark/deploy/LocalSparkCluster.scala3
-rw-r--r--core/src/main/scala/spark/deploy/master/JobInfo.scala3
-rw-r--r--core/src/main/scala/spark/deploy/master/MasterWebUI.scala3
-rw-r--r--core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala3
-rw-r--r--core/src/main/scala/spark/network/Connection.scala3
-rw-r--r--core/src/main/scala/spark/partial/BoundedDouble.scala3
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala3
-rw-r--r--core/src/main/scala/spark/scheduler/TaskResult.scala3
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala3
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala20
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala3
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala3
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala3
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala6
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala27
-rw-r--r--core/src/main/scala/spark/storage/BlockMessageArray.scala3
-rw-r--r--core/src/main/scala/spark/util/SerializableBuffer.scala3
-rw-r--r--core/src/main/scala/spark/util/StatCounter.scala3
30 files changed, 117 insertions, 59 deletions
diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index cd8e43f556..0363434d7a 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -18,7 +18,7 @@ import scala.collection.generic.Growable
* @tparam R the full accumulated data
* @tparam T partial data that can be added in
*/
-private[spark] class Accumulable[R, T] (
+class Accumulable[R, T] (
@transient initialValue: R,
param: AccumulableParam[R, T])
extends Serializable {
@@ -73,7 +73,7 @@ private[spark] class Accumulable[R, T] (
* @tparam R the full accumulated data
* @tparam T partial data that can be added in
*/
-private[spark] trait AccumulableParam[R, T] extends Serializable {
+trait AccumulableParam[R, T] extends Serializable {
/**
* Add additional data to the accumulator value.
* @param r the current value of the accumulator
@@ -93,7 +93,7 @@ private[spark] trait AccumulableParam[R, T] extends Serializable {
def zero(initialValue: R): R
}
-private[spark] class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
+class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
extends AccumulableParam[R,T] {
def addAccumulator(growable: R, elem: T) : R = {
@@ -124,7 +124,7 @@ private[spark] class GrowableAccumulableParam[R <% Growable[T] with TraversableO
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
-private[spark] class Accumulator[T](
+class Accumulator[T](
@transient initialValue: T,
param: AccumulatorParam[T]) extends Accumulable[T,T](initialValue, param)
@@ -133,7 +133,7 @@ private[spark] class Accumulator[T](
* as the accumulated value
* @tparam T type of value to accumulate
*/
-private[spark] trait AccumulatorParam[T] extends AccumulableParam[T, T] {
+AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T) : T = {
addInPlace(t1, t2)
}
diff --git a/core/src/main/scala/spark/CartesianRDD.scala b/core/src/main/scala/spark/CartesianRDD.scala
index 9f94bcb413..83db2d2934 100644
--- a/core/src/main/scala/spark/CartesianRDD.scala
+++ b/core/src/main/scala/spark/CartesianRDD.scala
@@ -1,6 +1,7 @@
package spark
-private[spark] class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
+private[spark]
+class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
override val index: Int = idx
}
diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala
index 2c270766f9..daba719b14 100644
--- a/core/src/main/scala/spark/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/CoGroupedRDD.scala
@@ -10,7 +10,8 @@ private[spark] sealed trait CoGroupSplitDep extends Serializable
private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
-private[spark] class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable {
+private[spark]
+class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable {
override val index: Int = idx
override def hashCode(): Int = idx
}
diff --git a/core/src/main/scala/spark/DoubleRDDFunctions.scala b/core/src/main/scala/spark/DoubleRDDFunctions.scala
index 6252dc44f7..1fbf66b7de 100644
--- a/core/src/main/scala/spark/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/spark/DoubleRDDFunctions.scala
@@ -10,7 +10,7 @@ import spark.util.StatCounter
/**
* Extra functions available on RDDs of Doubles through an implicit conversion.
*/
-private[spark] class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
+class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
def sum(): Double = {
self.reduce(_ + _)
}
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index 1e55621b8e..b8aa3a86c5 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -68,7 +68,8 @@ private[spark] object ZigZag {
}
}
-private[spark] class KryoSerializationStream(kryo: Kryo, threadBuffer: ByteBuffer, out: OutputStream)
+private[spark]
+class KryoSerializationStream(kryo: Kryo, threadBuffer: ByteBuffer, out: OutputStream)
extends SerializationStream {
val channel = Channels.newChannel(out)
@@ -85,7 +86,8 @@ extends SerializationStream {
def close() { out.close() }
}
-private[spark] class KryoDeserializationStream(objectBuffer: ObjectBuffer, in: InputStream)
+private[spark]
+class KryoDeserializationStream(objectBuffer: ObjectBuffer, in: InputStream)
extends DeserializationStream {
def readObject[T](): T = {
val len = ZigZag.readInt(in)
diff --git a/core/src/main/scala/spark/NewHadoopRDD.scala b/core/src/main/scala/spark/NewHadoopRDD.scala
index 9c5ad3511e..9072698357 100644
--- a/core/src/main/scala/spark/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/NewHadoopRDD.scala
@@ -13,7 +13,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptID
import java.util.Date
import java.text.SimpleDateFormat
-private[spark] class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
+private[spark]
+class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
extends Split {
val serializableHadoopSplit = new SerializableWritable(rawSplit)
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 57fdb741df..80d62caf25 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -41,7 +41,7 @@ import spark.partial.PartialResult
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
*/
-private[spark] class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
+class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
self: RDD[(K, V)])
extends Logging
with Serializable {
@@ -430,7 +430,7 @@ private[spark] class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
def getValueClass() = implicitly[ClassManifest[V]].erasure
}
-private[spark] class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
+class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
self: RDD[(K, V)])
extends Logging
with Serializable {
diff --git a/core/src/main/scala/spark/SampledRDD.scala b/core/src/main/scala/spark/SampledRDD.scala
index 4b33148364..ac10aed477 100644
--- a/core/src/main/scala/spark/SampledRDD.scala
+++ b/core/src/main/scala/spark/SampledRDD.scala
@@ -4,7 +4,8 @@ import java.util.Random
import cern.jet.random.Poisson
import cern.jet.random.engine.DRand
-private[spark] class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
+private[spark]
+class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
override val index: Int = prev.index
}
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 6224e957e5..ea7171d3a1 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -30,7 +30,7 @@ import SparkContext._
* through an implicit conversion. Note that this can't be part of PairRDDFunctions because
* we need more implicit parameters to convert our keys and values to Writable.
*/
-private[spark] class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](
+class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](
self: RDD[(K, V)])
extends Logging
with Serializable {
diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala
index 3e5668892f..420c54bc9a 100644
--- a/core/src/main/scala/spark/TaskEndReason.scala
+++ b/core/src/main/scala/spark/TaskEndReason.scala
@@ -10,7 +10,13 @@ import spark.storage.BlockManagerId
private[spark] sealed trait TaskEndReason
private[spark] case object Success extends TaskEndReason
-private[spark] case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
-private[spark] case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
+
+private[spark]
+case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
+
+private[spark]
+case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
+
private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason
+
private[spark] case class OtherFailure(message: String) extends TaskEndReason
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index 370978113f..6055bfd045 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -5,7 +5,7 @@ import java.util.concurrent.atomic.AtomicLong
import spark._
-private[spark] abstract class Broadcast[T](id: Long) extends Serializable {
+abstract class Broadcast[T](id: Long) extends Serializable {
def value: T
// We cannot have an abstract readObject here due to some weird issues with
@@ -14,7 +14,8 @@ private[spark] abstract class Broadcast[T](id: Long) extends Serializable {
override def toString = "spark.Broadcast(" + id + ")"
}
-private[spark] class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable {
+private[spark]
+class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable {
private var initialized = false
private var broadcastFactory: BroadcastFactory = null
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 7eaae2c618..d2b63d6e0d 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -11,10 +11,12 @@ private[spark] sealed trait DeployMessage extends Serializable
// Worker to Master
-private[spark] case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int)
+private[spark]
+case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int)
extends DeployMessage
-private[spark] case class ExecutorStateChanged(
+private[spark]
+case class ExecutorStateChanged(
jobId: String,
execId: Int,
state: ExecutorState,
@@ -42,10 +44,17 @@ private[spark] case class RegisterJob(jobDescription: JobDescription) extends De
// Master to Client
-private[spark] case class RegisteredJob(jobId: String) extends DeployMessage
-private[spark] case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
-private[spark] case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String])
-private[spark] case class JobKilled(message: String)
+private[spark]
+case class RegisteredJob(jobId: String) extends DeployMessage
+
+private[spark]
+case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
+
+private[spark]
+case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String])
+
+private[spark]
+case class JobKilled(message: String)
// Internal message in Client
@@ -57,7 +66,8 @@ private[spark] case object RequestMasterState
// Master to MasterWebUI
-private[spark] case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo],
+private[spark]
+case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo],
completedJobs: List[JobInfo])
// WorkerWebUI to Worker
@@ -65,6 +75,7 @@ private[spark] case object RequestWorkerState
// Worker to WorkerWebUI
-private[spark] case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
+private[spark]
+case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) \ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index e938981f6e..8b2a71add5 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -9,7 +9,8 @@ import spark.{Logging, Utils}
import scala.collection.mutable.ArrayBuffer
-private[spark] class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging {
+private[spark]
+class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging {
val localIpAddress = Utils.localIpAddress
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index e2364f1863..8795c09cc1 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -5,7 +5,8 @@ import java.util.Date
import akka.actor.ActorRef
import scala.collection.mutable
-private[spark] class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
+private[spark]
+class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
var state = JobState.WAITING
var executors = new mutable.HashMap[Int, ExecutorInfo]
var coresGranted = 0
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index c083e7f5ea..700a41c770 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -10,7 +10,8 @@ import cc.spray.directives._
import cc.spray.typeconversion.TwirlSupport._
import spark.deploy._
-private[spark] class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
+private[spark]
+class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/master/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index 78a9adc86f..d06f4884ee 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -9,7 +9,8 @@ import cc.spray.Directives
import cc.spray.typeconversion.TwirlSupport._
import spark.deploy.{WorkerState, RequestWorkerState}
-private[spark] class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
+private[spark]
+class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/worker/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index 3a03f6843a..80262ab7b4 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -11,7 +11,8 @@ import java.nio.channels.spi._
import java.net._
-private[spark] abstract class Connection(val channel: SocketChannel, val selector: Selector) extends Logging {
+private[spark]
+abstract class Connection(val channel: SocketChannel, val selector: Selector) extends Logging {
channel.configureBlocking(false)
channel.socket.setTcpNoDelay(true)
diff --git a/core/src/main/scala/spark/partial/BoundedDouble.scala b/core/src/main/scala/spark/partial/BoundedDouble.scala
index ab5cc21aa0..8bedd75182 100644
--- a/core/src/main/scala/spark/partial/BoundedDouble.scala
+++ b/core/src/main/scala/spark/partial/BoundedDouble.scala
@@ -3,6 +3,7 @@ package spark.partial
/**
* A Double with error bars on it.
*/
-private[spark] class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) {
+private[spark]
+class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) {
override def toString(): String = "[%.3f, %.3f]".format(low, high)
}
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 33388545c4..9b666ed181 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -21,7 +21,8 @@ import spark.storage.BlockManagerId
* schedule to run the job. Subclasses only need to implement the code to send a task to the cluster
* and to report fetch failures (the submitTasks method, and code to add CompletionEvents).
*/
-private[spark] class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with Logging {
+private[spark]
+class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with Logging {
taskSched.setListener(this)
// Called by TaskScheduler to report task completions or failures.
diff --git a/core/src/main/scala/spark/scheduler/TaskResult.scala b/core/src/main/scala/spark/scheduler/TaskResult.scala
index c7b123ce7f..9a54d0e854 100644
--- a/core/src/main/scala/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/spark/scheduler/TaskResult.scala
@@ -7,7 +7,8 @@ import scala.collection.mutable.Map
// Task result. Also contains updates to accumulator variables.
// TODO: Use of distributed cache to return result is a hack to get around
// what seems to be a bug with messages over 60KB in libprocess; fix it
-private[spark] class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any]) extends Externalizable {
+private[spark]
+class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any]) extends Externalizable {
def this() = this(null.asInstanceOf[T], null)
override def writeExternal(out: ObjectOutput) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala b/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
index 3c8f511a07..96ebaa4601 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
@@ -1,3 +1,4 @@
package spark.scheduler.cluster
-private[spark] class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {}
+private[spark]
+class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index c4e8fea3dc..1386cd9d44 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -7,17 +7,25 @@ import spark.util.SerializableBuffer
private[spark] sealed trait StandaloneClusterMessage extends Serializable
// Master to slaves
-private[spark] case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
-private[spark] case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
-private[spark] case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
+private[spark]
+case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
+
+private[spark]
+case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
+
+private[spark]
+case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
// Slaves to master
-private[spark] case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
+private[spark]
+case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
-private[spark] case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
+private[spark]
+case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
extends StandaloneClusterMessage
-private[spark] object StatusUpdate {
+private[spark]
+object StatusUpdate {
/** Alternate factory method that takes a ByteBuffer directly for the data field */
def apply(slaveId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
StatusUpdate(slaveId, taskId, state, new SerializableBuffer(data))
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 8aebdedda2..d2cce0dc05 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -16,7 +16,8 @@ import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClient
* Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
* Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
*/
-private[spark] class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+private[spark]
+class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
extends SchedulerBackend with Logging {
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
index ae2c6b9836..ca84503780 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
@@ -3,7 +3,8 @@ package spark.scheduler.cluster
/**
* Information about a running task attempt inside a TaskSet.
*/
-private[spark] class TaskInfo(val taskId: Long, val index: Int, val launchTime: Long, val host: String) {
+private[spark]
+class TaskInfo(val taskId: Long, val index: Int, val launchTime: Long, val host: String) {
var finishTime: Long = 0
var failed = false
diff --git a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
index 298b6d5529..6b919d68b2 100644
--- a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
@@ -3,5 +3,6 @@ package spark.scheduler.cluster
/**
* Represents free resources available on a worker node.
*/
-private[spark] class WorkerOffer(val slaveId: String, val hostname: String, val cores: Int) {
+private[spark]
+class WorkerOffer(val slaveId: String, val hostname: String, val cores: Int) {
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index bb033871b6..7d8f9ff824 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -43,7 +43,8 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
}
-private[spark] case class BlockException(blockId: String, message: String, ex: Exception = null)
+private[spark]
+case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
@@ -56,7 +57,8 @@ private[spark] class BlockLocker(numLockers: Int) {
}
-private[spark] class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
+private[spark]
+class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
extends Logging {
case class BlockInfo(level: StorageLevel, tellMaster: Boolean)
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index ef76b3f470..7bfa31ac3d 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -16,14 +16,17 @@ import akka.util.duration._
import spark.{Logging, SparkException, Utils}
-private[spark] sealed trait ToBlockManagerMaster
+private[spark]
+sealed trait ToBlockManagerMaster
-private[spark] case class RegisterBlockManager(
+private[spark]
+case class RegisterBlockManager(
blockManagerId: BlockManagerId,
maxMemSize: Long)
extends ToBlockManagerMaster
-private[spark] class HeartBeat(
+private[spark]
+class HeartBeat(
var blockManagerId: BlockManagerId,
var blockId: String,
var storageLevel: StorageLevel,
@@ -53,7 +56,8 @@ private[spark] class HeartBeat(
}
}
-private[spark] object HeartBeat {
+private[spark]
+object HeartBeat {
def apply(blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
@@ -68,15 +72,20 @@ private[spark] object HeartBeat {
}
}
-private[spark] case class GetLocations(blockId: String) extends ToBlockManagerMaster
+private[spark]
+case class GetLocations(blockId: String) extends ToBlockManagerMaster
-private[spark] case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
+private[spark]
+case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
-private[spark] case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+private[spark]
+case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
-private[spark] case class RemoveHost(host: String) extends ToBlockManagerMaster
+private[spark]
+case class RemoveHost(host: String) extends ToBlockManagerMaster
-private[spark] case object StopBlockManagerMaster extends ToBlockManagerMaster
+private[spark]
+case object StopBlockManagerMaster extends ToBlockManagerMaster
private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala
index 8cf7565be7..a25decb123 100644
--- a/core/src/main/scala/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala
@@ -8,7 +8,8 @@ import scala.collection.mutable.ArrayBuffer
import spark._
import spark.network._
-private[spark] class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockMessage] with Logging {
+private[spark]
+class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockMessage] with Logging {
def this(bm: BlockMessage) = this(Array(bm))
diff --git a/core/src/main/scala/spark/util/SerializableBuffer.scala b/core/src/main/scala/spark/util/SerializableBuffer.scala
index b6e153b00b..09d588fe1c 100644
--- a/core/src/main/scala/spark/util/SerializableBuffer.scala
+++ b/core/src/main/scala/spark/util/SerializableBuffer.scala
@@ -8,7 +8,8 @@ import java.nio.channels.Channels
* A wrapper around a java.nio.ByteBuffer that is serializable through Java serialization, to make
* it easier to pass ByteBuffers in case class messages.
*/
-private[spark] class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
+private[spark]
+class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
def value = buffer
private def readObject(in: ObjectInputStream) {
diff --git a/core/src/main/scala/spark/util/StatCounter.scala b/core/src/main/scala/spark/util/StatCounter.scala
index 101895f08e..023ec09332 100644
--- a/core/src/main/scala/spark/util/StatCounter.scala
+++ b/core/src/main/scala/spark/util/StatCounter.scala
@@ -5,7 +5,8 @@ package spark.util
* numerically robust way. Includes support for merging two StatCounters. Based on Welford and
* Chan's algorithms described at http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance.
*/
-private[spark] class StatCounter(values: TraversableOnce[Double]) extends Serializable {
+private[spark]
+class StatCounter(values: TraversableOnce[Double]) extends Serializable {
private var n: Long = 0 // Running count of our values
private var mu: Double = 0 // Running mean of our values
private var m2: Double = 0 // Running variance numerator (sum of (x - mean)^2)