-rw-r--r--bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala4
-rw-r--r--core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java2
-rw-r--r--core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java2
-rw-r--r--core/src/main/scala/org/apache/spark/HttpFileServer.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/HttpServer.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/Partition.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/SecurityManager.scala88
-rw-r--r--core/src/main/scala/org/apache/spark/SparkException.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/SparkSaslClient.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/SparkSaslServer.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/TestUtils.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/network/Connection.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/network/ConnectionId.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/network/ConnectionManager.scala28
-rw-r--r--core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala24
-rw-r--r--core/src/main/scala/org/apache/spark/network/ReceiverTest.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/network/SecurityMessage.scala48
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/partial/PartialResult.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala214
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockMessage.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala26
-rw-r--r--core/src/main/scala/org/apache/spark/ui/JettyUtils.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/ui/UIUtils.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/NextIterator.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/StatCounter.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/Vector.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala16
-rw-r--r--core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala20
-rw-r--r--core/src/test/scala/org/apache/spark/DriverSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/FileServerSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/FileSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala26
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/ui/UISuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala20
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala16
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala6
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala2
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala4
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala4
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkImports.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala6
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala12
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala28
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala4
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala10
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala14
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Interval.scala8
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Time.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala8
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala26
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala18
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala8
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java2
82 files changed, 467 insertions, 467 deletions
diff --git a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
index 9c37fadb78..69144e3e65 100644
--- a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
@@ -28,9 +28,9 @@ class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializ
class TestMessage(val targetId: String) extends Message[String] with Serializable
class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts {
-
+
var sc: SparkContext = _
-
+
after {
if (sc != null) {
sc.stop()
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
index fa75842047..23f5fdd436 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
@@ -24,4 +24,4 @@ import java.io.Serializable;
*/
public interface FlatMapFunction<T, R> extends Serializable {
public Iterable<R> call(T t) throws Exception;
-} \ No newline at end of file
+}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
index d1fdec0724..c48e92f535 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
@@ -24,4 +24,4 @@ import java.io.Serializable;
*/
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
public Iterable<R> call(T1 t1, T2 t2) throws Exception;
-} \ No newline at end of file
+}
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 3d7692ea8a..a6e300d345 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -24,13 +24,13 @@ import com.google.common.io.Files
import org.apache.spark.util.Utils
private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging {
-
+
var baseDir : File = null
var fileDir : File = null
var jarDir : File = null
var httpServer : HttpServer = null
var serverUri : String = null
-
+
def initialize() {
baseDir = Utils.createTempDir()
fileDir = new File(baseDir, "files")
@@ -43,24 +43,24 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
serverUri = httpServer.uri
logDebug("HTTP file server started at: " + serverUri)
}
-
+
def stop() {
httpServer.stop()
}
-
+
def addFile(file: File) : String = {
addFileToDir(file, fileDir)
serverUri + "/files/" + file.getName
}
-
+
def addJar(file: File) : String = {
addFileToDir(file, jarDir)
serverUri + "/jars/" + file.getName
}
-
+
def addFileToDir(file: File, dir: File) : String = {
Files.copy(file, new File(dir, file.getName))
dir + "/" + file.getName
}
-
+
}
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index cb5df25fa4..7e9b517f90 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -83,19 +83,19 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan
}
}
- /**
+ /**
* Setup Jetty to the HashLoginService using a single user with our
* shared secret. Configure it to use DIGEST-MD5 authentication so that the password
* isn't passed in plaintext.
*/
private def setupSecurityHandler(securityMgr: SecurityManager): ConstraintSecurityHandler = {
val constraint = new Constraint()
- // use DIGEST-MD5 as the authentication mechanism
+ // use DIGEST-MD5 as the authentication mechanism
constraint.setName(Constraint.__DIGEST_AUTH)
constraint.setRoles(Array("user"))
constraint.setAuthenticate(true)
constraint.setDataConstraint(Constraint.DC_NONE)
-
+
val cm = new ConstraintMapping()
cm.setConstraint(constraint)
cm.setPathSpec("/*")
diff --git a/core/src/main/scala/org/apache/spark/Partition.scala b/core/src/main/scala/org/apache/spark/Partition.scala
index 87914a061f..27892dbd2a 100644
--- a/core/src/main/scala/org/apache/spark/Partition.scala
+++ b/core/src/main/scala/org/apache/spark/Partition.scala
@@ -25,7 +25,7 @@ trait Partition extends Serializable {
* Get the split's index within its parent RDD
*/
def index: Int
-
+
// A better default implementation of HashCode
override def hashCode(): Int = index
}
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 2237ee3bb7..b52f2d4f41 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -25,93 +25,93 @@ import org.apache.hadoop.io.Text
import org.apache.spark.deploy.SparkHadoopUtil
-/**
- * Spark class responsible for security.
- *
+/**
+ * Spark class responsible for security.
+ *
* In general this class should be instantiated by the SparkEnv and most components
- * should access it from that. There are some cases where the SparkEnv hasn't been
+ * should access it from that. There are some cases where the SparkEnv hasn't been
* initialized yet and this class must be instantiated directly.
- *
+ *
* Spark currently supports authentication via a shared secret.
* Authentication can be configured to be on via the 'spark.authenticate' configuration
- * parameter. This parameter controls whether the Spark communication protocols do
+ * parameter. This parameter controls whether the Spark communication protocols do
* authentication using the shared secret. This authentication is a basic handshake to
* make sure both sides have the same shared secret and are allowed to communicate.
- * If the shared secret is not identical they will not be allowed to communicate.
- *
- * The Spark UI can also be secured by using javax servlet filters. A user may want to
- * secure the UI if it has data that other users should not be allowed to see. The javax
- * servlet filter specified by the user can authenticate the user and then once the user
- * is logged in, Spark can compare that user versus the view acls to make sure they are
- * authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls'
+ * If the shared secret is not identical they will not be allowed to communicate.
+ *
+ * The Spark UI can also be secured by using javax servlet filters. A user may want to
+ * secure the UI if it has data that other users should not be allowed to see. The javax
+ * servlet filter specified by the user can authenticate the user and then once the user
+ * is logged in, Spark can compare that user versus the view acls to make sure they are
+ * authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls'
* control the behavior of the acls. Note that the person who started the application
* always has view access to the UI.
*
* Spark does not currently support encryption after authentication.
- *
+ *
* At this point spark has multiple communication protocols that need to be secured and
* different underlying mechanisms are used depending on the protocol:
*
- * - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
- * Akka remoting allows you to specify a secure cookie that will be exchanged
- * and ensured to be identical in the connection handshake between the client
- * and the server. If they are not identical then the client will be refused
- * to connect to the server. There is no control of the underlying
- * authentication mechanism so its not clear if the password is passed in
+ * - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
+ * Akka remoting allows you to specify a secure cookie that will be exchanged
+ * and ensured to be identical in the connection handshake between the client
+ * and the server. If they are not identical then the client will be refused
+ * to connect to the server. There is no control of the underlying
+ * authentication mechanism so its not clear if the password is passed in
* plaintext or uses DIGEST-MD5 or some other mechanism.
* Akka also has an option to turn on SSL, this option is not currently supported
* but we could add a configuration option in the future.
- *
- * - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
- * for the HttpServer. Jetty supports multiple authentication mechanisms -
- * Basic, Digest, Form, Spengo, etc. It also supports multiple different login
+ *
+ * - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
+ * for the HttpServer. Jetty supports multiple authentication mechanisms -
+ * Basic, Digest, Form, Spengo, etc. It also supports multiple different login
* services - Hash, JAAS, Spnego, JDBC, etc. Spark currently uses the HashLoginService
- * to authenticate using DIGEST-MD5 via a single user and the shared secret.
+ * to authenticate using DIGEST-MD5 via a single user and the shared secret.
* Since we are using DIGEST-MD5, the shared secret is not passed on the wire
* in plaintext.
* We currently do not support SSL (https), but Jetty can be configured to use it
* so we could add a configuration option for this in the future.
- *
+ *
* The Spark HttpServer installs the HashLoginServer and configures it to DIGEST-MD5.
- * Any clients must specify the user and password. There is a default
+ * Any clients must specify the user and password. There is a default
* Authenticator installed in the SecurityManager to how it does the authentication
* and in this case gets the user name and password from the request.
*
- * - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously
- * exchange messages. For this we use the Java SASL
- * (Simple Authentication and Security Layer) API and again use DIGEST-MD5
+ * - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously
+ * exchange messages. For this we use the Java SASL
+ * (Simple Authentication and Security Layer) API and again use DIGEST-MD5
* as the authentication mechanism. This means the shared secret is not passed
* over the wire in plaintext.
* Note that SASL is pluggable as to what mechanism it uses. We currently use
* DIGEST-MD5 but this could be changed to use Kerberos or other in the future.
* Spark currently supports "auth" for the quality of protection, which means
* the connection is not supporting integrity or privacy protection (encryption)
- * after authentication. SASL also supports "auth-int" and "auth-conf" which
+ * after authentication. SASL also supports "auth-int" and "auth-conf" which
* SPARK could be support in the future to allow the user to specify the quality
- * of protection they want. If we support those, the messages will also have to
+ * of protection they want. If we support those, the messages will also have to
* be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
- *
- * Since the connectionManager does asynchronous messages passing, the SASL
+ *
+ * Since the connectionManager does asynchronous messages passing, the SASL
* authentication is a bit more complex. A ConnectionManager can be both a client
* and a Server, so for a particular connection is has to determine what to do.
- * A ConnectionId was added to be able to track connections and is used to
+ * A ConnectionId was added to be able to track connections and is used to
* match up incoming messages with connections waiting for authentication.
* If its acting as a client and trying to send a message to another ConnectionManager,
* it blocks the thread calling sendMessage until the SASL negotiation has occurred.
* The ConnectionManager tracks all the sendingConnections using the ConnectionId
* and waits for the response from the server and does the handshake.
*
- * - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
+ * - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
* can be used. Yarn requires a specific AmIpFilter be installed for security to work
* properly. For non-Yarn deployments, users can write a filter to go through a
* companies normal login service. If an authentication filter is in place then the
* SparkUI can be configured to check the logged in user against the list of users who
* have view acls to see if that user is authorized.
- * The filters can also be used for many different purposes. For instance filters
+ * The filters can also be used for many different purposes. For instance filters
* could be used for logging, encryption, or compression.
- *
+ *
* The exact mechanisms used to generate/distributed the shared secret is deployment specific.
- *
+ *
* For Yarn deployments, the secret is automatically generated using the Akka remote
* Crypt.generateSecureCookie() API. The secret is placed in the Hadoop UGI which gets passed
* around via the Hadoop RPC mechanism. Hadoop RPC can be configured to support different levels
@@ -121,7 +121,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
* to reduce the possibility of web based attacks through YARN. Hadoop can be configured to use
* filters to do authentication. That authentication then happens via the ResourceManager Proxy
* and Spark will use that to do authorization against the view acls.
- *
+ *
* For other Spark deployments, the shared secret must be specified via the
* spark.authenticate.secret config.
* All the nodes (Master and Workers) and the applications need to have the same shared secret.
@@ -152,7 +152,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
" are ui acls enabled: " + uiAclsOn + " users with view permissions: " + viewAcls.toString())
// Set our own authenticator to properly negotiate user/password for HTTP connections.
- // This is needed by the HTTP client fetching from the HttpServer. Put here so its
+ // This is needed by the HTTP client fetching from the HttpServer. Put here so its
// only set once.
if (authOn) {
Authenticator.setDefault(
@@ -214,12 +214,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
def uiAclsEnabled(): Boolean = uiAclsOn
/**
- * Checks the given user against the view acl list to see if they have
+ * Checks the given user against the view acl list to see if they have
* authorization to view the UI. If the UI acls must are disabled
* via spark.ui.acls.enable, all users have view access.
- *
+ *
* @param user to see if is authorized
- * @return true is the user has permission, otherwise false
+ * @return true is the user has permission, otherwise false
*/
def checkUIViewPermissions(user: String): Boolean = {
if (uiAclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala
index d34e47e8ca..4351ed74b6 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -20,5 +20,5 @@ package org.apache.spark
class SparkException(message: String, cause: Throwable)
extends Exception(message, cause) {
- def this(message: String) = this(message, null)
+ def this(message: String) = this(message, null)
}
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index b92ea01a87..f6703986bd 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -42,7 +42,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
private val now = new Date()
private val conf = new SerializableWritable(jobConf)
-
+
private var jobID = 0
private var splitID = 0
private var attemptID = 0
@@ -58,8 +58,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
def preSetup() {
setIDs(0, 0, 0)
HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value)
-
- val jCtxt = getJobContext()
+
+ val jCtxt = getJobContext()
getOutputCommitter().setupJob(jCtxt)
}
@@ -74,7 +74,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
val numfmt = NumberFormat.getInstance()
numfmt.setMinimumIntegerDigits(5)
numfmt.setGroupingUsed(false)
-
+
val outputName = "part-" + numfmt.format(splitID)
val path = FileOutputFormat.getOutputPath(conf.value)
val fs: FileSystem = {
@@ -85,7 +85,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}
}
- getOutputCommitter().setupTask(getTaskContext())
+ getOutputCommitter().setupTask(getTaskContext())
writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
}
@@ -103,18 +103,18 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
def commit() {
val taCtxt = getTaskContext()
- val cmtr = getOutputCommitter()
+ val cmtr = getOutputCommitter()
if (cmtr.needsTaskCommit(taCtxt)) {
try {
cmtr.commitTask(taCtxt)
logInfo (taID + ": Committed")
} catch {
- case e: IOException => {
+ case e: IOException => {
logError("Error committing the output of task: " + taID.value, e)
cmtr.abortTask(taCtxt)
throw e
}
- }
+ }
} else {
logWarning ("No need to commit output of task: " + taID.value)
}
@@ -144,7 +144,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}
private def getJobContext(): JobContext = {
- if (jobContext == null) {
+ if (jobContext == null) {
jobContext = newJobContext(conf.value, jID.value)
}
jobContext
@@ -175,7 +175,7 @@ object SparkHadoopWriter {
val jobtrackerID = formatter.format(time)
new JobID(jobtrackerID, id)
}
-
+
def createPathFromString(path: String, conf: JobConf): Path = {
if (path == null) {
throw new IllegalArgumentException("Output path is null")
diff --git a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
index a2a871cbd3..5b14c4291d 100644
--- a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
+++ b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
@@ -44,12 +44,12 @@ private[spark] class SparkSaslClient(securityMgr: SecurityManager) extends Logg
* configurable in the future.
*/
private var saslClient: SaslClient = Sasl.createSaslClient(Array[String](SparkSaslServer.DIGEST),
- null, null, SparkSaslServer.SASL_DEFAULT_REALM, SparkSaslServer.SASL_PROPS,
+ null, null, SparkSaslServer.SASL_DEFAULT_REALM, SparkSaslServer.SASL_PROPS,
new SparkSaslClientCallbackHandler(securityMgr))
/**
* Used to initiate SASL handshake with server.
- * @return response to challenge if needed
+ * @return response to challenge if needed
*/
def firstToken(): Array[Byte] = {
synchronized {
@@ -86,7 +86,7 @@ private[spark] class SparkSaslClient(securityMgr: SecurityManager) extends Logg
}
/**
- * Disposes of any system resources or security-sensitive information the
+ * Disposes of any system resources or security-sensitive information the
* SaslClient might be using.
*/
def dispose() {
@@ -110,7 +110,7 @@ private[spark] class SparkSaslClient(securityMgr: SecurityManager) extends Logg
private class SparkSaslClientCallbackHandler(securityMgr: SecurityManager) extends
CallbackHandler {
- private val userName: String =
+ private val userName: String =
SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes())
private val secretKey = securityMgr.getSecretKey()
private val userPassword: Array[Char] =
@@ -138,7 +138,7 @@ private[spark] class SparkSaslClient(securityMgr: SecurityManager) extends Logg
rc.setText(rc.getDefaultText())
}
case cb: RealmChoiceCallback => {}
- case cb: Callback => throw
+ case cb: Callback => throw
new UnsupportedCallbackException(cb, "handle: Unrecognized SASL client callback")
}
}
diff --git a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
index 11fcb2ae3a..6161a6fb7a 100644
--- a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
+++ b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
@@ -64,7 +64,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi
}
/**
- * Disposes of any system resources or security-sensitive information the
+ * Disposes of any system resources or security-sensitive information the
* SaslServer might be using.
*/
def dispose() {
@@ -88,7 +88,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi
private class SparkSaslDigestCallbackHandler(securityMgr: SecurityManager)
extends CallbackHandler {
- private val userName: String =
+ private val userName: String =
SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes())
override def handle(callbacks: Array[Callback]) {
@@ -123,7 +123,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi
ac.setAuthorizedID(authzid)
}
}
- case cb: Callback => throw
+ case cb: Callback => throw
new UnsupportedCallbackException(cb, "handle: Unrecognized SASL DIGEST-MD5 Callback")
}
}
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 4597595a83..f3f59e47c3 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -31,7 +31,7 @@ import com.google.common.io.Files
* projects.
*
* TODO: See if we can move this to the test codebase by specifying
- * test dependencies between projects.
+ * test dependencies between projects.
*/
private[spark] object TestUtils {
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 2b32546c68..2659274c5e 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -158,7 +158,7 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
}
def receiveBroadcast(): Boolean = {
- // Receive meta-info about the size of broadcast data,
+ // Receive meta-info about the size of broadcast data,
// the number of chunks it is divided into, etc.
val metaId = BroadcastBlockId(id, "meta")
var attemptId = 10
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index c07838f798..5da9615c9e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -43,7 +43,7 @@ private[spark] class ClientArguments(args: Array[String]) {
// kill parameters
var driverId: String = ""
-
+
parse(args.toList)
def parse(args: List[String]): Unit = args match {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index d35d5be73f..3836bf219e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -32,8 +32,8 @@ private[spark] class WorkerArguments(args: Array[String]) {
var memory = inferDefaultMemory()
var masters: Array[String] = null
var workDir: String = null
-
- // Check for settings in environment variables
+
+ // Check for settings in environment variables
if (System.getenv("SPARK_WORKER_PORT") != null) {
port = System.getenv("SPARK_WORKER_PORT").toInt
}
@@ -49,7 +49,7 @@ private[spark] class WorkerArguments(args: Array[String]) {
if (System.getenv("SPARK_WORKER_DIR") != null) {
workDir = System.getenv("SPARK_WORKER_DIR")
}
-
+
parse(args.toList)
def parse(args: List[String]): Unit = args match {
@@ -78,7 +78,7 @@ private[spark] class WorkerArguments(args: Array[String]) {
case ("--work-dir" | "-d") :: value :: tail =>
workDir = value
parse(tail)
-
+
case "--webui-port" :: IntParam(value) :: tail =>
webUiPort = value
parse(tail)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 85200ab0e1..49c1009cac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -137,7 +137,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
.format(executor.appId, executor.execId)}>stdout</a>
<a href={"logPage?appId=%s&executorId=%s&logType=stderr"
.format(executor.appId, executor.execId)}>stderr</a>
- </td>
+ </td>
</tr>
}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 16887d8892..6327ac0166 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -53,7 +53,7 @@ private[spark] class CoarseGrainedExecutorBackend(
case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver")
// Make this host instead of hostPort ?
- executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
+ executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
false)
case RegisterExecutorFailed(message) =>
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index ceff3a067d..38be2c58b3 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -34,7 +34,7 @@ object ExecutorExitCode {
logging the exception. */
val UNCAUGHT_EXCEPTION_TWICE = 51
- /** The default uncaught exception handler was reached, and the uncaught exception was an
+ /** The default uncaught exception handler was reached, and the uncaught exception was an
OutOfMemoryError. */
val OOM = 52
@@ -43,10 +43,10 @@ object ExecutorExitCode {
/** TachyonStore failed to initialize after many attempts. */
val TACHYON_STORE_FAILED_TO_INITIALIZE = 54
-
+
/** TachyonStore failed to create a local temporary directory after many attempts. */
val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55
-
+
def explainExitCode(exitCode: Int): String = {
exitCode match {
case UNCAUGHT_EXCEPTION => "Uncaught exception"
@@ -57,7 +57,7 @@ object ExecutorExitCode {
case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize."
case TACHYON_STORE_FAILED_TO_CREATE_DIR =>
"TachyonStore failed to create a local temporary directory."
- case _ =>
+ case _ =>
"Unknown executor exit code (" + exitCode + ")" + (
if (exitCode > 128) {
" (died from signal " + (exitCode - 128) + "?)"
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
index 208e77073f..218ed7b5d2 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
@@ -38,7 +38,7 @@ private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: Class
override def addURL(url: URL) {
super.addURL(url)
}
- override def findClass(name: String): Class[_] = {
+ override def findClass(name: String): Class[_] = {
super.findClass(name)
}
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
index 42c1200926..542dce6536 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
@@ -45,7 +45,7 @@ private[spark] class CsvSink(val property: Properties, val registry: MetricRegis
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
}
-
+
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala
index 2f7576c53b..3ffaaab23d 100644
--- a/core/src/main/scala/org/apache/spark/network/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -248,14 +248,14 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}
}
- // outbox is used as a lock - ensure that it is always used as a leaf (since methods which
+ // outbox is used as a lock - ensure that it is always used as a leaf (since methods which
// lock it are invoked in context of other locks)
private val outbox = new Outbox()
/*
- This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly
- different purpose. This flag is to see if we need to force reregister for write even when we
+ This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly
+ different purpose. This flag is to see if we need to force reregister for write even when we
do not have any pending bytes to write to socket.
- This can happen due to a race between adding pending buffers, and checking for existing of
+ This can happen due to a race between adding pending buffers, and checking for existing of
data as detailed in https://github.com/mesos/spark/pull/791
*/
private var needForceReregister = false
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionId.scala b/core/src/main/scala/org/apache/spark/network/ConnectionId.scala
index ffaab677d4..d579c165a1 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionId.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionId.scala
@@ -18,7 +18,7 @@
package org.apache.spark.network
private[spark] case class ConnectionId(connectionManagerId: ConnectionManagerId, uniqId: Int) {
- override def toString = connectionManagerId.host + "_" + connectionManagerId.port + "_" + uniqId
+ override def toString = connectionManagerId.host + "_" + connectionManagerId.port + "_" + uniqId
}
private[spark] object ConnectionId {
@@ -26,9 +26,9 @@ private[spark] object ConnectionId {
def createConnectionIdFromString(connectionIdString: String): ConnectionId = {
val res = connectionIdString.split("_").map(_.trim())
if (res.size != 3) {
- throw new Exception("Error converting ConnectionId string: " + connectionIdString +
+ throw new Exception("Error converting ConnectionId string: " + connectionIdString +
" to a ConnectionId Object")
}
new ConnectionId(new ConnectionManagerId(res(0), res(1).toInt), res(2).toInt)
- }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index bdf586351a..cfee41c613 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -79,7 +79,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
private val serverChannel = ServerSocketChannel.open()
// used to track the SendingConnections waiting to do SASL negotiation
- private val connectionsAwaitingSasl = new HashMap[ConnectionId, SendingConnection]
+ private val connectionsAwaitingSasl = new HashMap[ConnectionId, SendingConnection]
with SynchronizedMap[ConnectionId, SendingConnection]
private val connectionsByKey =
new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
@@ -141,7 +141,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
} finally {
writeRunnableStarted.synchronized {
writeRunnableStarted -= key
- val needReregister = register || conn.resetForceReregister()
+ val needReregister = register || conn.resetForceReregister()
if (needReregister && conn.changeInterestForWrite()) {
conn.registerInterest()
}
@@ -509,7 +509,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
private def handleClientAuthentication(
waitingConn: SendingConnection,
- securityMsg: SecurityMessage,
+ securityMsg: SecurityMessage,
connectionId : ConnectionId) {
if (waitingConn.isSaslComplete()) {
logDebug("Client sasl completed for id: " + waitingConn.connectionId)
@@ -530,7 +530,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
}
return
}
- var securityMsgResp = SecurityMessage.fromResponse(replyToken,
+ var securityMsgResp = SecurityMessage.fromResponse(replyToken,
securityMsg.getConnectionId.toString())
var message = securityMsgResp.toBufferMessage
if (message == null) throw new Exception("Error creating security message")
@@ -546,7 +546,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
}
private def handleServerAuthentication(
- connection: Connection,
+ connection: Connection,
securityMsg: SecurityMessage,
connectionId: ConnectionId) {
if (!connection.isSaslComplete()) {
@@ -561,7 +561,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
}
replyToken = connection.sparkSaslServer.response(securityMsg.getToken)
if (connection.isSaslComplete()) {
- logDebug("Server sasl completed: " + connection.connectionId)
+ logDebug("Server sasl completed: " + connection.connectionId)
} else {
logDebug("Server sasl not completed: " + connection.connectionId)
}
@@ -571,7 +571,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
var message = securityMsgResp.toBufferMessage
if (message == null) throw new Exception("Error creating security Message")
sendSecurityMessage(connection.getRemoteConnectionManagerId(), message)
- }
+ }
} catch {
case e: Exception => {
logError("Error in server auth negotiation: " + e)
@@ -581,7 +581,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
}
}
} else {
- logDebug("connection already established for this connection id: " + connection.connectionId)
+ logDebug("connection already established for this connection id: " + connection.connectionId)
}
}
@@ -609,8 +609,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
return true
} else {
if (!conn.isSaslComplete()) {
- // We could handle this better and tell the client we need to do authentication
- // negotiation, but for now just ignore them.
+ // We could handle this better and tell the client we need to do authentication
+ // negotiation, but for now just ignore them.
logError("message sent that is not security negotiation message on connection " +
"not authenticated yet, ignoring it!!")
return true
@@ -709,11 +709,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
}
}
} else {
- logDebug("Sasl already established ")
+ logDebug("Sasl already established ")
}
}
- // allow us to add messages to the inbox for doing sasl negotiating
+ // allow us to add messages to the inbox for doing sasl negotiating
private def sendSecurityMessage(connManagerId: ConnectionManagerId, message: Message) {
def startNewConnection(): SendingConnection = {
val inetSocketAddress = new InetSocketAddress(connManagerId.host, connManagerId.port)
@@ -772,7 +772,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
if (((clock.getTime() - startTime) >= (authTimeout * 1000))
&& (!connection.isSaslComplete())) {
// took to long to authenticate the connection, something probably went wrong
- throw new Exception("Took to long for authentication to " + connectionManagerId +
+ throw new Exception("Took to long for authentication to " + connectionManagerId +
", waited " + authTimeout + "seconds, failing.")
}
}
@@ -794,7 +794,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
}
}
case None => {
- logError("no messageStatus for failed message id: " + message.id)
+ logError("no messageStatus for failed message id: " + message.id)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
index 9d9b9dbdd5..4894ecd41f 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
@@ -37,11 +37,11 @@ private[spark] object ConnectionManagerTest extends Logging{
"[size of msg in MB (integer)] [count] [await time in seconds)] ")
System.exit(1)
}
-
+
if (args(0).startsWith("local")) {
println("This runs only on a mesos cluster")
}
-
+
val sc = new SparkContext(args(0), "ConnectionManagerTest")
val slavesFile = Source.fromFile(args(1))
val slaves = slavesFile.mkString.split("\n")
@@ -50,7 +50,7 @@ private[spark] object ConnectionManagerTest extends Logging{
/* println("Slaves") */
/* slaves.foreach(println) */
val tasknum = if (args.length > 2) args(2).toInt else slaves.length
- val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024
+ val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024
val count = if (args.length > 4) args(4).toInt else 3
val awaitTime = (if (args.length > 5) args(5).toInt else 600 ).second
println("Running " + count + " rounds of test: " + "parallel tasks = " + tasknum + ", " +
@@ -64,16 +64,16 @@ private[spark] object ConnectionManagerTest extends Logging{
(0 until count).foreach(i => {
val resultStrs = sc.parallelize(0 until tasknum, tasknum).map(i => {
val connManager = SparkEnv.get.connectionManager
- val thisConnManagerId = connManager.id
- connManager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+ val thisConnManagerId = connManager.id
+ connManager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
logInfo("Received [" + msg + "] from [" + id + "]")
None
})
val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
buffer.flip
-
- val startTime = System.currentTimeMillis
+
+ val startTime = System.currentTimeMillis
val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map{ slaveConnManagerId =>
{
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
@@ -84,7 +84,7 @@ private[spark] object ConnectionManagerTest extends Logging{
val results = futures.map(f => Await.result(f, awaitTime))
val finishTime = System.currentTimeMillis
Thread.sleep(5000)
-
+
val mb = size * results.size / 1024.0 / 1024.0
val ms = finishTime - startTime
val resultStr = thisConnManagerId + " Sent " + mb + " MB in " + ms + " ms at " + (mb / ms *
@@ -92,11 +92,11 @@ private[spark] object ConnectionManagerTest extends Logging{
logInfo(resultStr)
resultStr
}).collect()
-
- println("---------------------")
- println("Run " + i)
+
+ println("---------------------")
+ println("Run " + i)
resultStrs.foreach(println)
- println("---------------------")
+ println("---------------------")
})
}
}
diff --git a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
index 2b41c403b2..9dc51e0d40 100644
--- a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
@@ -18,7 +18,7 @@
package org.apache.spark.network
import java.nio.ByteBuffer
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.{SecurityManager, SparkConf}
private[spark] object ReceiverTest {
def main(args: Array[String]) {
diff --git a/core/src/main/scala/org/apache/spark/network/SecurityMessage.scala b/core/src/main/scala/org/apache/spark/network/SecurityMessage.scala
index 0d9f743b36..a1dfc4094c 100644
--- a/core/src/main/scala/org/apache/spark/network/SecurityMessage.scala
+++ b/core/src/main/scala/org/apache/spark/network/SecurityMessage.scala
@@ -26,33 +26,33 @@ import org.apache.spark._
import org.apache.spark.network._
/**
- * SecurityMessage is class that contains the connectionId and sasl token
+ * SecurityMessage is class that contains the connectionId and sasl token
* used in SASL negotiation. SecurityMessage has routines for converting
* it to and from a BufferMessage so that it can be sent by the ConnectionManager
* and easily consumed by users when received.
* The api was modeled after BlockMessage.
*
- * The connectionId is the connectionId of the client side. Since
+ * The connectionId is the connectionId of the client side. Since
* message passing is asynchronous and its possible for the server side (receiving)
- * to get multiple different types of messages on the same connection the connectionId
- * is used to know which connnection the security message is intended for.
- *
+ * to get multiple different types of messages on the same connection the connectionId
+ * is used to know which connnection the security message is intended for.
+ *
* For instance, lets say we are node_0. We need to send data to node_1. The node_0 side
* is acting as a client and connecting to node_1. SASL negotiation has to occur
- * between node_0 and node_1 before node_1 trusts node_0 so node_0 sends a security message.
- * node_1 receives the message from node_0 but before it can process it and send a response,
- * some thread on node_1 decides it needs to send data to node_0 so it connects to node_0
- * and sends a security message of its own to authenticate as a client. Now node_0 gets
- * the message and it needs to decide if this message is in response to it being a client
- * (from the first send) or if its just node_1 trying to connect to it to send data. This
+ * between node_0 and node_1 before node_1 trusts node_0 so node_0 sends a security message.
+ * node_1 receives the message from node_0 but before it can process it and send a response,
+ * some thread on node_1 decides it needs to send data to node_0 so it connects to node_0
+ * and sends a security message of its own to authenticate as a client. Now node_0 gets
+ * the message and it needs to decide if this message is in response to it being a client
+ * (from the first send) or if its just node_1 trying to connect to it to send data. This
* is where the connectionId field is used. node_0 can lookup the connectionId to see if
* it is in response to it being a client or if its in response to someone sending other data.
- *
+ *
* The format of a SecurityMessage as its sent is:
* - Length of the ConnectionId
- * - ConnectionId
+ * - ConnectionId
* - Length of the token
- * - Token
+ * - Token
*/
private[spark] class SecurityMessage() extends Logging {
@@ -61,13 +61,13 @@ private[spark] class SecurityMessage() extends Logging {
def set(byteArr: Array[Byte], newconnectionId: String) {
if (byteArr == null) {
- token = new Array[Byte](0)
+ token = new Array[Byte](0)
} else {
token = byteArr
}
connectionId = newconnectionId
}
-
+
/**
* Read the given buffer and set the members of this class.
*/
@@ -91,17 +91,17 @@ private[spark] class SecurityMessage() extends Logging {
buffer.clear()
set(buffer)
}
-
+
def getConnectionId: String = {
return connectionId
}
-
+
def getToken: Array[Byte] = {
return token
}
-
+
/**
- * Create a BufferMessage that can be sent by the ConnectionManager containing
+ * Create a BufferMessage that can be sent by the ConnectionManager containing
* the security information from this class.
* @return BufferMessage
*/
@@ -110,12 +110,12 @@ private[spark] class SecurityMessage() extends Logging {
val buffers = new ArrayBuffer[ByteBuffer]()
// 4 bytes for the length of the connectionId
- // connectionId is of type char so multiple the length by 2 to get number of bytes
+ // connectionId is of type char so multiple the length by 2 to get number of bytes
// 4 bytes for the length of token
// token is a byte buffer so just take the length
var buffer = ByteBuffer.allocate(4 + connectionId.length() * 2 + 4 + token.length)
buffer.putInt(connectionId.length())
- connectionId.foreach((x: Char) => buffer.putChar(x))
+ connectionId.foreach((x: Char) => buffer.putChar(x))
buffer.putInt(token.length)
if (token.length > 0) {
@@ -123,7 +123,7 @@ private[spark] class SecurityMessage() extends Logging {
}
buffer.flip()
buffers += buffer
-
+
var message = Message.createBufferMessage(buffers)
logDebug("message total size is : " + message.size)
message.isSecurityNeg = true
@@ -136,7 +136,7 @@ private[spark] class SecurityMessage() extends Logging {
}
private[spark] object SecurityMessage {
-
+
/**
* Convert the given BufferMessage to a SecurityMessage by parsing the contents
* of the BufferMessage and populating the SecurityMessage fields.
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
index 4164e81d3a..136c191204 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
@@ -36,8 +36,8 @@ private[spark] class FileHeader (
if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) {
buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes)
} else {
- throw new Exception("too long header " + buf.readableBytes)
- logInfo("too long header")
+ throw new Exception("too long header " + buf.readableBytes)
+ logInfo("too long header")
}
buf
}
diff --git a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
index eade07fbcb..cadd0c7ed1 100644
--- a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
+++ b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
@@ -44,7 +44,7 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
}
}
- /**
+ /**
* Set a handler to be called when this PartialResult completes. Only one completion handler
* is supported per PartialResult.
*/
@@ -60,7 +60,7 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
return this
}
- /**
+ /**
* Set a handler to be called if this PartialResult's job fails. Only one failure handler
* is supported per PartialResult.
*/
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 2306c9736b..9ca971c8a4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -52,7 +52,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
/** Compute the standard deviation of this RDD's elements. */
def stdev(): Double = stats().stdev
- /**
+ /**
* Compute the sample standard deviation of this RDD's elements (which corrects for bias in
* estimating the standard deviation by dividing by N-1 instead of N).
*/
@@ -123,13 +123,13 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
* e.g. for the array
* [1, 10, 20, 50] the buckets are [1, 10) [10, 20) [20, 50]
* e.g 1<=x<10 , 10<=x<20, 20<=x<50
- * And on the input of 1 and 50 we would have a histogram of 1, 0, 0
- *
+ * And on the input of 1 and 50 we would have a histogram of 1, 0, 0
+ *
* Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can be switched
* from an O(log n) inseration to O(1) per element. (where n = # buckets) if you set evenBuckets
* to true.
* buckets must be sorted and not contain any duplicates.
- * buckets array must be at least two elements
+ * buckets array must be at least two elements
* All NaN entries are treated the same. If you have a NaN bucket it must be
* the maximum value of the last position and all NaN entries will be counted
* in that bucket.
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index a84357b384..0c2cd7a247 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -33,7 +33,7 @@ class PartitionerAwareUnionRDDPartition(
val idx: Int
) extends Partition {
var parents = rdds.map(_.partitions(idx)).toArray
-
+
override val index = idx
override def hashCode(): Int = idx
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 04c53d4684..293cfb6564 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -54,7 +54,7 @@ private[scheduler]
case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
private[scheduler]
-case class GettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
+case class GettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
private[scheduler] case class CompletionEvent(
task: Task[_],
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 76f3e327d6..545fa453b7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -1,107 +1,107 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import org.apache.spark.Logging
-
-/**
- * Asynchronously passes SparkListenerEvents to registered SparkListeners.
- *
- * Until start() is called, all posted events are only buffered. Only after this listener bus
- * has started will events be actually propagated to all attached listeners. This listener bus
- * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
- */
-private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
-
- /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
- * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
- private val EVENT_QUEUE_CAPACITY = 10000
- private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
- private var queueFullErrorMessageLogged = false
- private var started = false
- private val listenerThread = new Thread("SparkListenerBus") {
- setDaemon(true)
- override def run() {
- while (true) {
- val event = eventQueue.take
- if (event == SparkListenerShutdown) {
- // Get out of the while loop and shutdown the daemon thread
- return
- }
- postToAll(event)
- }
- }
- }
-
- // Exposed for testing
- @volatile private[spark] var stopCalled = false
-
- /**
- * Start sending events to attached listeners.
- *
- * This first sends out all buffered events posted before this listener bus has started, then
- * listens for any additional events asynchronously while the listener bus is still running.
- * This should only be called once.
- */
- def start() {
- if (started) {
- throw new IllegalStateException("Listener bus already started!")
- }
- listenerThread.start()
- started = true
- }
-
- def post(event: SparkListenerEvent) {
- val eventAdded = eventQueue.offer(event)
- if (!eventAdded && !queueFullErrorMessageLogged) {
- logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
- "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
- "rate at which tasks are being started by the scheduler.")
- queueFullErrorMessageLogged = true
- }
- }
-
- /**
- * Waits until there are no more events in the queue, or until the specified time has elapsed.
- * Used for testing only. Returns true if the queue has emptied and false if the specified time
- * elapsed before the queue emptied.
- */
- def waitUntilEmpty(timeoutMillis: Int): Boolean = {
- val finishTime = System.currentTimeMillis + timeoutMillis
- while (!eventQueue.isEmpty) {
- if (System.currentTimeMillis > finishTime) {
- return false
- }
- /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
- * add overhead in the general case. */
- Thread.sleep(10)
- }
- true
- }
-
- def stop() {
- stopCalled = true
- if (!started) {
- throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
- }
- post(SparkListenerShutdown)
- listenerThread.join()
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import org.apache.spark.Logging
+
+/**
+ * Asynchronously passes SparkListenerEvents to registered SparkListeners.
+ *
+ * Until start() is called, all posted events are only buffered. Only after this listener bus
+ * has started will events be actually propagated to all attached listeners. This listener bus
+ * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
+ */
+private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
+
+ /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
+ * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
+ private val EVENT_QUEUE_CAPACITY = 10000
+ private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+ private var queueFullErrorMessageLogged = false
+ private var started = false
+ private val listenerThread = new Thread("SparkListenerBus") {
+ setDaemon(true)
+ override def run() {
+ while (true) {
+ val event = eventQueue.take
+ if (event == SparkListenerShutdown) {
+ // Get out of the while loop and shutdown the daemon thread
+ return
+ }
+ postToAll(event)
+ }
+ }
+ }
+
+ // Exposed for testing
+ @volatile private[spark] var stopCalled = false
+
+ /**
+ * Start sending events to attached listeners.
+ *
+ * This first sends out all buffered events posted before this listener bus has started, then
+ * listens for any additional events asynchronously while the listener bus is still running.
+ * This should only be called once.
+ */
+ def start() {
+ if (started) {
+ throw new IllegalStateException("Listener bus already started!")
+ }
+ listenerThread.start()
+ started = true
+ }
+
+ def post(event: SparkListenerEvent) {
+ val eventAdded = eventQueue.offer(event)
+ if (!eventAdded && !queueFullErrorMessageLogged) {
+ logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
+ "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
+ "rate at which tasks are being started by the scheduler.")
+ queueFullErrorMessageLogged = true
+ }
+ }
+
+ /**
+ * Waits until there are no more events in the queue, or until the specified time has elapsed.
+ * Used for testing only. Returns true if the queue has emptied and false if the specified time
+ * elapsed before the queue emptied.
+ */
+ def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+ val finishTime = System.currentTimeMillis + timeoutMillis
+ while (!eventQueue.isEmpty) {
+ if (System.currentTimeMillis > finishTime) {
+ return false
+ }
+ /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+ * add overhead in the general case. */
+ Thread.sleep(10)
+ }
+ true
+ }
+
+ def stop() {
+ stopCalled = true
+ if (!started) {
+ throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
+ }
+ post(SparkListenerShutdown)
+ listenerThread.join()
+ }
+}
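For orientation, a minimal usage sketch of the listener bus above, using only the methods visible in this file; someEvent is a placeholder for any SparkListenerEvent.

val bus = new LiveListenerBus
bus.start()                // drains events buffered before start, then listens asynchronously
bus.post(someEvent)        // non-blocking; logs an error and drops the event if the queue is full
bus.waitUntilEmpty(1000)   // test-only helper: true if the queue drained within the timeout
bus.stop()                 // posts SparkListenerShutdown and joins the daemon thread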
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 2fbbda5b76..ace9cd51c9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -240,7 +240,7 @@ object BlockFetcherIterator {
override def numRemoteBlocks: Int = numRemote
override def fetchWaitTime: Long = _fetchWaitTime
override def remoteBytesRead: Long = _remoteBytesRead
-
+
// Implementing the Iterator methods with an iterator that reads fetched blocks off the queue
// as they arrive.
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a2a7291300..df9bb4044e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -388,7 +388,7 @@ private[spark] class BlockManager(
logDebug("Block " + blockId + " not found in memory")
}
}
-
+
// Look for the block in Tachyon
if (level.useOffHeap) {
logDebug("Getting block " + blockId + " from tachyon")
@@ -1031,7 +1031,7 @@ private[spark] class BlockManager(
memoryStore.clear()
diskStore.clear()
if (tachyonInitialized) {
- tachyonStore.clear()
+ tachyonStore.clear()
}
metadataCleaner.cancel()
broadcastCleaner.cancel()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
index 7168ae18c2..337b45b727 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
@@ -37,7 +37,7 @@ private[spark] class BlockMessage() {
private var id: BlockId = null
private var data: ByteBuffer = null
private var level: StorageLevel = null
-
+
def set(getBlock: GetBlock) {
typ = BlockMessage.TYPE_GET_BLOCK
id = getBlock.id
@@ -75,13 +75,13 @@ private[spark] class BlockMessage() {
idBuilder += buffer.getChar()
}
id = BlockId(idBuilder.toString)
-
+
if (typ == BlockMessage.TYPE_PUT_BLOCK) {
val booleanInt = buffer.getInt()
val replication = buffer.getInt()
level = StorageLevel(booleanInt, replication)
-
+
val dataLength = buffer.getInt()
data = ByteBuffer.allocate(dataLength)
if (dataLength != buffer.remaining) {
@@ -108,12 +108,12 @@ private[spark] class BlockMessage() {
buffer.clear()
set(buffer)
}
-
+
def getType: Int = typ
def getId: BlockId = id
def getData: ByteBuffer = data
def getLevel: StorageLevel = level
-
+
def toBufferMessage: BufferMessage = {
val startTime = System.currentTimeMillis
val buffers = new ArrayBuffer[ByteBuffer]()
@@ -127,7 +127,7 @@ private[spark] class BlockMessage() {
buffer = ByteBuffer.allocate(8).putInt(level.toInt).putInt(level.replication)
buffer.flip()
buffers += buffer
-
+
buffer = ByteBuffer.allocate(4).putInt(data.remaining)
buffer.flip()
buffers += buffer
@@ -140,7 +140,7 @@ private[spark] class BlockMessage() {
buffers += data
}
-
+
/*
println()
println("BlockMessage: ")
@@ -158,7 +158,7 @@ private[spark] class BlockMessage() {
}
override def toString: String = {
- "BlockMessage [type = " + typ + ", id = " + id + ", level = " + level +
+ "BlockMessage [type = " + typ + ", id = " + id + ", level = " + level +
", data = " + (if (data != null) data.remaining.toString else "null") + "]"
}
}
@@ -168,7 +168,7 @@ private[spark] object BlockMessage {
val TYPE_GET_BLOCK: Int = 1
val TYPE_GOT_BLOCK: Int = 2
val TYPE_PUT_BLOCK: Int = 3
-
+
def fromBufferMessage(bufferMessage: BufferMessage): BlockMessage = {
val newBlockMessage = new BlockMessage()
newBlockMessage.set(bufferMessage)
@@ -192,7 +192,7 @@ private[spark] object BlockMessage {
newBlockMessage.set(gotBlock)
newBlockMessage
}
-
+
def fromPutBlock(putBlock: PutBlock): BlockMessage = {
val newBlockMessage = new BlockMessage()
newBlockMessage.set(putBlock)
@@ -206,7 +206,7 @@ private[spark] object BlockMessage {
val bMsg = B.toBufferMessage
val C = new BlockMessage()
C.set(bMsg)
-
+
println(B.getId + " " + B.getLevel)
println(C.getId + " " + C.getLevel)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
index dc62b1efaa..973d85c0a9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
@@ -27,16 +27,16 @@ import org.apache.spark.network._
private[spark]
class BlockMessageArray(var blockMessages: Seq[BlockMessage])
extends Seq[BlockMessage] with Logging {
-
+
def this(bm: BlockMessage) = this(Array(bm))
def this() = this(null.asInstanceOf[Seq[BlockMessage]])
- def apply(i: Int) = blockMessages(i)
+ def apply(i: Int) = blockMessages(i)
def iterator = blockMessages.iterator
- def length = blockMessages.length
+ def length = blockMessages.length
def set(bufferMessage: BufferMessage) {
val startTime = System.currentTimeMillis
@@ -62,15 +62,15 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage])
logDebug("Trying to convert buffer " + newBuffer + " to block message")
val newBlockMessage = BlockMessage.fromByteBuffer(newBuffer)
logDebug("Created " + newBlockMessage)
- newBlockMessages += newBlockMessage
+ newBlockMessages += newBlockMessage
buffer.position(buffer.position() + size)
}
val finishTime = System.currentTimeMillis
logDebug("Converted block message array from buffer message in " +
(finishTime - startTime) / 1000.0 + " s")
- this.blockMessages = newBlockMessages
+ this.blockMessages = newBlockMessages
}
-
+
def toBufferMessage: BufferMessage = {
val buffers = new ArrayBuffer[ByteBuffer]()
@@ -83,7 +83,7 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage])
buffers ++= bufferMessage.buffers
logDebug("Added " + bufferMessage)
})
-
+
logDebug("Buffer list:")
buffers.foreach((x: ByteBuffer) => logDebug("" + x))
/*
@@ -103,13 +103,13 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage])
}
private[spark] object BlockMessageArray {
-
+
def fromBufferMessage(bufferMessage: BufferMessage): BlockMessageArray = {
val newBlockMessageArray = new BlockMessageArray()
newBlockMessageArray.set(bufferMessage)
newBlockMessageArray
}
-
+
def main(args: Array[String]) {
val blockMessages =
(0 until 10).map { i =>
@@ -124,10 +124,10 @@ private[spark] object BlockMessageArray {
}
val blockMessageArray = new BlockMessageArray(blockMessages)
println("Block message array created")
-
+
val bufferMessage = blockMessageArray.toBufferMessage
println("Converted to buffer message")
-
+
val totalSize = bufferMessage.size
val newBuffer = ByteBuffer.allocate(totalSize)
newBuffer.clear()
@@ -137,7 +137,7 @@ private[spark] object BlockMessageArray {
buffer.rewind()
})
newBuffer.flip
- val newBufferMessage = Message.createBufferMessage(newBuffer)
+ val newBufferMessage = Message.createBufferMessage(newBuffer)
println("Copied to new buffer message, size = " + newBufferMessage.size)
val newBlockMessageArray = BlockMessageArray.fromBufferMessage(newBufferMessage)
@@ -147,7 +147,7 @@ private[spark] object BlockMessageArray {
case BlockMessage.TYPE_PUT_BLOCK => {
val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
println(pB)
- }
+ }
case BlockMessage.TYPE_GET_BLOCK => {
val gB = new GetBlock(blockMessage.getId)
println(gB)
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index e1a1f209c9..9ce0398d01 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -136,7 +136,7 @@ private[spark] object JettyUtils extends Logging {
private def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) {
val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim())
filters.foreach {
- case filter : String =>
+ case filter : String =>
if (!filter.isEmpty) {
logInfo("Adding filter: " + filter)
val holder : FilterHolder = new FilterHolder()
@@ -151,7 +151,7 @@ private[spark] object JettyUtils extends Logging {
if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
}
}
- val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
+ val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) }
}
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index a487924eff..a7cf04b3cb 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -48,7 +48,7 @@ private[spark] object UIUtils {
case _ => <li><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li>
}
val environment = page match {
- case Environment =>
+ case Environment =>
<li class="active"><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
case _ => <li><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
}
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index cdbbc65292..2d05e09b10 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -45,7 +45,7 @@ private[spark] object ClosureCleaner extends Logging {
private def isClosure(cls: Class[_]): Boolean = {
cls.getName.contains("$anonfun$")
}
-
+
// Get a list of the classes of the outer objects of a given closure object, obj;
// the outer objects are defined as any closures that obj is nested within, plus
// possibly the class that the outermost closure is in, if any. We stop searching
@@ -63,7 +63,7 @@ private[spark] object ClosureCleaner extends Logging {
}
Nil
}
-
+
// Get a list of the outer objects for a given closure object.
private def getOuterObjects(obj: AnyRef): List[AnyRef] = {
for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
@@ -76,7 +76,7 @@ private[spark] object ClosureCleaner extends Logging {
}
Nil
}
-
+
private def getInnerClasses(obj: AnyRef): List[Class[_]] = {
val seen = Set[Class[_]](obj.getClass)
var stack = List[Class[_]](obj.getClass)
@@ -92,7 +92,7 @@ private[spark] object ClosureCleaner extends Logging {
}
return (seen - obj.getClass).toList
}
-
+
private def createNullValue(cls: Class[_]): AnyRef = {
if (cls.isPrimitive) {
new java.lang.Byte(0: Byte) // Should be convertible to any primitive type
@@ -100,13 +100,13 @@ private[spark] object ClosureCleaner extends Logging {
null
}
}
-
+
def clean(func: AnyRef) {
// TODO: cache outerClasses / innerClasses / accessedFields
val outerClasses = getOuterClasses(func)
val innerClasses = getInnerClasses(func)
val outerObjects = getOuterObjects(func)
-
+
val accessedFields = Map[Class[_], Set[String]]()
for (cls <- outerClasses)
accessedFields(cls) = Set[String]()
@@ -143,7 +143,7 @@ private[spark] object ClosureCleaner extends Logging {
field.set(outer, value)
}
}
-
+
if (outer != null) {
// logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
val field = func.getClass.getDeclaredField("$outer")
@@ -151,7 +151,7 @@ private[spark] object ClosureCleaner extends Logging {
field.set(func, outer)
}
}
-
+
private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {
// logInfo("Creating a " + cls + " with outer = " + outer)
if (!inInterpreter) {
@@ -192,7 +192,7 @@ class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor
}
}
}
-
+
override def visitMethodInsn(op: Int, owner: String, name: String,
desc: String) {
// Check for calls a getter method for a variable in an interpreter wrapper object.
@@ -209,12 +209,12 @@ class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor
private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) {
var myName: String = null
-
+
override def visit(version: Int, access: Int, name: String, sig: String,
superName: String, interfaces: Array[String]) {
myName = name
}
-
+
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
new MethodVisitor(ASM4) {
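The outer-object walk that ClosureCleaner's comments above describe can be pictured with a small standalone sketch; the name outerChain is illustrative only, and it mirrors the reflective $outer lookup in getOuterObjects.

// Collect the chain of enclosing objects of a closure by following $outer fields.
def outerChain(obj: AnyRef): List[AnyRef] =
  obj.getClass.getDeclaredFields.find(_.getName == "$outer") match {
    case Some(f) =>
      f.setAccessible(true)
      val outer = f.get(obj)
      if (outer == null) Nil else outer :: outerChain(outer)
    case None => Nil
  }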
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index d990fd49ef..f2396f7c80 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -611,7 +611,7 @@ private[spark] object JsonProtocol {
val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel)
rddInfo.numCachedPartitions = numCachedPartitions
rddInfo.memSize = memSize
- rddInfo.tachyonSize = tachyonSize
+ rddInfo.tachyonSize = tachyonSize
rddInfo.diskSize = diskSize
rddInfo
}
diff --git a/core/src/main/scala/org/apache/spark/util/NextIterator.scala b/core/src/main/scala/org/apache/spark/util/NextIterator.scala
index 8266e5e495..e5c732a5a5 100644
--- a/core/src/main/scala/org/apache/spark/util/NextIterator.scala
+++ b/core/src/main/scala/org/apache/spark/util/NextIterator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util
/** Provides a basic/boilerplate Iterator implementation. */
private[spark] abstract class NextIterator[U] extends Iterator[U] {
-
+
private var gotNext = false
private var nextValue: U = _
private var closed = false
@@ -34,7 +34,7 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] {
* This convention is required because `null` may be a valid value,
* and using `Option` seems like it might create unnecessary Some/None
* instances, given some iterators might be called in a tight loop.
- *
+ *
* @return U, or set 'finished' when done
*/
protected def getNext(): U
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index 732748a7ff..d80eed455c 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -62,10 +62,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
if (n == 0) {
mu = other.mu
m2 = other.m2
- n = other.n
+ n = other.n
maxValue = other.maxValue
minValue = other.minValue
- } else if (other.n != 0) {
+ } else if (other.n != 0) {
val delta = other.mu - mu
if (other.n * 10 < n) {
mu = mu + (delta * other.n) / (n + other.n)
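The branch shown above folds one StatCounter's running mean into the other; as a sketch, the same update for the mean alone (the variance term m2 is updated analogously). The helper name is illustrative.

// n1/mu1 are one counter's count and mean, n2/mu2 the other's.
def mergedMean(n1: Long, mu1: Double, n2: Long, mu2: Double): Double = {
  val delta = mu2 - mu1
  mu1 + delta * n2 / (n1 + n2)
}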
diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala
index 3c8f94a416..1a647fa1c9 100644
--- a/core/src/main/scala/org/apache/spark/util/Vector.scala
+++ b/core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -136,7 +136,7 @@ object Vector {
def ones(length: Int) = Vector(length, _ => 1)
/**
- * Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers
+ * Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers
* between 0.0 and 1.0. Optional scala.util.Random number generator can be provided.
*/
def random(length: Int, random: Random = new XORShiftRandom()) =
diff --git a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala
index 8a4cdea2fa..7f220383f9 100644
--- a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala
@@ -25,28 +25,28 @@ import scala.util.hashing.MurmurHash3
import org.apache.spark.util.Utils.timeIt
/**
- * This class implements a XORShift random number generator algorithm
+ * This class implements a XORShift random number generator algorithm
* Source:
* Marsaglia, G. (2003). Xorshift RNGs. Journal of Statistical Software, Vol. 8, Issue 14.
* @see <a href="http://www.jstatsoft.org/v08/i14/paper">Paper</a>
* This implementation is approximately 3.5 times faster than
* {@link java.util.Random java.util.Random}, partly because of the algorithm, but also due
- * to renouncing thread safety. JDK's implementation uses an AtomicLong seed, this class
+ * to renouncing thread safety. JDK's implementation uses an AtomicLong seed, this class
* uses a regular Long. We can forgo thread safety since we use a new instance of the RNG
* for each thread.
*/
private[spark] class XORShiftRandom(init: Long) extends JavaRandom(init) {
-
+
def this() = this(System.nanoTime)
private var seed = XORShiftRandom.hashSeed(init)
// we need to just override next - this will be called by nextInt, nextDouble,
// nextGaussian, nextLong, etc.
- override protected def next(bits: Int): Int = {
+ override protected def next(bits: Int): Int = {
var nextSeed = seed ^ (seed << 21)
nextSeed ^= (nextSeed >>> 35)
- nextSeed ^= (nextSeed << 4)
+ nextSeed ^= (nextSeed << 4)
seed = nextSeed
(nextSeed & ((1L << bits) -1)).asInstanceOf[Int]
}
@@ -89,7 +89,7 @@ private[spark] object XORShiftRandom {
val million = 1e6.toInt
val javaRand = new JavaRandom(seed)
val xorRand = new XORShiftRandom(seed)
-
+
// this is just to warm up the JIT - we're not timing anything
timeIt(1e6.toInt) {
javaRand.nextInt()
@@ -97,9 +97,9 @@ private[spark] object XORShiftRandom {
}
val iters = timeIt(numIters)(_)
-
+
/* Return results as a map instead of just printing to screen
- in case the user wants to do something with them */
+ in case the user wants to do something with them */
Map("javaTime" -> iters {javaRand.nextInt()},
"xorTime" -> iters {xorRand.nextInt()})
diff --git a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
index c5f24c66ce..c645e4cbe8 100644
--- a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
@@ -37,7 +37,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
val securityManager = new SecurityManager(conf);
val hostname = "localhost"
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
conf = conf, securityManager = securityManager)
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
@@ -54,14 +54,14 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
assert(securityManagerBad.isAuthenticationEnabled() === true)
- val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+ val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
conf = conf, securityManager = securityManagerBad)
val slaveTracker = new MapOutputTrackerWorker(conf)
val selection = slaveSystem.actorSelection(
s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
val timeout = AkkaUtils.lookupTimeout(conf)
- intercept[akka.actor.ActorNotFound] {
- slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+ intercept[akka.actor.ActorNotFound] {
+ slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
}
actorSystem.shutdown()
@@ -75,7 +75,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
val securityManager = new SecurityManager(conf);
val hostname = "localhost"
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
conf = conf, securityManager = securityManager)
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
@@ -91,7 +91,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
badconf.set("spark.authenticate.secret", "good")
val securityManagerBad = new SecurityManager(badconf);
- val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+ val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
conf = badconf, securityManager = securityManagerBad)
val slaveTracker = new MapOutputTrackerWorker(conf)
val selection = slaveSystem.actorSelection(
@@ -127,7 +127,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
val securityManager = new SecurityManager(conf);
val hostname = "localhost"
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
conf = conf, securityManager = securityManager)
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
@@ -180,7 +180,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
val securityManager = new SecurityManager(conf);
val hostname = "localhost"
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
conf = conf, securityManager = securityManager)
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
@@ -204,8 +204,8 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
val selection = slaveSystem.actorSelection(
s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
val timeout = AkkaUtils.lookupTimeout(conf)
- intercept[akka.actor.ActorNotFound] {
- slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+ intercept[akka.actor.ActorNotFound] {
+ slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
}
actorSystem.shutdown()
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 9cbdfc54a3..7f59bdcce4 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -39,7 +39,7 @@ class DriverSuite extends FunSuite with Timeouts {
failAfter(60 seconds) {
Utils.executeAndGetOutput(
Seq("./bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
- new File(sparkHome),
+ new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
}
}
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index aee9ab9091..d651fbbac4 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -45,7 +45,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
val pw = new PrintWriter(textFile)
pw.println("100")
pw.close()
-
+
val jarFile = new File(tmpDir, "test.jar")
val jarStream = new FileOutputStream(jarFile)
val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
@@ -53,7 +53,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
val jarEntry = new JarEntry(textFile.getName)
jar.putNextEntry(jarEntry)
-
+
val in = new FileInputStream(textFile)
val buffer = new Array[Byte](10240)
var nRead = 0
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 01af940771..b9b668d3cc 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -106,7 +106,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
- val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
+ val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
nums.saveAsSequenceFile(outputDir)
// Try reading the output back as a SequenceFile
val output = sc.sequenceFile[IntWritable, Text](outputDir)
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
index 0b5ed6d770..5e538d6fab 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
@@ -45,4 +45,4 @@ class WorkerWatcherSuite extends FunSuite {
actorRef.underlyingActor.receive(new DisassociatedEvent(null, otherAkkaAddress, false))
assert(!actorRef.underlyingActor.isShutDown)
}
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index 09e35bfc8f..e89b296d41 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -42,7 +42,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
override def beforeAll() {
sc = new SparkContext("local", "test")
-
+
// Set the block size of local file system to test whether files are split right or not.
sc.hadoopConfiguration.setLong("fs.local.block.size", 32)
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
index a4381a8b97..4df36558b6 100644
--- a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -34,14 +34,14 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices(1).mkString(",") === "2")
assert(slices(2).mkString(",") === "3")
}
-
+
test("one slice") {
val data = Array(1, 2, 3)
val slices = ParallelCollectionRDD.slice(data, 1)
assert(slices.size === 1)
assert(slices(0).mkString(",") === "1,2,3")
}
-
+
test("equal slices") {
val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
val slices = ParallelCollectionRDD.slice(data, 3)
@@ -50,7 +50,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices(1).mkString(",") === "4,5,6")
assert(slices(2).mkString(",") === "7,8,9")
}
-
+
test("non-equal slices") {
val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val slices = ParallelCollectionRDD.slice(data, 3)
@@ -77,14 +77,14 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices(1).mkString(",") === (33 to 66).mkString(","))
assert(slices(2).mkString(",") === (67 to 100).mkString(","))
}
-
+
test("empty data") {
val data = new Array[Int](0)
val slices = ParallelCollectionRDD.slice(data, 5)
assert(slices.size === 5)
for (slice <- slices) assert(slice.size === 0)
}
-
+
test("zero slices") {
val data = Array(1, 2, 3)
intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, 0) }
@@ -94,7 +94,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
val data = Array(1, 2, 3)
intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, -5) }
}
-
+
test("exclusive ranges sliced into ranges") {
val data = 1 until 100
val slices = ParallelCollectionRDD.slice(data, 3)
@@ -102,7 +102,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices.map(_.size).reduceLeft(_+_) === 99)
assert(slices.forall(_.isInstanceOf[Range]))
}
-
+
test("inclusive ranges sliced into ranges") {
val data = 1 to 100
val slices = ParallelCollectionRDD.slice(data, 3)
@@ -124,7 +124,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(range.step === 1, "slice " + i + " step")
}
}
-
+
test("random array tests") {
val gen = for {
d <- arbitrary[List[Int]]
@@ -141,7 +141,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
}
check(prop)
}
-
+
test("random exclusive range tests") {
val gen = for {
a <- Gen.choose(-100, 100)
@@ -177,7 +177,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
}
check(prop)
}
-
+
test("exclusive ranges of longs") {
val data = 1L until 100L
val slices = ParallelCollectionRDD.slice(data, 3)
@@ -185,7 +185,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices.map(_.size).reduceLeft(_+_) === 99)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
-
+
test("inclusive ranges of longs") {
val data = 1L to 100L
val slices = ParallelCollectionRDD.slice(data, 3)
@@ -193,7 +193,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices.map(_.size).reduceLeft(_+_) === 100)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
-
+
test("exclusive ranges of doubles") {
val data = 1.0 until 100.0 by 1.0
val slices = ParallelCollectionRDD.slice(data, 3)
@@ -201,7 +201,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices.map(_.size).reduceLeft(_+_) === 99)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
-
+
test("inclusive ranges of doubles") {
val data = 1.0 to 100.0 by 1.0
val slices = ParallelCollectionRDD.slice(data, 3)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index dc704e07a8..4cdccdda6f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -216,7 +216,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
test("onTaskGettingResult() called when result fetched remotely") {
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
-
+
// Make a task whose result is larger than the akka frame size
System.setProperty("spark.akka.frameSize", "1")
val akkaFrameSize =
@@ -236,7 +236,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
test("onTaskGettingResult() not called when result sent directly") {
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
-
+
// Make a task whose result is larger than the akka frame size
val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
assert(result === 2)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 356e28dd19..2fb750d9ee 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -264,7 +264,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
test("Scheduler does not always schedule tasks on the same workers") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
- val taskScheduler = new TaskSchedulerImpl(sc)
+ val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
val dagScheduler = new DAGScheduler(sc, taskScheduler) {
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 45c3224279..2f9739f940 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -33,8 +33,8 @@ class UISuite extends FunSuite {
val server = new Server(startPort)
Try { server.start() } match {
- case Success(s) =>
- case Failure(e) =>
+ case Success(s) =>
+ case Failure(e) =>
// Either case server port is busy hence setup for test complete
}
val serverInfo1 = JettyUtils.startJettyServer(
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index 439e5644e2..d7e48e633e 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -69,7 +69,7 @@ object TestObject {
class TestClass extends Serializable {
var x = 5
-
+
def getX = x
def run(): Int = {
diff --git a/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
index e1446cbc90..32d74d0500 100644
--- a/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
@@ -32,7 +32,7 @@ class NextIteratorSuite extends FunSuite with ShouldMatchers {
i.hasNext should be === false
intercept[NoSuchElementException] { i.next() }
}
-
+
test("two iterations") {
val i = new StubIterator(Buffer(1, 2))
i.hasNext should be === true
@@ -70,7 +70,7 @@ class NextIteratorSuite extends FunSuite with ShouldMatchers {
class StubIterator(ints: Buffer[Int]) extends NextIterator[Int] {
var closeCalled = 0
-
+
override def getNext() = {
if (ints.size == 0) {
finished = true
diff --git a/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala b/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala
index 757476efdb..39199a1a17 100644
--- a/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala
@@ -29,12 +29,12 @@ class XORShiftRandomSuite extends FunSuite with ShouldMatchers {
val xorRand = new XORShiftRandom(seed)
val hundMil = 1e8.toInt
}
-
+
/*
- * This test is based on a chi-squared test for randomness. The values are hard-coded
+ * This test is based on a chi-squared test for randomness. The values are hard-coded
* so as not to create Spark's dependency on apache.commons.math3 just to call one
* method for calculating the exact p-value for a given number of random numbers
- * and bins. In case one would want to move to a full-fledged test based on
+ * and bins. In case one would want to move to a full-fledged test based on
* apache.commons.math3, the relevant class is here:
* org.apache.commons.math3.stat.inference.ChiSquareTest
*/
@@ -49,19 +49,19 @@ class XORShiftRandomSuite extends FunSuite with ShouldMatchers {
// populate bins based on modulus of the random number
times(f.hundMil) {bins(math.abs(f.xorRand.nextInt) % 10) += 1}
- /* since the seed is deterministic, until the algorithm is changed, we know the result will be
- * exactly this: Array(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272,
- * 10000790, 10002286, 9998699), so the test will never fail at the prespecified (5%)
- * significance level. However, should the RNG implementation change, the test should still
- * pass at the same significance level. The chi-squared test done in R gave the following
+ /* since the seed is deterministic, until the algorithm is changed, we know the result will be
+ * exactly this: Array(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272,
+ * 10000790, 10002286, 9998699), so the test will never fail at the prespecified (5%)
+ * significance level. However, should the RNG implementation change, the test should still
+ * pass at the same significance level. The chi-squared test done in R gave the following
* results:
* > chisq.test(c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272,
* 10000790, 10002286, 9998699))
* Chi-squared test for given probabilities
- * data: c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, 10000790,
+ * data: c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, 10000790,
* 10002286, 9998699)
* X-squared = 11.975, df = 9, p-value = 0.2147
- * Note that the p-value was ~0.22. The test will fail if alpha < 0.05, which for 100 million
+ * Note that the p-value was ~0.22. The test will fail if alpha < 0.05, which for 100 million
* random numbers
* and 10 bins will happen at X-squared of ~16.9196. So, the test will fail if X-squared
* is greater than or equal to that number.
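The comment above refers to a chi-squared statistic over ten bins; for reference, a minimal sketch of that statistic under a uniform expectation (the suite hard-codes the R result rather than computing this, and the function name here is illustrative).

// X-squared = sum over bins of (observed - expected)^2 / expected.
def chiSquared(bins: Array[Long]): Double = {
  val expected = bins.sum.toDouble / bins.length
  bins.map { o => val d = o - expected; d * d / expected }.sum
}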
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 41e813d48c..1204cfba39 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -48,41 +48,41 @@ import org.apache.spark.streaming.dstream._
* @param storageLevel RDD storage level.
*/
-private[streaming]
+private[streaming]
class MQTTInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_) with Logging {
-
+
def getReceiver(): NetworkReceiver[T] = {
new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
}
}
-private[streaming]
+private[streaming]
class MQTTReceiver(brokerUrl: String,
topic: String,
storageLevel: StorageLevel
) extends NetworkReceiver[Any] {
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
-
+
def onStop() {
blockGenerator.stop()
}
-
+
def onStart() {
blockGenerator.start()
- // Set up persistence for messages
+ // Set up persistence for messages
var peristance: MqttClientPersistence = new MemoryPersistence()
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
- // Connect to MqttBroker
+ // Connect to MqttBroker
client.connect()
// Subscribe to Mqtt topic
@@ -91,7 +91,7 @@ class MQTTReceiver(brokerUrl: String,
// Callback automatically triggers as and when new message arrives on specified topic
var callback: MqttCallback = new MqttCallback() {
- // Handles Mqtt message
+ // Handles Mqtt message
override def messageArrived(arg0: String, arg1: MqttMessage) {
blockGenerator += new String(arg1.getPayload())
}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 3316b6dc39..843a4a7a9a 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials.
* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
* such that this may return a sampled subset of all tweets during each interval.
-*
+*
* If no Authorization object is provided, initializes OAuth authorization using the system
* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret.
*/
@@ -42,13 +42,13 @@ class TwitterInputDStream(
filters: Seq[String],
storageLevel: StorageLevel
) extends NetworkInputDStream[Status](ssc_) {
-
+
private def createOAuthAuthorization(): Authorization = {
new OAuthAuthorization(new ConfigurationBuilder().build())
}
private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
-
+
override def getReceiver(): NetworkReceiver[Status] = {
new TwitterReceiver(authorization, filters, storageLevel)
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 377d9d6bd5..5635287694 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -172,7 +172,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
"EdgeDirection.Either instead.")
}
}
-
+
/**
* Join the vertices with an RDD and then apply a function from the
* the vertex and RDD entry to a new vertex value. The input table
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
index 6386306c04..a467ca1ae7 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -55,7 +55,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
}
}
}
-
+
test ("filter") {
withSpark { sc =>
val n = 5
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
index e41d9bbe18..7f6d94571b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
@@ -30,7 +30,7 @@ import org.apache.spark.mllib.linalg.Vector
trait Optimizer extends Serializable {
/**
- * Solve the provided convex optimization problem.
+ * Solve the provided convex optimization problem.
*/
def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index 3bd0017aa1..d969e7aa60 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -26,7 +26,7 @@ import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.linalg.{Vectors, Vector}
/**
- * GeneralizedLinearModel (GLM) represents a model trained using
+ * GeneralizedLinearModel (GLM) represents a model trained using
* GeneralizedLinearAlgorithm. GLMs consist of a weight vector and
* an intercept.
*
@@ -38,7 +38,7 @@ abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double
/**
* Predict the result given a data point and the weights learned.
- *
+ *
* @param dataMatrix Row vector containing the features for this data point
* @param weightMatrix Column vector containing the weights of the model
* @param intercept Intercept of the model.
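A GLM prediction of the kind documented above reduces to a dot product of the features with the weights plus the intercept; a minimal sketch using plain arrays instead of mllib Vectors (concrete models then apply their own link function, which is omitted here).

def predictPoint(features: Array[Double], weights: Array[Double], intercept: Double): Double =
  features.zip(weights).map { case (x, w) => x * w }.sum + intercept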
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index a30dcfdcec..687e85ca94 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -35,7 +35,7 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
* A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI,
* used to load classes defined by the interpreter when the REPL is used.
* Allows the user to specify if user class path should be first
- */
+ */
class ExecutorClassLoader(classUri: String, parent: ClassLoader,
userClassPathFirst: Boolean) extends ClassLoader {
val uri = new URI(classUri)
@@ -94,7 +94,7 @@ class ExecutorClassLoader(classUri: String, parent: ClassLoader,
case e: Exception => None
}
}
-
+
def readAndTransformClass(name: String, in: InputStream): Array[Byte] = {
if (name.startsWith("line") && name.endsWith("$iw$")) {
// Class seems to be an interpreter "wrapper" object storing a val or var.
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
index 8f61a5e835..419796b68b 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
@@ -187,7 +187,7 @@ trait SparkImports {
if (currentImps contains imv) addWrapper()
val objName = req.lineRep.readPath
val valName = "$VAL" + newValId();
-
+
if(!code.toString.endsWith(".`" + imv + "`;\n")) { // Which means already imported
code.append("val " + valName + " = " + objName + ".INSTANCE;\n")
code.append("import " + valName + req.accessPath + ".`" + imv + "`;\n")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 17118499d0..1f3fab09e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -28,7 +28,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
override def toString = s"CAST($child, $dataType)"
type EvaluatedType = Any
-
+
def nullOrCast[T](a: Any, func: T => Any): Any = if(a == null) {
null
} else {
@@ -40,7 +40,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
case BinaryType => nullOrCast[Array[Byte]](_, new String(_, "UTF-8"))
case _ => nullOrCast[Any](_, _.toString)
}
-
+
// BinaryConverter
def castToBinary: Any => Any = child.dataType match {
case StringType => nullOrCast[String](_, _.getBytes("UTF-8"))
@@ -58,7 +58,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
case DoubleType => nullOrCast[Double](_, _ != 0)
case FloatType => nullOrCast[Float](_, _ != 0)
}
-
+
// TimestampConverter
def castToTimestamp: Any => Any = child.dataType match {
case StringType => nullOrCast[String](_, s => {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 8a1db8e796..dd9332ada8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -86,7 +86,7 @@ abstract class Expression extends TreeNode[Expression] {
}
/**
- * Evaluation helper function for 2 Numeric children expressions. Those expressions are supposed
+ * Evaluation helper function for 2 Numeric children expressions. Those expressions are supposed
* to be in the same data type, and also the return type.
* Either one of the expressions result is null, the evaluation result should be null.
*/
@@ -120,7 +120,7 @@ abstract class Expression extends TreeNode[Expression] {
}
/**
- * Evaluation helper function for 2 Fractional children expressions. Those expressions are
+ * Evaluation helper function for 2 Fractional children expressions. Those expressions are
* supposed to be in the same data type, and also the return type.
* Either one of the expressions result is null, the evaluation result should be null.
*/
@@ -153,7 +153,7 @@ abstract class Expression extends TreeNode[Expression] {
}
/**
- * Evaluation helper function for 2 Integral children expressions. Those expressions are
+ * Evaluation helper function for 2 Integral children expressions. Those expressions are
* supposed to be in the same data type, and also the return type.
* Either one of the expressions result is null, the evaluation result should be null.
*/
@@ -186,12 +186,12 @@ abstract class Expression extends TreeNode[Expression] {
}
/**
- * Evaluation helper function for 2 Comparable children expressions. Those expressions are
+ * Evaluation helper function for 2 Comparable children expressions. Those expressions are
* supposed to be in the same data type, and the return type should be Integer:
* Negative value: 1st argument less than 2nd argument
* Zero: 1st argument equals 2nd argument
* Positive value: 1st argument greater than 2nd argument
- *
+ *
* Either one of the expressions result is null, the evaluation result should be null.
*/
@inline
@@ -213,7 +213,7 @@ abstract class Expression extends TreeNode[Expression] {
null
} else {
e1.dataType match {
- case i: NativeType =>
+ case i: NativeType =>
f.asInstanceOf[(Ordering[i.JvmType], i.JvmType, i.JvmType) => Boolean](
i.ordering, evalE1.asInstanceOf[i.JvmType], evalE2.asInstanceOf[i.JvmType])
case other => sys.error(s"Type $other does not support ordered operations")
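The helper comments above all share one convention: if either child expression evaluates to null, the combined result is null. A minimal sketch of that convention in isolation, with an illustrative name (the real helpers additionally dispatch on the data type, as the NativeType case shows).

def combineNullable(evalE1: Any, evalE2: Any)(f: (Any, Any) => Any): Any =
  if (evalE1 == null || evalE2 == null) null else f(evalE1, evalE2)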
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
index a27c71db1b..ddc16ce87b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
@@ -28,19 +28,19 @@ trait StringRegexExpression {
self: BinaryExpression =>
type EvaluatedType = Any
-
+
def escape(v: String): String
def matches(regex: Pattern, str: String): Boolean
-
+
def nullable: Boolean = true
def dataType: DataType = BooleanType
-
- // try cache the pattern for Literal
+
+ // try cache the pattern for Literal
private lazy val cache: Pattern = right match {
case x @ Literal(value: String, StringType) => compile(value)
case _ => null
}
-
+
protected def compile(str: String): Pattern = if(str == null) {
null
} else {
@@ -49,7 +49,7 @@ trait StringRegexExpression {
}
protected def pattern(str: String) = if(cache == null) compile(str) else cache
-
+
override def eval(input: Row): Any = {
val l = left.eval(input)
if (l == null) {
@@ -73,11 +73,11 @@ trait StringRegexExpression {
/**
* Simple RegEx pattern matching function
*/
-case class Like(left: Expression, right: Expression)
+case class Like(left: Expression, right: Expression)
extends BinaryExpression with StringRegexExpression {
-
+
def symbol = "LIKE"
-
+
// replace the _ with .{1} exactly match 1 time of any character
// replace the % with .*, match 0 or more times with any character
override def escape(v: String) = {
@@ -98,19 +98,19 @@ case class Like(left: Expression, right: Expression)
sb.append(Pattern.quote(Character.toString(n)));
}
}
-
+
i += 1
}
-
+
sb.toString()
}
-
+
override def matches(regex: Pattern, str: String): Boolean = regex.matcher(str).matches()
}
-case class RLike(left: Expression, right: Expression)
+case class RLike(left: Expression, right: Expression)
extends BinaryExpression with StringRegexExpression {
-
+
def symbol = "RLIKE"
override def escape(v: String): String = v
override def matches(regex: Pattern, str: String): Boolean = regex.matcher(str).find(0)
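
The comment in the Like hunk above describes the LIKE-to-regex translation: an underscore matches exactly one character, a percent sign matches any run of characters, and everything else is quoted literally. A rough standalone approximation for the simple case with no escape characters (an illustration, not the patched escape method):

    import java.util.regex.Pattern

    // '_' becomes '.', '%' becomes '.*', everything else is quoted.
    def likeToRegex(v: String): String =
      v.map {
        case '_' => "."
        case '%' => ".*"
        case c   => Pattern.quote(c.toString)
      }.mkString

    val p = Pattern.compile(likeToRegex("a%c"))
    assert(p.matcher("abbbbc").matches())   // LIKE requires a full match
    assert(!p.matcher("abcd").matches())
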
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index cdeb01a965..da34bd3a21 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -55,9 +55,9 @@ case object BooleanType extends NativeType {
case object TimestampType extends NativeType {
type JvmType = Timestamp
-
+
@transient lazy val tag = typeTag[JvmType]
-
+
val ordering = new Ordering[JvmType] {
def compare(x: Timestamp, y: Timestamp) = x.compareTo(y)
}
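
For the dataTypes.scala hunk: the TimestampType ordering shown above simply delegates to java.sql.Timestamp.compareTo. A minimal standalone equivalent, kept here only to make that delegation concrete:

    import java.sql.Timestamp

    // Ordering over java.sql.Timestamp that defers to compareTo.
    val timestampOrdering: Ordering[Timestamp] = new Ordering[Timestamp] {
      def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
    }

    assert(timestampOrdering.lt(
      Timestamp.valueOf("1970-01-01 00:00:01.0"),
      Timestamp.valueOf("1970-01-01 00:00:02.0")))
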
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
index 888a19d79f..2cd0d2b0e1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
@@ -144,7 +144,7 @@ class ExpressionEvaluationSuite extends FunSuite {
checkEvaluation("abc" like "b%", false)
checkEvaluation("abc" like "bc%", false)
}
-
+
test("LIKE Non-literal Regular Expression") {
val regEx = 'a.string.at(0)
checkEvaluation("abcd" like regEx, null, new GenericRow(Array[Any](null)))
@@ -164,7 +164,7 @@ class ExpressionEvaluationSuite extends FunSuite {
test("RLIKE literal Regular Expression") {
checkEvaluation("abdef" rlike "abdef", true)
checkEvaluation("abbbbc" rlike "a.*c", true)
-
+
checkEvaluation("fofo" rlike "^fo", true)
checkEvaluation("fo\no" rlike "^fo\no$", true)
checkEvaluation("Bn" rlike "^Ba*n", true)
@@ -196,9 +196,9 @@ class ExpressionEvaluationSuite extends FunSuite {
evaluate("abbbbc" rlike regEx, new GenericRow(Array[Any]("**")))
}
}
-
+
test("data type casting") {
-
+
val sts = "1970-01-01 00:00:01.0"
val ts = Timestamp.valueOf(sts)
@@ -236,7 +236,7 @@ class ExpressionEvaluationSuite extends FunSuite {
checkEvaluation("23" cast ShortType, 23)
checkEvaluation("2012-12-11" cast DoubleType, null)
checkEvaluation(Literal(123) cast IntegerType, 123)
-
+
intercept[Exception] {evaluate(Literal(1) cast BinaryType, null)}
}
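
The expectations in the test hunks above rest on the LIKE versus RLIKE distinction from stringOperations.scala earlier in this diff: LIKE requires the compiled pattern to match the entire string, while RLIKE only requires the pattern to be found somewhere in it. A plain java.util.regex illustration of that difference (not the suite's own helpers):

    import java.util.regex.Pattern

    // "fofo" RLIKE "^fo" is true because find() succeeds on the prefix,
    // even though the pattern does not cover the whole string.
    val r = Pattern.compile("^fo")
    assert(r.matcher("fofo").find(0))     // RLIKE-style check
    assert(!r.matcher("fofo").matches())  // a LIKE-style full match would fail
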
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
index 65eae3357a..1cbf973c34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
@@ -56,4 +56,4 @@ class ScalaReflectionRelationSuite extends FunSuite {
val result = sql("SELECT data FROM reflectBinary").collect().head(0).asInstanceOf[Array[Byte]]
assert(result.toSeq === Seq[Byte](1))
}
-}
\ No newline at end of file
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 93023e8dce..ac56ff709c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -59,7 +59,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
}
}
-private[streaming]
+private[streaming]
object Checkpoint extends Logging {
val PREFIX = "checkpoint-"
val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r
@@ -79,7 +79,7 @@ object Checkpoint extends Logging {
def sortFunc(path1: Path, path2: Path): Boolean = {
val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
- (time1 < time2) || (time1 == time2 && bk1)
+ (time1 < time2) || (time1 == time2 && bk1)
}
val path = new Path(checkpointDir)
@@ -95,7 +95,7 @@ object Checkpoint extends Logging {
}
} else {
logInfo("Checkpoint directory " + path + " does not exist")
- Seq.empty
+ Seq.empty
}
}
}
@@ -160,7 +160,7 @@ class CheckpointWriter(
})
}
- // All done, print success
+ // All done, print success
val finishTime = System.currentTimeMillis()
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile +
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
@@ -227,14 +227,14 @@ object CheckpointReader extends Logging {
{
val checkpointPath = new Path(checkpointDir)
def fs = checkpointPath.getFileSystem(hadoopConf)
-
- // Try to find the checkpoint files
+
+ // Try to find the checkpoint files
val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse
if (checkpointFiles.isEmpty) {
return None
}
- // Try to read the checkpoint files in the order
+ // Try to read the checkpoint files in the order
logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
val compressionCodec = CompressionCodec.createCodec(conf)
checkpointFiles.foreach(file => {
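
The Checkpoint.scala hunks are whitespace-only, but the file-name convention behind them is easy to miss: checkpoint files are named with the checkpoint- prefix followed by the checkpoint time and an optional backup suffix, and sortFunc orders them by time, placing the backup before the main file when the times are equal. A small self-contained check of the regex visible in the diff (the .bk suffix below is just an example value):

    // Same naming convention as in the Checkpoint object above.
    val PREFIX = "checkpoint-"
    val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r

    // A backup file for checkpoint time 1000: the time group parses to 1000L
    // and the non-empty suffix marks it as a backup.
    "checkpoint-1000.bk" match {
      case REGEX(time, suffix) => assert(time.toLong == 1000L && suffix.nonEmpty)
    }
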
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
index 16479a0127..ad4f3fdd14 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
@@ -20,11 +20,11 @@ package org.apache.spark.streaming
private[streaming]
class Interval(val beginTime: Time, val endTime: Time) {
def this(beginMs: Long, endMs: Long) = this(new Time(beginMs), new Time(endMs))
-
+
def duration(): Duration = endTime - beginTime
def + (time: Duration): Interval = {
- new Interval(beginTime + time, endTime + time)
+ new Interval(beginTime + time, endTime + time)
}
def - (time: Duration): Interval = {
@@ -40,9 +40,9 @@ class Interval(val beginTime: Time, val endTime: Time) {
}
def <= (that: Interval) = (this < that || this == that)
-
+
def > (that: Interval) = !(this <= that)
-
+
def >= (that: Interval) = !(this < that)
override def toString = "[" + beginTime + ", " + endTime + "]"
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
index 2678334f53..6a6b00a778 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
@@ -32,7 +32,7 @@ case class Time(private val millis: Long) {
def <= (that: Time): Boolean = (this.millis <= that.millis)
def > (that: Time): Boolean = (this.millis > that.millis)
-
+
def >= (that: Time): Boolean = (this.millis >= that.millis)
def + (that: Duration): Time = new Time(millis + that.milliseconds)
@@ -43,7 +43,7 @@ case class Time(private val millis: Long) {
def floor(that: Duration): Time = {
val t = that.milliseconds
- val m = math.floor(this.millis / t).toLong
+ val m = math.floor(this.millis / t).toLong
new Time(m * t)
}
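
One behavioural note on the Time.floor hunk above: this.millis / t is Long division, so for non-negative times the math.floor call changes nothing and the result is simply the largest multiple of the duration not exceeding the time. A tiny sketch of that semantics with an assumed helper name:

    // Align a millisecond timestamp down to the nearest multiple of a duration.
    def floorMillis(millis: Long, durationMs: Long): Long =
      (millis / durationMs) * durationMs

    assert(floorMillis(2500L, 1000L) == 2000L)
    assert(floorMillis(3000L, 1000L) == 3000L)
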
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 903e3f3c9b..f33c0ceafd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -51,7 +51,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
.map(x => (x._1, x._2.getCheckpointFile.get))
logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))
- // Add the checkpoint files to the data to be serialized
+ // Add the checkpoint files to the data to be serialized
if (!checkpointFiles.isEmpty) {
currentCheckpointFiles.clear()
currentCheckpointFiles ++= checkpointFiles
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 8a6051622e..e878285f6a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -232,7 +232,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}
logDebug("Accepted " + path)
} catch {
- case fnfe: java.io.FileNotFoundException =>
+ case fnfe: java.io.FileNotFoundException =>
logWarning("Error finding new files", fnfe)
reset()
return false
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index 97325f8ea3..6376cff78b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -31,11 +31,11 @@ class QueueInputDStream[T: ClassTag](
oneAtATime: Boolean,
defaultRDD: RDD[T]
) extends InputDStream[T](ssc) {
-
+
override def start() { }
-
+
override def stop() { }
-
+
override def compute(validTime: Time): Option[RDD[T]] = {
val buffer = new ArrayBuffer[RDD[T]]()
if (oneAtATime && queue.size > 0) {
@@ -55,5 +55,5 @@ class QueueInputDStream[T: ClassTag](
None
}
}
-
+
}
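
The QueueInputDStream hunks are again whitespace-only; the compute method they surround effectively chooses between taking one queued RDD per batch interval and draining the whole queue. A rough standalone sketch of just that dequeue decision, using a plain mutable.Queue and an invented helper name:

    import scala.collection.mutable

    // oneAtATime: emit a single queued element per interval; otherwise drain everything.
    def dequeueBatch[T](queue: mutable.Queue[T], oneAtATime: Boolean): Seq[T] =
      if (oneAtATime && queue.nonEmpty) Seq(queue.dequeue())
      else {
        val drained = queue.toList
        queue.clear()
        drained
      }

    val q = mutable.Queue(1, 2, 3)
    assert(dequeueBatch(q, oneAtATime = true) == Seq(1))
    assert(dequeueBatch(q, oneAtATime = false) == Seq(2, 3))
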
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
index 44eb2750c6..f5984d03c5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
@@ -47,7 +47,7 @@ object ReceiverSupervisorStrategy {
* the API for pushing received data into Spark Streaming for being processed.
*
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- *
+ *
* @example {{{
* class MyActor extends Actor with Receiver{
* def receive {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
index c5ef2cc8c3..39145a3ab0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
@@ -19,34 +19,34 @@ package org.apache.spark.streaming.util
private[streaming]
trait Clock {
- def currentTime(): Long
+ def currentTime(): Long
def waitTillTime(targetTime: Long): Long
}
private[streaming]
class SystemClock() extends Clock {
-
+
val minPollTime = 25L
-
+
def currentTime(): Long = {
System.currentTimeMillis()
- }
-
+ }
+
def waitTillTime(targetTime: Long): Long = {
var currentTime = 0L
currentTime = System.currentTimeMillis()
-
+
var waitTime = targetTime - currentTime
if (waitTime <= 0) {
return currentTime
}
-
+
val pollTime = {
if (waitTime / 10.0 > minPollTime) {
(waitTime / 10.0).toLong
} else {
- minPollTime
- }
+ minPollTime
+ }
}
while (true) {
@@ -55,7 +55,7 @@ class SystemClock() extends Clock {
if (waitTime <= 0) {
return currentTime
}
- val sleepTime =
+ val sleepTime =
if (waitTime < pollTime) {
waitTime
} else {
@@ -69,7 +69,7 @@ class SystemClock() extends Clock {
private[streaming]
class ManualClock() extends Clock {
-
+
var time = 0L
def currentTime() = time
@@ -85,13 +85,13 @@ class ManualClock() extends Clock {
this.synchronized {
time += timeToAdd
this.notifyAll()
- }
+ }
}
def waitTillTime(targetTime: Long): Long = {
this.synchronized {
while (time < targetTime) {
this.wait(100)
- }
+ }
}
currentTime()
}
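
The Clock.scala hunks only trim whitespace, but the ManualClock pattern they touch is the interesting part: advancing the clock notifies any thread blocked in waitTillTime. A minimal self-contained sketch of the same monitor-based idea (simplified names, not the patched class):

    // Manually-advanced clock: addToTime wakes waiters, waitTillTime blocks
    // until the clock has been advanced past the target.
    class SimpleManualClock {
      private var time = 0L
      def currentTime(): Long = synchronized { time }
      def addToTime(delta: Long): Unit = synchronized { time += delta; notifyAll() }
      def waitTillTime(target: Long): Long = synchronized {
        while (time < target) wait(100)
        time
      }
    }
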
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index 07021ebb58..bd1df55cf7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -25,8 +25,8 @@ import scala.collection.JavaConversions.mapAsScalaMap
private[streaming]
object RawTextHelper {
- /**
- * Splits lines and counts the words in them using specialized object-to-long hashmap
+ /**
+ * Splits lines and counts the words in them using specialized object-to-long hashmap
* (to avoid boxing-unboxing overhead of Long in java/scala HashMap)
*/
def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
@@ -55,13 +55,13 @@ object RawTextHelper {
map.toIterator.map{case (k, v) => (k, v)}
}
- /**
+ /**
* Gets the top k words in terms of word counts. Assumes that each word exists only once
* in the `data` iterator (that is, the counts have been reduced).
*/
def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = {
val taken = new Array[(String, Long)](k)
-
+
var i = 0
var len = 0
var done = false
@@ -93,7 +93,7 @@ object RawTextHelper {
}
taken.toIterator
}
-
+
/**
* Warms up the SparkContext in master and slave by running tasks to force JIT kick in
* before real workload starts.
@@ -106,11 +106,11 @@ object RawTextHelper {
.count()
}
}
-
- def add(v1: Long, v2: Long) = (v1 + v2)
- def subtract(v1: Long, v2: Long) = (v1 - v2)
+ def add(v1: Long, v2: Long) = (v1 + v2)
+
+ def subtract(v1: Long, v2: Long) = (v1 - v2)
- def max(v1: Long, v2: Long) = math.max(v1, v2)
+ def max(v1: Long, v2: Long) = math.max(v1, v2)
}
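
RawTextHelper.splitAndCountPartitions, referenced in the hunks above, counts words with a specialized object-to-long map to avoid boxing. The overall shape of the computation, with a plain mutable HashMap standing in for that specialized map (an approximation, not the patched code), is:

    import scala.collection.mutable

    // Count words per partition; a plain HashMap stands in for the specialized
    // object-to-long map used in the real helper.
    def splitAndCount(iter: Iterator[String]): Iterator[(String, Long)] = {
      val counts = mutable.HashMap.empty[String, Long]
      for (line <- iter; word <- line.split(" ") if word.nonEmpty)
        counts(word) = counts.getOrElse(word, 0L) + 1L
      counts.iterator
    }

    assert(splitAndCount(Iterator("a b a", "b")).toMap == Map("a" -> 2L, "b" -> 2L))
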
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index f71938ac55..e016377c94 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -22,10 +22,10 @@ import org.apache.spark.Logging
private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
extends Logging {
-
+
private val thread = new Thread("RecurringTimer - " + name) {
setDaemon(true)
- override def run() { loop }
+ override def run() { loop }
}
@volatile private var prevTime = -1L
@@ -104,11 +104,11 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
private[streaming]
object RecurringTimer {
-
+
def main(args: Array[String]) {
var lastRecurTime = 0L
val period = 1000
-
+
def onRecur(time: Long) {
val currentTime = System.currentTimeMillis()
println("" + currentTime + ": " + (currentTime - lastRecurTime))
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 13fa64894b..a0b1bbc34f 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1673,7 +1673,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testSocketString() {
-
+
class Converter implements Function<InputStream, Iterable<String>> {
public Iterable<String> call(InputStream in) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));