author    Reynold Xin <rxin@databricks.com>   2015-05-28 20:11:04 -0700
committer Reynold Xin <rxin@databricks.com>   2015-05-28 20:11:04 -0700
commit    2881d14cbedc14f1cd8ae5078446dba1a8d39086 (patch)
tree      a036f826975f0ba8576f5804286519fe7c970789 /examples
parent    ff44c711abc7ca545dfa1e836279c00fe7539c18 (diff)
[SPARK-7929] Remove Bagel examples & whitespace fix for examples.
Author: Reynold Xin <rxin@databricks.com>

Closes #6480 from rxin/whitespace-example and squashes the following commits:

8a4a3d4 [Reynold Xin] [SPARK-7929] Remove Bagel examples & whitespace fix for examples.
Diffstat (limited to 'examples')
 -rw-r--r--  examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala                        |   6
 -rw-r--r--  examples/src/main/scala/org/apache/spark/examples/LocalLR.scala                                 |   2
 -rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkALS.scala                                |   2
 -rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkLR.scala                                 |   2
 -rw-r--r--  examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala                     | 112
 -rw-r--r--  examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala                 | 106
 -rw-r--r--  examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala       | 232
 -rw-r--r--  examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala                     |   6
 -rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/DenseGaussianMixture.scala              |   2
 -rw-r--r--  examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala         |  29
 -rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala                |   6
 -rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala          |   2
 -rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala                |   4
 -rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala                 |   2
 -rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala |   3
 15 files changed, 31 insertions(+), 485 deletions(-)
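
The whitespace fixes below all enforce the same Scala style rules: a space after `if`, a space after each comma in argument lists, and a single space around `=`. A minimal, self-contained sketch of the convention (illustrative only, not taken verbatim from any file in this diff):

```scala
object WhitespaceStyle {
  def main(args: Array[String]): Unit = {
    val i = 4
    val y = if (i % 2 == 0) -1 else 1                  // was written as: if(i % 2 == 0)
    val topicMap = Seq("a", "b").map((_, 1)).toMap     // was written as: map((_,numThreads.toInt))
    println((y, topicMap))
  }
}
```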
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
index 11d5c92c59..023bb3ee2d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -104,8 +104,8 @@ object CassandraCQLTest {
val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
classOf[CqlPagingInputFormat],
- classOf[java.util.Map[String,ByteBuffer]],
- classOf[java.util.Map[String,ByteBuffer]])
+ classOf[java.util.Map[String, ByteBuffer]],
+ classOf[java.util.Map[String, ByteBuffer]])
println("Count: " + casRdd.count)
val productSaleRDD = casRdd.map {
@@ -118,7 +118,7 @@ object CassandraCQLTest {
case (productId, saleCount) => println(productId + ":" + saleCount)
}
- val casoutputCF = aggregatedRDD.map {
+ val casoutputCF = aggregatedRDD.map {
case (productId, saleCount) => {
val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
index a55e0dc8d3..c3fc74a116 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
@@ -39,7 +39,7 @@ object LocalLR {
def generateData: Array[DataPoint] = {
def generatePoint(i: Int): DataPoint = {
- val y = if(i % 2 == 0) -1 else 1
+ val y = if (i % 2 == 0) -1 else 1
val x = DenseVector.fill(D){rand.nextGaussian + y * R}
DataPoint(x, y)
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
index 6c0ac8013c..30c4261551 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
@@ -117,7 +117,7 @@ object SparkALS {
var us = Array.fill(U)(randomVector(F))
// Iteratively update movies then users
- val Rc = sc.broadcast(R)
+ val Rc = sc.broadcast(R)
var msb = sc.broadcast(ms)
var usb = sc.broadcast(us)
for (iter <- 1 to ITERATIONS) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
index 8c01a60844..1e6b4fb0c7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
@@ -44,7 +44,7 @@ object SparkLR {
def generateData: Array[DataPoint] = {
def generatePoint(i: Int): DataPoint = {
- val y = if(i % 2 == 0) -1 else 1
+ val y = if (i % 2 == 0) -1 else 1
val x = DenseVector.fill(D){rand.nextGaussian + y * R}
DataPoint(x, y)
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
deleted file mode 100644
index ab6e63deb3..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.bagel
-
-import org.apache.spark._
-import org.apache.spark.bagel._
-
-class PageRankUtils extends Serializable {
- def computeWithCombiner(numVertices: Long, epsilon: Double)(
- self: PRVertex, messageSum: Option[Double], superstep: Int
- ): (PRVertex, Array[PRMessage]) = {
- val newValue = messageSum match {
- case Some(msgSum) if msgSum != 0 =>
- 0.15 / numVertices + 0.85 * msgSum
- case _ => self.value
- }
-
- val terminate = superstep >= 10
-
- val outbox: Array[PRMessage] =
- if (!terminate) {
- self.outEdges.map(targetId => new PRMessage(targetId, newValue / self.outEdges.size))
- } else {
- Array[PRMessage]()
- }
-
- (new PRVertex(newValue, self.outEdges, !terminate), outbox)
- }
-
- def computeNoCombiner(numVertices: Long, epsilon: Double)
- (self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int)
- : (PRVertex, Array[PRMessage]) =
- computeWithCombiner(numVertices, epsilon)(self, messages match {
- case Some(msgs) => Some(msgs.map(_.value).sum)
- case None => None
- }, superstep)
-}
-
-class PRCombiner extends Combiner[PRMessage, Double] with Serializable {
- def createCombiner(msg: PRMessage): Double =
- msg.value
- def mergeMsg(combiner: Double, msg: PRMessage): Double =
- combiner + msg.value
- def mergeCombiners(a: Double, b: Double): Double =
- a + b
-}
-
-class PRVertex() extends Vertex with Serializable {
- var value: Double = _
- var outEdges: Array[String] = _
- var active: Boolean = _
-
- def this(value: Double, outEdges: Array[String], active: Boolean = true) {
- this()
- this.value = value
- this.outEdges = outEdges
- this.active = active
- }
-
- override def toString(): String = {
- "PRVertex(value=%f, outEdges.length=%d, active=%s)"
- .format(value, outEdges.length, active.toString)
- }
-}
-
-class PRMessage() extends Message[String] with Serializable {
- var targetId: String = _
- var value: Double = _
-
- def this(targetId: String, value: Double) {
- this()
- this.targetId = targetId
- this.value = value
- }
-}
-
-class CustomPartitioner(partitions: Int) extends Partitioner {
- def numPartitions: Int = partitions
-
- def getPartition(key: Any): Int = {
- val hash = key match {
- case k: Long => (k & 0x00000000FFFFFFFFL).toInt
- case _ => key.hashCode
- }
-
- val mod = key.hashCode % partitions
- if (mod < 0) mod + partitions else mod
- }
-
- override def equals(other: Any): Boolean = other match {
- case c: CustomPartitioner =>
- c.numPartitions == numPartitions
- case _ => false
- }
-
- override def hashCode: Int = numPartitions
-}
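
One detail of the deleted PageRankUtils.scala worth noting: `CustomPartitioner.getPartition` computed a `hash` from Long keys but then partitioned on `key.hashCode` anyway, so the bit-masking branch was dead code. A minimal sketch of what that class presumably intended, assuming the low 32 bits of a Long key were meant to drive partitioning (the name `LowBitsPartitioner` is illustrative, not part of Spark):

```scala
import org.apache.spark.Partitioner

class LowBitsPartitioner(partitions: Int) extends Partitioner {
  require(partitions > 0, "number of partitions must be positive")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {
    val hash = key match {
      case k: Long => (k & 0x00000000FFFFFFFFL).toInt  // low 32 bits of a Long id
      case _       => key.hashCode
    }
    val mod = hash % partitions                        // use the computed hash
    if (mod < 0) mod + partitions else mod             // keep the result non-negative
  }

  override def equals(other: Any): Boolean = other match {
    case p: LowBitsPartitioner => p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
```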
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
deleted file mode 100644
index 859abedf2a..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.bagel
-
-import org.apache.spark._
-import org.apache.spark.SparkContext._
-
-import org.apache.spark.bagel._
-
-import scala.xml.{XML,NodeSeq}
-
-/**
- * Run PageRank on XML Wikipedia dumps from http://wiki.freebase.com/wiki/WEX. Uses the "articles"
- * files from there, which contains one line per wiki article in a tab-separated format
- * (http://wiki.freebase.com/wiki/WEX/Documentation#articles).
- */
-object WikipediaPageRank {
- def main(args: Array[String]) {
- if (args.length < 4) {
- System.err.println(
- "Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <usePartitioner>")
- System.exit(-1)
- }
- val sparkConf = new SparkConf()
- sparkConf.setAppName("WikipediaPageRank")
- sparkConf.registerKryoClasses(Array(classOf[PRVertex], classOf[PRMessage]))
-
- val inputFile = args(0)
- val threshold = args(1).toDouble
- val numPartitions = args(2).toInt
- val usePartitioner = args(3).toBoolean
-
- sparkConf.setAppName("WikipediaPageRank")
- val sc = new SparkContext(sparkConf)
-
- // Parse the Wikipedia page data into a graph
- val input = sc.textFile(inputFile)
-
- println("Counting vertices...")
- val numVertices = input.count()
- println("Done counting vertices.")
-
- println("Parsing input file...")
- var vertices = input.map(line => {
- val fields = line.split("\t")
- val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
- val links =
- if (body == "\\N") {
- NodeSeq.Empty
- } else {
- try {
- XML.loadString(body) \\ "link" \ "target"
- } catch {
- case e: org.xml.sax.SAXParseException =>
- System.err.println("Article \"" + title + "\" has malformed XML in body:\n" + body)
- NodeSeq.Empty
- }
- }
- val outEdges = links.map(link => new String(link.text)).toArray
- val id = new String(title)
- (id, new PRVertex(1.0 / numVertices, outEdges))
- })
- if (usePartitioner) {
- vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache()
- } else {
- vertices = vertices.cache()
- }
- println("Done parsing input file.")
-
- // Do the computation
- val epsilon = 0.01 / numVertices
- val messages = sc.parallelize(Array[(String, PRMessage)]())
- val utils = new PageRankUtils
- val result =
- Bagel.run(
- sc, vertices, messages, combiner = new PRCombiner(),
- numPartitions = numPartitions)(
- utils.computeWithCombiner(numVertices, epsilon))
-
- // Print the result
- System.err.println("Articles with PageRank >= " + threshold + ":")
- val top =
- (result
- .filter { case (id, vertex) => vertex.value >= threshold }
- .map { case (id, vertex) => "%s\t%s\n".format(id, vertex.value) }
- .collect().mkString)
- println(top)
-
- sc.stop()
- }
-}
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
deleted file mode 100644
index 576a3e371b..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.bagel
-
-import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream}
-import java.nio.ByteBuffer
-
-import scala.collection.mutable.ArrayBuffer
-import scala.xml.{XML, NodeSeq}
-
-import org.apache.spark._
-import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import scala.reflect.ClassTag
-
-object WikipediaPageRankStandalone {
- def main(args: Array[String]) {
- if (args.length < 4) {
- System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> " +
- "<numIterations> <usePartitioner>")
- System.exit(-1)
- }
- val sparkConf = new SparkConf()
- sparkConf.set("spark.serializer", "spark.bagel.examples.WPRSerializer")
-
- val inputFile = args(0)
- val threshold = args(1).toDouble
- val numIterations = args(2).toInt
- val usePartitioner = args(3).toBoolean
-
- sparkConf.setAppName("WikipediaPageRankStandalone")
-
- val sc = new SparkContext(sparkConf)
-
- val input = sc.textFile(inputFile)
- val partitioner = new HashPartitioner(sc.defaultParallelism)
- val links =
- if (usePartitioner) {
- input.map(parseArticle _).partitionBy(partitioner).cache()
- } else {
- input.map(parseArticle _).cache()
- }
- val n = links.count()
- val defaultRank = 1.0 / n
- val a = 0.15
-
- // Do the computation
- val startTime = System.currentTimeMillis
- val ranks =
- pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner,
- sc.defaultParallelism)
-
- // Print the result
- System.err.println("Articles with PageRank >= " + threshold + ":")
- val top =
- (ranks
- .filter { case (id, rank) => rank >= threshold }
- .map { case (id, rank) => "%s\t%s\n".format(id, rank) }
- .collect().mkString)
- println(top)
-
- val time = (System.currentTimeMillis - startTime) / 1000.0
- println("Completed %d iterations in %f seconds: %f seconds per iteration"
- .format(numIterations, time, time / numIterations))
- sc.stop()
- }
-
- def parseArticle(line: String): (String, Array[String]) = {
- val fields = line.split("\t")
- val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
- val id = new String(title)
- val links =
- if (body == "\\N") {
- NodeSeq.Empty
- } else {
- try {
- XML.loadString(body) \\ "link" \ "target"
- } catch {
- case e: org.xml.sax.SAXParseException =>
- System.err.println("Article \"" + title + "\" has malformed XML in body:\n" + body)
- NodeSeq.Empty
- }
- }
- val outEdges = links.map(link => new String(link.text)).toArray
- (id, outEdges)
- }
-
- def pageRank(
- links: RDD[(String, Array[String])],
- numIterations: Int,
- defaultRank: Double,
- a: Double,
- n: Long,
- partitioner: Partitioner,
- usePartitioner: Boolean,
- numPartitions: Int
- ): RDD[(String, Double)] = {
- var ranks = links.mapValues { edges => defaultRank }
- for (i <- 1 to numIterations) {
- val contribs = links.groupWith(ranks).flatMap {
- case (id, (linksWrapperIterable, rankWrapperIterable)) =>
- val linksWrapper = linksWrapperIterable.iterator
- val rankWrapper = rankWrapperIterable.iterator
- if (linksWrapper.hasNext) {
- val linksWrapperHead = linksWrapper.next
- if (rankWrapper.hasNext) {
- val rankWrapperHead = rankWrapper.next
- linksWrapperHead.map(dest => (dest, rankWrapperHead / linksWrapperHead.size))
- } else {
- linksWrapperHead.map(dest => (dest, defaultRank / linksWrapperHead.size))
- }
- } else {
- Array[(String, Double)]()
- }
- }
- ranks = (contribs.combineByKey((x: Double) => x,
- (x: Double, y: Double) => x + y,
- (x: Double, y: Double) => x + y,
- partitioner)
- .mapValues(sum => a/n + (1-a)*sum))
- }
- ranks
- }
-}
-
-class WPRSerializer extends org.apache.spark.serializer.Serializer {
- def newInstance(): SerializerInstance = new WPRSerializerInstance()
-}
-
-class WPRSerializerInstance extends SerializerInstance {
- def serialize[T: ClassTag](t: T): ByteBuffer = {
- throw new UnsupportedOperationException()
- }
-
- def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
- throw new UnsupportedOperationException()
- }
-
- def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
- throw new UnsupportedOperationException()
- }
-
- def serializeStream(s: OutputStream): SerializationStream = {
- new WPRSerializationStream(s)
- }
-
- def deserializeStream(s: InputStream): DeserializationStream = {
- new WPRDeserializationStream(s)
- }
-}
-
-class WPRSerializationStream(os: OutputStream) extends SerializationStream {
- val dos = new DataOutputStream(os)
-
- def writeObject[T: ClassTag](t: T): SerializationStream = t match {
- case (id: String, wrapper: ArrayBuffer[_]) => wrapper(0) match {
- case links: Array[String] => {
- dos.writeInt(0) // links
- dos.writeUTF(id)
- dos.writeInt(links.length)
- for (link <- links) {
- dos.writeUTF(link)
- }
- this
- }
- case rank: Double => {
- dos.writeInt(1) // rank
- dos.writeUTF(id)
- dos.writeDouble(rank)
- this
- }
- }
- case (id: String, rank: Double) => {
- dos.writeInt(2) // rank without wrapper
- dos.writeUTF(id)
- dos.writeDouble(rank)
- this
- }
- }
-
- def flush() { dos.flush() }
- def close() { dos.close() }
-}
-
-class WPRDeserializationStream(is: InputStream) extends DeserializationStream {
- val dis = new DataInputStream(is)
-
- def readObject[T: ClassTag](): T = {
- val typeId = dis.readInt()
- typeId match {
- case 0 => {
- val id = dis.readUTF()
- val numLinks = dis.readInt()
- val links = new Array[String](numLinks)
- for (i <- 0 until numLinks) {
- val link = dis.readUTF()
- links(i) = link
- }
- (id, ArrayBuffer(links)).asInstanceOf[T]
- }
- case 1 => {
- val id = dis.readUTF()
- val rank = dis.readDouble()
- (id, ArrayBuffer(rank)).asInstanceOf[T]
- }
- case 2 => {
- val id = dis.readUTF()
- val rank = dis.readDouble()
- (id, rank).asInstanceOf[T]
- }
- }
- }
-
- def close() { dis.close() }
-}
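
The two Bagel PageRank drivers above disappear because Bagel, Spark's original Pregel-like API, has been superseded by GraphX. A hedged sketch of a GraphX-based equivalent follows; note that `GraphLoader.edgeListFile` expects a plain `srcId dstId` edge list with numeric vertex ids rather than the WEX article dump these examples parsed, and the input path and tolerance below are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphLoader

object GraphXPageRank {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("GraphXPageRank"))

    // Placeholder edge-list input: one "srcId dstId" pair per line.
    val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")

    // Dynamic PageRank: iterate until ranks move less than the tolerance.
    // graph.staticPageRank(numIter) is the fixed-iteration alternative.
    val ranks = graph.pageRank(0.0001).vertices

    // Print the ten highest-ranked vertices.
    ranks.sortBy(_._2, ascending = false).take(10).foreach { case (id, rank) =>
      println(s"$id\t$rank")
    }
    sc.stop()
  }
}
```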
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
index b99d0a1246..6927eb8f27 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
@@ -73,7 +73,7 @@ object OneVsRestExample {
.action((x, c) => c.copy(fracTest = x))
opt[String]("testInput")
.text("input path to test dataset. If given, option fracTest is ignored")
- .action((x,c) => c.copy(testInput = Some(x)))
+ .action((x, c) => c.copy(testInput = Some(x)))
opt[Int]("maxIter")
.text(s"maximum number of iterations for Logistic Regression." +
s" default: ${defaultParams.maxIter}")
@@ -88,10 +88,10 @@ object OneVsRestExample {
.action((x, c) => c.copy(fitIntercept = x))
opt[Double]("regParam")
.text(s"the regularization parameter for Logistic Regression.")
- .action((x,c) => c.copy(regParam = Some(x)))
+ .action((x, c) => c.copy(regParam = Some(x)))
opt[Double]("elasticNetParam")
.text(s"the ElasticNet mixing parameter for Logistic Regression.")
- .action((x,c) => c.copy(elasticNetParam = Some(x)))
+ .action((x, c) => c.copy(elasticNetParam = Some(x)))
checkConfig { params =>
if (params.fracTest < 0 || params.fracTest >= 1) {
failure(s"fracTest ${params.fracTest} value incorrect; should be in [0,1).")
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGaussianMixture.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGaussianMixture.scala
index df76b45e50..9a1aab036a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGaussianMixture.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGaussianMixture.scala
@@ -40,7 +40,7 @@ object DenseGaussianMixture {
private def run(inputFile: String, k: Int, convergenceTol: Double, maxIterations: Int) {
val conf = new SparkConf().setAppName("Gaussian Mixture Model EM example")
- val ctx = new SparkContext(conf)
+ val ctx = new SparkContext(conf)
val data = ctx.textFile(inputFile).map { line =>
Vectors.dense(line.trim.split(' ').map(_.toDouble))
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
index a11890d6f2..3ebb112fc0 100644
--- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
@@ -36,22 +36,21 @@ object AvroConversionUtil extends Serializable {
return null
}
schema.getType match {
- case UNION => unpackUnion(obj, schema)
- case ARRAY => unpackArray(obj, schema)
- case FIXED => unpackFixed(obj, schema)
- case MAP => unpackMap(obj, schema)
- case BYTES => unpackBytes(obj)
- case RECORD => unpackRecord(obj)
- case STRING => obj.toString
- case ENUM => obj.toString
- case NULL => obj
+ case UNION => unpackUnion(obj, schema)
+ case ARRAY => unpackArray(obj, schema)
+ case FIXED => unpackFixed(obj, schema)
+ case MAP => unpackMap(obj, schema)
+ case BYTES => unpackBytes(obj)
+ case RECORD => unpackRecord(obj)
+ case STRING => obj.toString
+ case ENUM => obj.toString
+ case NULL => obj
case BOOLEAN => obj
- case DOUBLE => obj
- case FLOAT => obj
- case INT => obj
- case LONG => obj
- case other => throw new SparkException(
- s"Unknown Avro schema type ${other.getName}")
+ case DOUBLE => obj
+ case FLOAT => obj
+ case INT => obj
+ case LONG => obj
+ case other => throw new SparkException(s"Unknown Avro schema type ${other.getName}")
}
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
index 92867b44be..016de4c63d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
@@ -104,10 +104,8 @@ extends Actor with ActorHelper {
object FeederActor {
def main(args: Array[String]) {
- if(args.length < 2){
- System.err.println(
- "Usage: FeederActor <hostname> <port>\n"
- )
+ if (args.length < 2){
+ System.err.println("Usage: FeederActor <hostname> <port>\n")
System.exit(1)
}
val Seq(host, port) = args.toSeq
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
index 11a8cf0953..fbe394de4a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
@@ -51,7 +51,7 @@ object DirectKafkaWordCount {
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
- val ssc = new StreamingContext(sparkConf, Seconds(2))
+ val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
index 9ae1b045c2..60416ee343 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -49,10 +49,10 @@ object KafkaWordCount {
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
- val ssc = new StreamingContext(sparkConf, Seconds(2))
+ val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
- val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
+ val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
index 85b9a54b40..b336751d81 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
@@ -49,7 +49,7 @@ object MQTTPublisher {
client.connect()
- val msgtopic = client.getTopic(topic)
+ val msgtopic = client.getTopic(topic)
val msgContent = "hello mqtt demo for spark streaming"
val message = new MqttMessage(msgContent.getBytes("utf-8"))
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
index 54d996b8ac..889f052c70 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
@@ -57,8 +57,7 @@ object PageViewGenerator {
404 -> .05)
val userZipCode = Map(94709 -> .5,
94117 -> .5)
- val userID = Map((1 to 100).map(_ -> .01):_*)
-
+ val userID = Map((1 to 100).map(_ -> .01) : _*)
def pickFromDistribution[T](inputMap : Map[T, Double]) : T = {
val rand = new Random().nextDouble()