author      Prashant Sharma <prashant.s@imaginea.com>    2014-02-09 22:17:52 -0800
committer   Reynold Xin <rxin@apache.org>                2014-02-09 22:17:52 -0800
commit      919bd7f669c61500eee7231298d9880b320eb6f3 (patch)
tree        5cdcf197aef425b6be47b676f8d7fe3d1e2e8c34 /examples
parent      2182aa3c55737a90e0ff200eede7146b440801a3 (diff)
Merge pull request #567 from ScrapCodes/style2.
SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build. Pt 2

Continuation of PR #557. With this all scala style errors are fixed across the code base!

The reason for creating a separate PR was to not interrupt an already reviewed and ready-to-merge PR. Hope this gets reviewed soon and merged too.

Author: Prashant Sharma <prashant.s@imaginea.com>

Closes #567 and squashes the following commits:

3b1ec30 [Prashant Sharma] scala style fixes
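Most of the hunks below apply the same few mechanical fixes: long lines are wrapped to satisfy the new length limit, single-statement `if`/`else` bodies gain braces, and lines that cannot reasonably be wrapped (the LogQuery regex, long URLs in scaladoc) are fenced with `// scalastyle:off` / `// scalastyle:on`. A minimal sketch of these patterns, assuming the 100-character limit the new Scalastyle configuration enforces (the object name here is hypothetical and is not part of this commit):

```scala
// Hypothetical file illustrating the style patterns applied throughout this diff.
object StyleFixSketch {
  def main(args: Array[String]) {
    // Pattern 1: a long usage string is split with '+' so each source line
    // stays under the configured length limit.
    if (args.length == 0) {
      System.err.println("Usage: StyleFixSketch <master> [numMappers] [numKVPairs]" +
        " [keySize] [numReducers]")
      System.exit(1)
    }

    // Pattern 2 (mirroring ExceptionHandlingTest.scala): a body that previously
    // had no braces is wrapped in them.
    if (math.random > 0.75) {
      println("this branch now has explicit braces")
    }

    // Pattern 3: lines that cannot reasonably be wrapped, such as this regex taken
    // verbatim from LogQuery.scala, are excluded with scalastyle comment directives.
    // scalastyle:off
    val apacheLogRegex =
      """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r
    // scalastyle:on
  }
}
```

With the plugin wired into the build, the check can presumably be run as an ordinary build task (for sbt, `sbt scalastyle`, assuming the standard scalastyle-sbt-plugin setup).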
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala | 14
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala | 92
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala | 3
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala | 7
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala | 6
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/LogQuery.scala | 4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala | 3
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala | 3
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala | 15
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala | 17
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala | 22
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala | 4
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala | 7
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala | 21
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala | 20
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala | 3
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala | 5
20 files changed, 144 insertions(+), 108 deletions(-)
diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
index 0097dade19..4d2f45df85 100644
--- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
@@ -22,19 +22,21 @@ import org.apache.spark.SparkContext
object BroadcastTest {
def main(args: Array[String]) {
if (args.length == 0) {
- System.err.println("Usage: BroadcastTest <master> [slices] [numElem] [broadcastAlgo] [blockSize]")
+ System.err.println("Usage: BroadcastTest <master> [slices] [numElem] [broadcastAlgo]" +
+ " [blockSize]")
System.exit(1)
- }
-
+ }
+
val bcName = if (args.length > 3) args(3) else "Http"
val blockSize = if (args.length > 4) args(4) else "4096"
- System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + "BroadcastFactory")
+ System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName +
+ "BroadcastFactory")
System.setProperty("spark.broadcast.blockSize", blockSize)
val sc = new SparkContext(args(0), "Broadcast Test",
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
-
+
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
@@ -42,7 +44,7 @@ object BroadcastTest {
for (i <- 0 until arr1.length) {
arr1(i) = i
}
-
+
for (i <- 0 until 3) {
println("Iteration " + i)
println("===========")
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
index 33bf7151a7..3e3a3b2d50 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
@@ -146,68 +146,68 @@ assume Words keys as utf8;
set Words['3musk001']['book'] = 'The Three Musketeers';
set Words['3musk001']['para'] = 'On the first Monday of the month of April, 1625, the market
town of Meung, in which the author of ROMANCE OF THE ROSE was born, appeared to
- be in as perfect a state of revolution as if the Huguenots had just made
- a second La Rochelle of it. Many citizens, seeing the women flying
- toward the High Street, leaving their children crying at the open doors,
- hastened to don the cuirass, and supporting their somewhat uncertain
- courage with a musket or a partisan, directed their steps toward the
- hostelry of the Jolly Miller, before which was gathered, increasing
- every minute, a compact group, vociferous and full of curiosity.';
+ be in as perfect a state of revolution as if the Huguenots had just made
+ a second La Rochelle of it. Many citizens, seeing the women flying
+ toward the High Street, leaving their children crying at the open doors,
+ hastened to don the cuirass, and supporting their somewhat uncertain
+ courage with a musket or a partisan, directed their steps toward the
+ hostelry of the Jolly Miller, before which was gathered, increasing
+ every minute, a compact group, vociferous and full of curiosity.';
set Words['3musk002']['book'] = 'The Three Musketeers';
set Words['3musk002']['para'] = 'In those times panics were common, and few days passed without
some city or other registering in its archives an event of this kind. There were
- nobles, who made war against each other; there was the king, who made
- war against the cardinal; there was Spain, which made war against the
- king. Then, in addition to these concealed or public, secret or open
- wars, there were robbers, mendicants, Huguenots, wolves, and scoundrels,
- who made war upon everybody. The citizens always took up arms readily
- against thieves, wolves or scoundrels, often against nobles or
- Huguenots, sometimes against the king, but never against cardinal or
- Spain. It resulted, then, from this habit that on the said first Monday
- of April, 1625, the citizens, on hearing the clamor, and seeing neither
- the red-and-yellow standard nor the livery of the Duc de Richelieu,
- rushed toward the hostel of the Jolly Miller. When arrived there, the
- cause of the hubbub was apparent to all';
+ nobles, who made war against each other; there was the king, who made
+ war against the cardinal; there was Spain, which made war against the
+ king. Then, in addition to these concealed or public, secret or open
+ wars, there were robbers, mendicants, Huguenots, wolves, and scoundrels,
+ who made war upon everybody. The citizens always took up arms readily
+ against thieves, wolves or scoundrels, often against nobles or
+ Huguenots, sometimes against the king, but never against cardinal or
+ Spain. It resulted, then, from this habit that on the said first Monday
+ of April, 1625, the citizens, on hearing the clamor, and seeing neither
+ the red-and-yellow standard nor the livery of the Duc de Richelieu,
+ rushed toward the hostel of the Jolly Miller. When arrived there, the
+ cause of the hubbub was apparent to all';
set Words['3musk003']['book'] = 'The Three Musketeers';
set Words['3musk003']['para'] = 'You ought, I say, then, to husband the means you have, however
large the sum may be; but you ought also to endeavor to perfect yourself in
- the exercises becoming a gentleman. I will write a letter today to the
- Director of the Royal Academy, and tomorrow he will admit you without
- any expense to yourself. Do not refuse this little service. Our
- best-born and richest gentlemen sometimes solicit it without being able
- to obtain it. You will learn horsemanship, swordsmanship in all its
- branches, and dancing. You will make some desirable acquaintances; and
- from time to time you can call upon me, just to tell me how you are
- getting on, and to say whether I can be of further service to you.';
+ the exercises becoming a gentleman. I will write a letter today to the
+ Director of the Royal Academy, and tomorrow he will admit you without
+ any expense to yourself. Do not refuse this little service. Our
+ best-born and richest gentlemen sometimes solicit it without being able
+ to obtain it. You will learn horsemanship, swordsmanship in all its
+ branches, and dancing. You will make some desirable acquaintances; and
+ from time to time you can call upon me, just to tell me how you are
+ getting on, and to say whether I can be of further service to you.';
set Words['thelostworld001']['book'] = 'The Lost World';
set Words['thelostworld001']['para'] = 'She sat with that proud, delicate profile of hers outlined
against the red curtain. How beautiful she was! And yet how aloof! We had been
- friends, quite good friends; but never could I get beyond the same
- comradeship which I might have established with one of my
- fellow-reporters upon the Gazette,--perfectly frank, perfectly kindly,
- and perfectly unsexual. My instincts are all against a woman being too
- frank and at her ease with me. It is no compliment to a man. Where
- the real sex feeling begins, timidity and distrust are its companions,
- heritage from old wicked days when love and violence went often hand in
- hand. The bent head, the averted eye, the faltering voice, the wincing
- figure--these, and not the unshrinking gaze and frank reply, are the
- true signals of passion. Even in my short life I had learned as much
- as that--or had inherited it in that race memory which we call instinct.';
+ friends, quite good friends; but never could I get beyond the same
+ comradeship which I might have established with one of my
+ fellow-reporters upon the Gazette,--perfectly frank, perfectly kindly,
+ and perfectly unsexual. My instincts are all against a woman being too
+ frank and at her ease with me. It is no compliment to a man. Where
+ the real sex feeling begins, timidity and distrust are its companions,
+ heritage from old wicked days when love and violence went often hand in
+ hand. The bent head, the averted eye, the faltering voice, the wincing
+ figure--these, and not the unshrinking gaze and frank reply, are the
+ true signals of passion. Even in my short life I had learned as much
+ as that--or had inherited it in that race memory which we call instinct.';
set Words['thelostworld002']['book'] = 'The Lost World';
set Words['thelostworld002']['para'] = 'I always liked McArdle, the crabbed, old, round-backed,
red-headed news editor, and I rather hoped that he liked me. Of course, Beaumont was
- the real boss; but he lived in the rarefied atmosphere of some Olympian
- height from which he could distinguish nothing smaller than an
- international crisis or a split in the Cabinet. Sometimes we saw him
- passing in lonely majesty to his inner sanctum, with his eyes staring
- vaguely and his mind hovering over the Balkans or the Persian Gulf. He
- was above and beyond us. But McArdle was his first lieutenant, and it
- was he that we knew. The old man nodded as I entered the room, and he
- pushed his spectacles far up on his bald forehead.';
+ the real boss; but he lived in the rarefied atmosphere of some Olympian
+ height from which he could distinguish nothing smaller than an
+ international crisis or a split in the Cabinet. Sometimes we saw him
+ passing in lonely majesty to his inner sanctum, with his eyes staring
+ vaguely and his mind hovering over the Balkans or the Persian Gulf. He
+ was above and beyond us. But McArdle was his first lieutenant, and it
+ was he that we knew. The old man nodded as I entered the room, and he
+ pushed his spectacles far up on his bald forehead.';
*/
diff --git a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
index b3eb611dd2..fdb976dfc6 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
@@ -29,8 +29,9 @@ object ExceptionHandlingTest {
val sc = new SparkContext(args(0), "ExceptionHandlingTest",
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
- if (math.random > 0.75)
+ if (math.random > 0.75) {
throw new Exception("Testing exception handling")
+ }
}
System.exit(0)
diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
index 39752fdd0e..36534e5935 100644
--- a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
@@ -24,7 +24,8 @@ import java.util.Random
object GroupByTest {
def main(args: Array[String]) {
if (args.length == 0) {
- System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
+ System.err.println(
+ "Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
System.exit(1)
}
@@ -35,7 +36,7 @@ object GroupByTest {
val sc = new SparkContext(args(0), "GroupBy Test",
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
-
+
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
@@ -48,7 +49,7 @@ object GroupByTest {
}.cache
// Enforce that everything has been calculated and in cache
pairs1.count
-
+
println(pairs1.groupByKey(numReducers).count)
System.exit(0)
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
index 9ab5f5a486..737c444139 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
@@ -28,7 +28,7 @@ object LocalFileLR {
def parsePoint(line: String): DataPoint = {
val nums = line.split(' ').map(_.toDouble)
- DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
+ DataPoint(new Vector(nums.slice(1, D + 1)), nums(0))
}
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
index a730464ea1..3895675b3b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
@@ -80,7 +80,11 @@ object LocalKMeans {
var mappings = closest.groupBy[Int] (x => x._1)
- var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))})
+ var pointStats = mappings.map { pair =>
+ pair._2.reduceLeft [(Int, (Vector, Int))] {
+ case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1 + y2))
+ }
+ }
var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
index 544c782469..fcaba6bb4f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
@@ -50,10 +50,10 @@ object LogQuery {
val dataSet =
if (args.length == 2) sc.textFile(args(1))
else sc.parallelize(exampleApacheLogs)
-
+ // scalastyle:off
val apacheLogRegex =
"""^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r
-
+ // scalastyle:on
/** Tracks the total query count and number of aggregate bytes for a particular group. */
class Stats(val count: Int, val numBytes: Int) extends Serializable {
def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes)
diff --git a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
index 31c6d108f3..966478fe4a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
@@ -24,7 +24,8 @@ import java.util.Random
object SkewedGroupByTest {
def main(args: Array[String]) {
if (args.length == 0) {
- System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
+ System.err.println(
+ "Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
System.exit(1)
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index 39819064ed..cf1fc3e808 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -56,7 +56,8 @@ object SparkHdfsLR {
val sc = new SparkContext(args(0), "SparkHdfsLR",
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(),
InputFormatInfo.computePreferredLocations(
- Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))))
+ Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
+ ))
val lines = sc.textFile(inputPath)
val points = lines.map(parsePoint _).cache()
val ITERATIONS = args(2).toInt
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
index cfafbaf23e..b97cb8fb02 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
@@ -43,16 +43,18 @@ class PageRankUtils extends Serializable {
val terminate = superstep >= 10
val outbox: Array[PRMessage] =
- if (!terminate)
- self.outEdges.map(targetId =>
- new PRMessage(targetId, newValue / self.outEdges.size))
- else
+ if (!terminate) {
+ self.outEdges.map(targetId => new PRMessage(targetId, newValue / self.outEdges.size))
+ } else {
Array[PRMessage]()
+ }
(new PRVertex(newValue, self.outEdges, !terminate), outbox)
}
- def computeNoCombiner(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int): (PRVertex, Array[PRMessage]) =
+ def computeNoCombiner(numVertices: Long, epsilon: Double)
+ (self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int)
+ : (PRVertex, Array[PRMessage]) =
computeWithCombiner(numVertices, epsilon)(self, messages match {
case Some(msgs) => Some(msgs.map(_.value).sum)
case None => None
@@ -81,7 +83,8 @@ class PRVertex() extends Vertex with Serializable {
}
override def toString(): String = {
- "PRVertex(value=%f, outEdges.length=%d, active=%s)".format(value, outEdges.length, active.toString)
+ "PRVertex(value=%f, outEdges.length=%d, active=%s)"
+ .format(value, outEdges.length, active.toString)
}
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
index 4c0de46964..25bd55ca88 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
@@ -33,7 +33,8 @@ import scala.xml.{XML,NodeSeq}
object WikipediaPageRank {
def main(args: Array[String]) {
if (args.length < 5) {
- System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>")
+ System.err.println(
+ "Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>")
System.exit(-1)
}
val sparkConf = new SparkConf()
@@ -61,24 +62,26 @@ object WikipediaPageRank {
val fields = line.split("\t")
val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
val links =
- if (body == "\\N")
+ if (body == "\\N") {
NodeSeq.Empty
- else
+ } else {
try {
XML.loadString(body) \\ "link" \ "target"
} catch {
case e: org.xml.sax.SAXParseException =>
- System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body)
+ System.err.println("Article \"" + title + "\" has malformed XML in body:\n" + body)
NodeSeq.Empty
}
+ }
val outEdges = links.map(link => new String(link.text)).toArray
val id = new String(title)
(id, new PRVertex(1.0 / numVertices, outEdges))
})
- if (usePartitioner)
+ if (usePartitioner) {
vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache
- else
+ } else {
vertices = vertices.cache
+ }
println("Done parsing input file.")
// Do the computation
@@ -92,7 +95,7 @@ object WikipediaPageRank {
utils.computeWithCombiner(numVertices, epsilon))
// Print the result
- System.err.println("Articles with PageRank >= "+threshold+":")
+ System.err.println("Articles with PageRank >= " + threshold + ":")
val top =
(result
.filter { case (id, vertex) => vertex.value >= threshold }
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
index 2cf273a702..27afa6b642 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
@@ -31,7 +31,8 @@ import org.apache.spark.rdd.RDD
object WikipediaPageRankStandalone {
def main(args: Array[String]) {
if (args.length < 5) {
- System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> <numIterations> <host> <usePartitioner>")
+ System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> " +
+ "<numIterations> <host> <usePartitioner>")
System.exit(-1)
}
val sparkConf = new SparkConf()
@@ -51,10 +52,11 @@ object WikipediaPageRankStandalone {
val input = sc.textFile(inputFile)
val partitioner = new HashPartitioner(sc.defaultParallelism)
val links =
- if (usePartitioner)
+ if (usePartitioner) {
input.map(parseArticle _).partitionBy(partitioner).cache()
- else
+ } else {
input.map(parseArticle _).cache()
+ }
val n = links.count()
val defaultRank = 1.0 / n
val a = 0.15
@@ -62,10 +64,11 @@ object WikipediaPageRankStandalone {
// Do the computation
val startTime = System.currentTimeMillis
val ranks =
- pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner, sc.defaultParallelism)
+ pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner,
+ sc.defaultParallelism)
// Print the result
- System.err.println("Articles with PageRank >= "+threshold+":")
+ System.err.println("Articles with PageRank >= " + threshold + ":")
val top =
(ranks
.filter { case (id, rank) => rank >= threshold }
@@ -75,7 +78,7 @@ object WikipediaPageRankStandalone {
val time = (System.currentTimeMillis - startTime) / 1000.0
println("Completed %d iterations in %f seconds: %f seconds per iteration"
- .format(numIterations, time, time / numIterations))
+ .format(numIterations, time, time / numIterations))
System.exit(0)
}
@@ -84,16 +87,17 @@ object WikipediaPageRankStandalone {
val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
val id = new String(title)
val links =
- if (body == "\\N")
+ if (body == "\\N") {
NodeSeq.Empty
- else
+ } else {
try {
XML.loadString(body) \\ "link" \ "target"
} catch {
case e: org.xml.sax.SAXParseException =>
- System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body)
+ System.err.println("Article \"" + title + "\" has malformed XML in body:\n" + body)
NodeSeq.Empty
}
+ }
val outEdges = links.map(link => new String(link.text)).toArray
(id, outEdges)
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index bc0d1633f1..3d7b390724 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -132,7 +132,7 @@ object FeederActor {
* To run this example locally, you may run Feeder Actor as
* `$ ./bin/run-example org.apache.spark.streaming.examples.FeederActor 127.0.1.1 9999`
* and then run the example
- * `$ ./bin/run-example org.apache.spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
+ * `./bin/run-example org.apache.spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
*/
object ActorWordCount {
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index d9cb7326bb..6bccd1d884 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -26,6 +26,7 @@ import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.util.RawTextHelper._
+// scalastyle:off
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
@@ -38,6 +39,7 @@ import org.apache.spark.streaming.util.RawTextHelper._
* Example:
* `./bin/run-example org.apache.spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
*/
+// scalastyle:on
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 5) {
@@ -56,7 +58,7 @@ object KafkaWordCount {
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1l))
+ val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
wordCounts.print()
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index eb61caf8c8..0a68ac84c2 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -64,6 +64,7 @@ object MQTTPublisher {
}
}
+// scalastyle:off
/**
* A sample wordcount with MqttStream stream
*
@@ -71,7 +72,8 @@ object MQTTPublisher {
* Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker
* In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto`
* Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/
- * Example Java code for Mqtt Publisher and Subscriber can be found here https://bitbucket.org/mkjinesh/mqttclient
+ * Example Java code for Mqtt Publisher and Subscriber can be found here
+ * https://bitbucket.org/mkjinesh/mqttclient
* Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic>
* In local mode, <master> should be 'local[n]' with n > 1
* <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
@@ -81,6 +83,7 @@ object MQTTPublisher {
* and run the example as
* `$ ./bin/run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo`
*/
+// scalastyle:on
object MQTTWordCount {
def main(args: Array[String]) {
@@ -93,7 +96,7 @@ object MQTTWordCount {
val Seq(master, brokerUrl, topic) = args.toSeq
- val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
+ val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
StreamingContext.jarOfClass(this.getClass))
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index 5656d487a5..d4c4d86b34 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -21,6 +21,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
+// scalastyle:off
/**
* Counts words in text encoded with UTF8 received from the network every second.
*
@@ -33,6 +34,7 @@ import org.apache.spark.storage.StorageLevel
* and then run the example
* `$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
*/
+// scalastyle:on
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 3) {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index aa82bf3c6b..56d10a964b 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -30,8 +30,8 @@ import java.nio.charset.Charset
*
* Usage: NetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
- * <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
+ * data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
* <output-file> file to which the word counts will be appended
*
* In local mode, <master> should be 'local[n]' with n > 1
@@ -54,11 +54,13 @@ import java.nio.charset.Charset
*
* To run this example in a local standalone cluster with automatic driver recovery,
*
- * `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> <path-to-examples-jar> \
+ * `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \
+ * <path-to-examples-jar> \
* org.apache.spark.streaming.examples.RecoverableNetworkWordCount <cluster-url> \
* localhost 9999 ~/checkpoint ~/out`
*
- * <path-to-examples-jar> would typically be <spark-dir>/examples/target/scala-XX/spark-examples....jar
+ * <path-to-examples-jar> would typically be
+ * <spark-dir>/examples/target/scala-XX/spark-examples....jar
*
* Refer to the online documentation for more details.
*/
@@ -96,11 +98,12 @@ object RecoverableNetworkWordCount {
System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
System.err.println(
"""
- |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
- | <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- | <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
- | <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
- | <output-file> file to which the word counts will be appended
+ |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory>
+ | <output-file> <master> is the Spark master URL. In local mode, <master> should be
+ | 'local[n]' with n > 1. <hostname> and <port> describe the TCP server that Spark
+ | Streaming would connect to receive data. <checkpoint-directory> directory to
+ | HDFS-compatible file system which checkpoint data <output-file> file to which the
+ | word counts will be appended
|
|In local mode, <master> should be 'local[n]' with n > 1
|Both <checkpoint-directory> and <output-file> must be absolute paths
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index bbd44948b6..8a654f8fad 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -24,7 +24,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
-
+// scalastyle:off
/**
* Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
* windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
@@ -34,15 +34,19 @@ import org.apache.spark.streaming.twitter._
* the same approach could be used for computing popular topics for example.
* <p>
* <p>
- * <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data structure
- * for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc),
- * that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
- * estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold
- * percentage of the overall total count.
+ * <a href=
+ * "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
+ * structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
+ * of any given element, etc), that uses space sub-linear in the number of elements in the
+ * stream. Once elements are added to the CMS, the estimated count of an element can be computed,
+ * as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
+ * count.
* <p><p>
- * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the reduce operation.
+ * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
+ * reduce operation.
*/
+// scalastyle:on
object TwitterAlgebirdCMS {
def main(args: Array[String]) {
if (args.length < 1) {
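The scaladoc above describes the Count-Min Sketch and its monoid structure only in prose. As a rough, self-contained sketch of that merge property — written against Algebird's CountMinSketchMonoid API as an assumption, not code from this commit; the object name and sample ids are hypothetical:

```scala
import com.twitter.algebird.CountMinSketchMonoid

object CmsMergeSketch {
  def main(args: Array[String]) {
    // Constructor arguments: approximation error (eps), failure probability (delta),
    // hashing seed, and heavy-hitters threshold percentage.
    val cms = new CountMinSketchMonoid(0.001, 1E-3, 1, 0.01)

    val userIds = Seq(1L, 2L, 2L, 3L, 3L, 3L)

    // Each id becomes a one-element sketch; because CMS is a monoid, sketches
    // combine associatively with '++', which is what lets the streaming example
    // reduce per-partition sketches into a single global estimate.
    val merged = userIds.map(id => cms.create(id)).reduce(_ ++ _)

    println("Estimated count of id 3: " + merged.frequency(3L).estimate)
    println("Approximate heavy hitters: " + merged.heavyHitters)
  }
}
```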
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index c6215fd0d7..45771d7050 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -29,8 +29,7 @@ import org.apache.spark.streaming.twitter._
* a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
* <p>
* <p>
- * This <a href= "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data
- * -mining/">
+ * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
* blog post</a> and this
* <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
* blog post</a>
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index 85b4ce5e81..35be7ffa1e 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -53,11 +53,13 @@ object SimpleZeroMQPublisher {
}
}
+// scalastyle:off
/**
* A sample wordcount with ZeroMQStream stream
*
* To work with zeroMQ, some native libraries have to be installed.
- * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide](http://www.zeromq.org/intro:get-the-software)
+ * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide]
+ * (http://www.zeromq.org/intro:get-the-software)
*
* Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>
* In local mode, <master> should be 'local[n]' with n > 1
@@ -68,6 +70,7 @@ object SimpleZeroMQPublisher {
* and run the example as
* `$ ./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
*/
+// scalastyle:on
object ZeroMQWordCount {
def main(args: Array[String]) {
if (args.length < 3) {