From 9c934b640f76b17097f2cae87fef30b05ce854b7 Mon Sep 17 00:00:00 2001 From: Henry Saputra Date: Tue, 19 Nov 2013 10:19:03 -0800 Subject: Remove the semicolons at the end of Scala code to make it more pure Scala code. Also remove unused imports as I found them along the way. Remove return statements when returning value in the Scala code. Passing compile and tests. --- .../src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 2 +- .../org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 7 +++---- .../org/apache/spark/streaming/dstream/FlumeInputDStream.scala | 4 ++-- .../test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 4 ++-- .../src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 2 +- 5 files changed, 9 insertions(+), 10 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index bb9febad38..78a2c07204 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -94,7 +94,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging { fs.delete(file, false) fs.rename(writeFile, file) - val finishTime = System.currentTimeMillis(); + val finishTime = System.currentTimeMillis() logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds") return diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index cf30b541e1..7f9dab0ef9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.api.java -import java.lang.{Long => JLong, Integer => JInt} +import java.lang.{Integer => JInt} import java.io.InputStream import java.util.{Map => JMap, List => JList} @@ -33,10 +33,9 @@ import twitter4j.auth.Authorization import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} -import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaSparkContext, JavaRDD} +import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD} import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.receivers.{ActorReceiver, ReceiverSupervisorStrategy} /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -311,7 +310,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] implicit val cmf: ClassManifest[F] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]] - ssc.fileStream[K, V, F](directory); + ssc.fileStream[K, V, F](directory) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala index 18de772946..a0189eca04 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala @@ -137,8 +137,8 @@ class FlumeReceiver( protected override def onStart() { val responder = new SpecificResponder( - classOf[AvroSourceProtocol], new FlumeEventServer(this)); - val server = new NettyServer(responder, new InetSocketAddress(host, port)); + classOf[AvroSourceProtocol], new FlumeEventServer(this)) + val server = new NettyServer(responder, new InetSocketAddress(host, port)) blockGenerator.start() server.start() logInfo("Flume receiver started") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index a559db468a..7dc82decef 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -124,9 +124,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val input = Seq(1, 2, 3, 4, 5) Thread.sleep(1000) - val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)); + val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) val client = SpecificRequestor.getClient( - classOf[AvroSourceProtocol], transceiver); + classOf[AvroSourceProtocol], transceiver) for (i <- 0 until input.size) { val event = new AvroFlumeEvent diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index be140699c2..8c8c359e6e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -251,7 +251,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { Thread.sleep(500) // Give some time for the forgetting old RDDs to complete } catch { - case e: Exception => e.printStackTrace(); throw e; + case e: Exception => {e.printStackTrace(); throw e} } finally { ssc.stop() } -- cgit v1.2.3 From 10be58f251b5e883295bd46383c0a9758555f8fc Mon Sep 17 00:00:00 2001 From: Henry Saputra Date: Tue, 19 Nov 2013 16:56:23 -0800 Subject: Another set of changes to remove unnecessary semicolon (;) from Scala code. Passed the sbt/sbt compile and test --- .../src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala | 2 +- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 +- streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 4 +++- .../main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- .../org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala | 2 +- yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala | 4 ++-- 7 files changed, 10 insertions(+), 8 deletions(-) (limited to 'streaming') diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala index 481ff8c3e0..b1e1576dad 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala @@ -76,7 +76,7 @@ private[spark] object ShuffleCopier extends Logging { extends FileClientHandler with Logging { override def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader) { - logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)"); + logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)") resultCollectCallBack(header.blockId, header.fileLen.toLong, in.readBytes(header.fileLen)) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index c1c7aa70e6..fbd822867f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -133,7 +133,7 @@ private[spark] class StagePage(parent: JobProgressUI) { summary ++

Summary Metrics for {numCompleted} Completed Tasks

++
{summaryTable.getOrElse("No tasks have reported metrics yet.")}
++ -

Tasks

++ taskTable; +

Tasks

++ taskTable headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 78a2c07204..9271914eb5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -124,7 +124,9 @@ class CheckpointWriter(checkpointDir: String) extends Logging { def stop() { synchronized { - if (stopped) return ; + if (stopped) { + return + } stopped = true } executor.shutdown() diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 3f6e151d89..997a6dc1ec 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -195,7 +195,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e successed = true } finally { logDebug("finishing main") - isLastAMRetry = true; + isLastAMRetry = true if (successed) { ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) } else { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 038de30de5..49a8cfde81 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -351,7 +351,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } // Command for the ApplicationMaster - var javaCommand = "java"; + var javaCommand = "java" val javaHome = System.getenv("JAVA_HOME") if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) { javaCommand = Environment.JAVA_HOME.$() + "/bin/java" diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index 268ab950e8..5f159b073f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -201,7 +201,7 @@ class ClientDistributedCacheManager() extends Logging { val perms = status.getPermission() val otherAction = perms.getOtherAction() if (otherAction.implies(action)) { - return true; + return true } return false } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index fb966b4784..a4d6e1d87d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -107,7 +107,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S credentials.writeTokenStorageToStream(dob) ctx.setContainerTokens(ByteBuffer.wrap(dob.getData())) - var javaCommand = "java"; + var javaCommand = "java" val javaHome = System.getenv("JAVA_HOME") if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) { javaCommand = Environment.JAVA_HOME.$() + "/bin/java" @@ -215,7 +215,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S return rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager] } - }); + }) proxy } -- cgit v1.2.3