about summary refs log tree commit diff
diff options
context:
space:
mode:
authorHenry Saputra <hsaputra@apache.org>2014-02-02 21:51:17 -0800
committerReynold Xin <rxin@apache.org>2014-02-02 21:51:17 -0800
commit0386f42e383dc01b8df33c4a70b024e7902b5fdd (patch)
treef5f2152d661ca67dacb81788236548219264d3bb
parenta8cf3ec157fc9a512421b319cfffc5e4f07cf1f3 (diff)
downloadspark-0386f42e383dc01b8df33c4a70b024e7902b5fdd.tar.gz
spark-0386f42e383dc01b8df33c4a70b024e7902b5fdd.tar.bz2
spark-0386f42e383dc01b8df33c4a70b024e7902b5fdd.zip
Merge pull request #529 from hsaputra/cleanup_right_arrowop_scala
Change the ⇒ character (maybe from scalariform) to => in Scala code for style consistency Looks like there are some ⇒ Unicode character (maybe from scalariform) in Scala code. This PR is to change it to => to get some consistency on the Scala code. If we want to use ⇒ as default we could use sbt plugin scalariform to make sure all Scala code has ⇒ instead of => And remove unused imports found in TwitterInputDStream.scala while I was there =) Author: Henry Saputra <hsaputra@apache.org> == Merge branch commits == commit 29c1771d346dff901b0b778f764e6b4409900234 Author: Henry Saputra <hsaputra@apache.org> Date: Sat Feb 1 22:05:16 2014 -0800 Change the ⇒ character (maybe from scalariform) to => in Scala code for style consistency.
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala2
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala5
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala8
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala16
5 files changed, 15 insertions, 18 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index a5888811cc..bc0d1633f1 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -88,7 +88,7 @@ extends Actor with Receiver {
override def preStart = remotePublisher ! SubscribeReceiver(context.self)
def receive = {
- case msg ⇒ pushBlock(msg.asInstanceOf[T])
+ case msg => pushBlock(msg.asInstanceOf[T])
}
override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 5cc721d7f9..3316b6dc39 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -17,14 +17,11 @@
package org.apache.spark.streaming.twitter
-import java.util.prefs.Preferences
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
-import twitter4j.conf.PropertyConfiguration
import twitter4j.auth.OAuthAuthorization
-import twitter4j.auth.AccessToken
-import org.apache.spark._
+
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 769761e3b8..960c6a389e 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -31,7 +31,7 @@ import org.apache.spark.streaming.receivers._
*/
private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] ⇒ Iterator[T])
+ bytesToObjects: Seq[ByteString] => Iterator[T])
extends Actor with Receiver with Logging {
override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
@@ -39,16 +39,16 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
def receive: Receive = {
- case Connecting ⇒ logInfo("connecting ...")
+ case Connecting => logInfo("connecting ...")
- case m: ZMQMessage ⇒
+ case m: ZMQMessage =>
logDebug("Received message for:" + m.frame(0))
//We ignore first frame for processing as it is the topic
val bytes = m.frames.tail
pushBlock(bytesToObjects(bytes))
- case Closed ⇒ logInfo("received closed ")
+ case Closed => logInfo("received closed ")
}
}
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index 7a14b3d2bf..b47d786986 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -46,7 +46,7 @@ object ZeroMQUtils {
ssc: StreamingContext,
publisherUrl: String,
subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
+ bytesToObjects: Seq[ByteString] => Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
): DStream[T] = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
index 79ed696814..9c5b177c16 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
@@ -37,8 +37,8 @@ object ReceiverSupervisorStrategy {
val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
15 millis) {
- case _: RuntimeException ⇒ Restart
- case _: Exception ⇒ Escalate
+ case _: RuntimeException => Restart
+ case _: Exception => Escalate
}
}
@@ -66,7 +66,7 @@ object ReceiverSupervisorStrategy {
*/
trait Receiver {
- self: Actor ⇒ // to ensure that this can be added to Actor classes only
+ self: Actor => // to ensure that this can be added to Actor classes only
/**
* Push an iterator received data into Spark Streaming for processing
@@ -139,25 +139,25 @@ private[streaming] class ActorReceiver[T: ClassTag](
def receive = {
- case Data(iter: Iterator[_]) ⇒ pushBlock(iter.asInstanceOf[Iterator[T]])
+ case Data(iter: Iterator[_]) => pushBlock(iter.asInstanceOf[Iterator[T]])
- case Data(msg) ⇒
+ case Data(msg) =>
blocksGenerator += msg.asInstanceOf[T]
n.incrementAndGet
- case props: Props ⇒
+ case props: Props =>
val worker = context.actorOf(props)
logInfo("Started receiver worker at:" + worker.path)
sender ! worker
- case (props: Props, name: String) ⇒
+ case (props: Props, name: String) =>
val worker = context.actorOf(props, name)
logInfo("Started receiver worker at:" + worker.path)
sender ! worker
case _: PossiblyHarmful => hiccups.incrementAndGet()
- case _: Statistics ⇒
+ case _: Statistics =>
val workers = context.children
sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))