authorMatei Zaharia <matei@eecs.berkeley.edu>2013-08-31 19:27:07 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-09-01 14:13:13 -0700
commit46eecd110a4017ea0c86cbb1010d0ccd6a5eb2ef (patch)
tree4a46971b36680bc5ef51be81ada8eb47670f6b22 /examples/src/main/scala/org/apache
parenta30fac16ca0525f2001b127e5f9518c9680844c9 (diff)
Initial work to rename package to org.apache.spark
diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import org.apache.spark.SparkContext
+object BroadcastTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: BroadcastTest <master> [<slices>] [numElem]")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "Broadcast Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val slices = if (args.length > 1) args(1).toInt else 2
+ val num = if (args.length > 2) args(2).toInt else 1000000
+ var arr1 = new Array[Int](num)
+ for (i <- 0 until arr1.length) {
+ arr1(i) = i
+ }
+ for (i <- 0 until 2) {
+ println("Iteration " + i)
+ println("===========")
+ val barr1 = sc.broadcast(arr1)
+ sc.parallelize(1 to 10, slices).foreach {
+ i => println(barr1.value.size)
+ }
+ }
+ System.exit(0)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import org.apache.hadoop.mapreduce.Job
+import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat
+import org.apache.cassandra.hadoop.ConfigHelper
+import org.apache.cassandra.hadoop.ColumnFamilyInputFormat
+import org.apache.cassandra.thrift._
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import java.nio.ByteBuffer
+import java.util.SortedMap
+import org.apache.cassandra.db.IColumn
+import org.apache.cassandra.utils.ByteBufferUtil
+import scala.collection.JavaConversions._
+ * This example demonstrates using Spark with Cassandra with the New Hadoop API and Cassandra
+ * support for Hadoop.
+ *
+ * To run this example, run this file with the following command params -
+ * <spark_master> <cassandra_node> <cassandra_port>
+ *
+ * So if you want to run this on localhost this will be,
+ * local[3] localhost 9160
+ *
+ * The example makes some assumptions:
+ * 1. You have already created a keyspace called casDemo and it has a column family named Words
+ * 2. There are column family has a column named "para" which has test content.
+ *
+ * You can create the content by running the following script at the bottom of this file with
+ * cassandra-cli.
+ *
+ */
+object CassandraTest {
+ def main(args: Array[String]) {
+ // Get a SparkContext
+ val sc = new SparkContext(args(0), "casDemo")
+ // Build the job configuration with ConfigHelper provided by Cassandra
+ val job = new Job()
+ job.setInputFormatClass(classOf[ColumnFamilyInputFormat])
+ val host: String = args(1)
+ val port: String = args(2)
+ ConfigHelper.setInputInitialAddress(job.getConfiguration(), host)
+ ConfigHelper.setInputRpcPort(job.getConfiguration(), port)
+ ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host)
+ ConfigHelper.setOutputRpcPort(job.getConfiguration(), port)
+ ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words")
+ ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount")
+ val predicate = new SlicePredicate()
+ val sliceRange = new SliceRange()
+ sliceRange.setStart(Array.empty[Byte])
+ sliceRange.setFinish(Array.empty[Byte])
+ predicate.setSlice_range(sliceRange)
+ ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate)
+ ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
+ ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
+ // Make a new Hadoop RDD
+ val casRdd = sc.newAPIHadoopRDD(
+ job.getConfiguration(),
+ classOf[ColumnFamilyInputFormat],
+ classOf[ByteBuffer],
+ classOf[SortedMap[ByteBuffer, IColumn]])
+ // Let us first get all the paragraphs from the retrieved rows
+ val paraRdd = casRdd.map {
+ case (key, value) => {
+ ByteBufferUtil.string(value.get(ByteBufferUtil.bytes("para")).value())
+ }
+ }
+ // Lets get the word count in paras
+ val counts = paraRdd.flatMap(p => p.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
+ counts.collect().foreach {
+ case (word, count) => println(word + ":" + count)
+ }
+ counts.map {
+ case (word, count) => {
+ val colWord = new org.apache.cassandra.thrift.Column()
+ colWord.setName(ByteBufferUtil.bytes("word"))
+ colWord.setValue(ByteBufferUtil.bytes(word))
+ colWord.setTimestamp(System.currentTimeMillis)
+ val colCount = new org.apache.cassandra.thrift.Column()
+ colCount.setName(ByteBufferUtil.bytes("wcount"))
+ colCount.setValue(ByteBufferUtil.bytes(count.toLong))
+ colCount.setTimestamp(System.currentTimeMillis)
+ val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis)
+ val mutations: java.util.List[Mutation] = new Mutation() :: new Mutation() :: Nil
+ mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn())
+ mutations.get(0).column_or_supercolumn.setColumn(colWord)
+ mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn())
+ mutations.get(1).column_or_supercolumn.setColumn(colCount)
+ (outputkey, mutations)
+ }
+ }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]],
+ classOf[ColumnFamilyOutputFormat], job.getConfiguration)
+ }
+create keyspace casDemo;
+use casDemo;
+create column family WordCount with comparator = UTF8Type;
+update column family WordCount with column_metadata =
+ [{column_name: word, validation_class: UTF8Type},
+ {column_name: wcount, validation_class: LongType}];
+create column family Words with comparator = UTF8Type;
+update column family Words with column_metadata =
+ [{column_name: book, validation_class: UTF8Type},
+ {column_name: para, validation_class: UTF8Type}];
+assume Words keys as utf8;
+set Words['3musk001']['book'] = 'The Three Musketeers';
+set Words['3musk001']['para'] = 'On the first Monday of the month of April, 1625, the market
+ town of Meung, in which the author of ROMANCE OF THE ROSE was born, appeared to
+ be in as perfect a state of revolution as if the Huguenots had just made
+ a second La Rochelle of it. Many citizens, seeing the women flying
+ toward the High Street, leaving their children crying at the open doors,
+ hastened to don the cuirass, and supporting their somewhat uncertain
+ courage with a musket or a partisan, directed their steps toward the
+ hostelry of the Jolly Miller, before which was gathered, increasing
+ every minute, a compact group, vociferous and full of curiosity.';
+set Words['3musk002']['book'] = 'The Three Musketeers';
+set Words['3musk002']['para'] = 'In those times panics were common, and few days passed without
+ some city or other registering in its archives an event of this kind. There were
+ nobles, who made war against each other; there was the king, who made
+ war against the cardinal; there was Spain, which made war against the
+ king. Then, in addition to these concealed or public, secret or open
+ wars, there were robbers, mendicants, Huguenots, wolves, and scoundrels,
+ who made war upon everybody. The citizens always took up arms readily
+ against thieves, wolves or scoundrels, often against nobles or
+ Huguenots, sometimes against the king, but never against cardinal or
+ Spain. It resulted, then, from this habit that on the said first Monday
+ of April, 1625, the citizens, on hearing the clamor, and seeing neither
+ the red-and-yellow standard nor the livery of the Duc de Richelieu,
+ rushed toward the hostel of the Jolly Miller. When arrived there, the
+ cause of the hubbub was apparent to all';
+set Words['3musk003']['book'] = 'The Three Musketeers';
+set Words['3musk003']['para'] = 'You ought, I say, then, to husband the means you have, however
+ large the sum may be; but you ought also to endeavor to perfect yourself in
+ the exercises becoming a gentleman. I will write a letter today to the
+ Director of the Royal Academy, and tomorrow he will admit you without
+ any expense to yourself. Do not refuse this little service. Our
+ best-born and richest gentlemen sometimes solicit it without being able
+ to obtain it. You will learn horsemanship, swordsmanship in all its
+ branches, and dancing. You will make some desirable acquaintances; and
+ from time to time you can call upon me, just to tell me how you are
+ getting on, and to say whether I can be of further service to you.';
+set Words['thelostworld001']['book'] = 'The Lost World';
+set Words['thelostworld001']['para'] = 'She sat with that proud, delicate profile of hers outlined
+ against the red curtain. How beautiful she was! And yet how aloof! We had been
+ friends, quite good friends; but never could I get beyond the same
+ comradeship which I might have established with one of my
+ fellow-reporters upon the Gazette,--perfectly frank, perfectly kindly,
+ and perfectly unsexual. My instincts are all against a woman being too
+ frank and at her ease with me. It is no compliment to a man. Where
+ the real sex feeling begins, timidity and distrust are its companions,
+ heritage from old wicked days when love and violence went often hand in
+ hand. The bent head, the averted eye, the faltering voice, the wincing
+ figure--these, and not the unshrinking gaze and frank reply, are the
+ true signals of passion. Even in my short life I had learned as much
+ as that--or had inherited it in that race memory which we call instinct.';
+set Words['thelostworld002']['book'] = 'The Lost World';
+set Words['thelostworld002']['para'] = 'I always liked McArdle, the crabbed, old, round-backed,
+ red-headed news editor, and I rather hoped that he liked me. Of course, Beaumont was
+ the real boss; but he lived in the rarefied atmosphere of some Olympian
+ height from which he could distinguish nothing smaller than an
+ international crisis or a split in the Cabinet. Sometimes we saw him
+ passing in lonely majesty to his inner sanctum, with his eyes staring
+ vaguely and his mind hovering over the Balkans or the Persian Gulf. He
+ was above and beyond us. But McArdle was his first lieutenant, and it
+ was he that we knew. The old man nodded as I entered the room, and he
+ pushed his spectacles far up on his bald forehead.';
diff --git a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import org.apache.spark.SparkContext
+object ExceptionHandlingTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: ExceptionHandlingTest <master>")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "ExceptionHandlingTest",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
+ if (math.random > 0.75)
+ throw new Exception("Testing exception handling")
+ }
+ System.exit(0)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import java.util.Random
+object GroupByTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
+ System.exit(1)
+ }
+ var numMappers = if (args.length > 1) args(1).toInt else 2
+ var numKVPairs = if (args.length > 2) args(2).toInt else 1000
+ var valSize = if (args.length > 3) args(3).toInt else 1000
+ var numReducers = if (args.length > 4) args(4).toInt else numMappers
+ val sc = new SparkContext(args(0), "GroupBy Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+ val ranGen = new Random
+ var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
+ for (i <- 0 until numKVPairs) {
+ val byteArr = new Array[Byte](valSize)
+ ranGen.nextBytes(byteArr)
+ arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
+ }
+ arr1
+ }.cache
+ // Enforce that everything has been calculated and in cache
+ pairs1.count
+ println(pairs1.groupByKey(numReducers).count)
+ System.exit(0)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import org.apache.spark._
+import org.apache.spark.rdd.NewHadoopRDD
+import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat
+object HBaseTest {
+ def main(args: Array[String]) {
+ val sc = new SparkContext(args(0), "HBaseTest",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val conf = HBaseConfiguration.create()
+ // Other options for configuring scan behavior are available. More information available at
+ // http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html
+ conf.set(TableInputFormat.INPUT_TABLE, args(1))
+ // Initialize hBase table if necessary
+ val admin = new HBaseAdmin(conf)
+ if(!admin.isTableAvailable(args(1))) {
+ val tableDesc = new HTableDescriptor(args(1))
+ admin.createTable(tableDesc)
+ }
+ val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
+ classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
+ classOf[org.apache.hadoop.hbase.client.Result])
+ hBaseRDD.count()
+ System.exit(0)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import org.apache.spark._
+object HdfsTest {
+ def main(args: Array[String]) {
+ val sc = new SparkContext(args(0), "HdfsTest",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val file = sc.textFile(args(1))
+ val mapped = file.map(s => s.length).cache()
+ for (iter <- 1 to 10) {
+ val start = System.currentTimeMillis()
+ for (x <- mapped) { x + 2 }
+ // println("Processing: " + x)
+ val end = System.currentTimeMillis()
+ println("Iteration " + iter + " took " + (end-start) + " ms")
+ }
+ System.exit(0)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import scala.math.sqrt
+import cern.jet.math._
+import cern.colt.matrix._
+import cern.colt.matrix.linalg._
+ * Alternating least squares matrix factorization.
+ */
+object LocalALS {
+ // Parameters set through command line arguments
+ var M = 0 // Number of movies
+ var U = 0 // Number of users
+ var F = 0 // Number of features
+ var ITERATIONS = 0
+ val LAMBDA = 0.01 // Regularization coefficient
+ // Some COLT objects
+ val factory2D = DoubleFactory2D.dense
+ val factory1D = DoubleFactory1D.dense
+ val algebra = Algebra.DEFAULT
+ val blas = SeqBlas.seqBlas
+ def generateR(): DoubleMatrix2D = {
+ val mh = factory2D.random(M, F)
+ val uh = factory2D.random(U, F)
+ return algebra.mult(mh, algebra.transpose(uh))
+ }
+ def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
+ us: Array[DoubleMatrix1D]): Double =
+ {
+ val r = factory2D.make(M, U)
+ for (i <- 0 until M; j <- 0 until U) {
+ r.set(i, j, blas.ddot(ms(i), us(j)))
+ }
+ //println("R: " + r)
+ blas.daxpy(-1, targetR, r)
+ val sumSqs = r.aggregate(Functions.plus, Functions.square)
+ return sqrt(sumSqs / (M * U))
+ }
+ def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
+ R: DoubleMatrix2D) : DoubleMatrix1D =
+ {
+ val XtX = factory2D.make(F, F)
+ val Xty = factory1D.make(F)
+ // For each user that rated the movie
+ for (j <- 0 until U) {
+ val u = us(j)
+ // Add u * u^t to XtX
+ blas.dger(1, u, u, XtX)
+ // Add u * rating to Xty
+ blas.daxpy(R.get(i, j), u, Xty)
+ }
+ // Add regularization coefs to diagonal terms
+ for (d <- 0 until F) {
+ XtX.set(d, d, XtX.get(d, d) + LAMBDA * U)
+ }
+ // Solve it with Cholesky
+ val ch = new CholeskyDecomposition(XtX)
+ val Xty2D = factory2D.make(Xty.toArray, F)
+ val solved2D = ch.solve(Xty2D)
+ return solved2D.viewColumn(0)
+ }
+ def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D],
+ R: DoubleMatrix2D) : DoubleMatrix1D =
+ {
+ val XtX = factory2D.make(F, F)
+ val Xty = factory1D.make(F)
+ // For each movie that the user rated
+ for (i <- 0 until M) {
+ val m = ms(i)
+ // Add m * m^t to XtX
+ blas.dger(1, m, m, XtX)
+ // Add m * rating to Xty
+ blas.daxpy(R.get(i, j), m, Xty)
+ }
+ // Add regularization coefs to diagonal terms
+ for (d <- 0 until F) {
+ XtX.set(d, d, XtX.get(d, d) + LAMBDA * M)
+ }
+ // Solve it with Cholesky
+ val ch = new CholeskyDecomposition(XtX)
+ val Xty2D = factory2D.make(Xty.toArray, F)
+ val solved2D = ch.solve(Xty2D)
+ return solved2D.viewColumn(0)
+ }
+ def main(args: Array[String]) {
+ args match {
+ case Array(m, u, f, iters) => {
+ M = m.toInt
+ U = u.toInt
+ F = f.toInt
+ ITERATIONS = iters.toInt
+ }
+ case _ => {
+ System.err.println("Usage: LocalALS <M> <U> <F> <iters>")
+ System.exit(1)
+ }
+ }
+ printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS);
+ val R = generateR()
+ // Initialize m and u randomly
+ var ms = Array.fill(M)(factory1D.random(F))
+ var us = Array.fill(U)(factory1D.random(F))
+ // Iteratively update movies then users
+ for (iter <- 1 to ITERATIONS) {
+ println("Iteration " + iter + ":")
+ ms = (0 until M).map(i => updateMovie(i, ms(i), us, R)).toArray
+ us = (0 until U).map(j => updateUser(j, us(j), ms, R)).toArray
+ println("RMSE = " + rmse(R, ms, us))
+ println()
+ }
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import java.util.Random
+import org.apache.spark.util.Vector
+object LocalFileLR {
+ val D = 10 // Numer of dimensions
+ val rand = new Random(42)
+ case class DataPoint(x: Vector, y: Double)
+ def parsePoint(line: String): DataPoint = {
+ val nums = line.split(' ').map(_.toDouble)
+ return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
+ }
+ def main(args: Array[String]) {
+ val lines = scala.io.Source.fromFile(args(0)).getLines().toArray
+ val points = lines.map(parsePoint _)
+ val ITERATIONS = args(1).toInt
+ // Initialize w to a random value
+ var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+ println("Initial w: " + w)
+ for (i <- 1 to ITERATIONS) {
+ println("On iteration " + i)
+ var gradient = Vector.zeros(D)
+ for (p <- points) {
+ val scale = (1 / (1 + math.exp(-p.y * (w dot p.x))) - 1) * p.y
+ gradient += scale * p.x
+ }
+ w -= gradient
+ }
+ println("Final w: " + w)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import java.util.Random
+import org.apache.spark.util.Vector
+import org.apache.spark.SparkContext._
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+ * K-means clustering.
+ */
+object LocalKMeans {
+ val N = 1000
+ val R = 1000 // Scaling factor
+ val D = 10
+ val K = 10
+ val convergeDist = 0.001
+ val rand = new Random(42)
+ def generateData = {
+ def generatePoint(i: Int) = {
+ Vector(D, _ => rand.nextDouble * R)
+ }
+ Array.tabulate(N)(generatePoint)
+ }
+ def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
+ var index = 0
+ var bestIndex = 0
+ var closest = Double.PositiveInfinity
+ for (i <- 1 to centers.size) {
+ val vCurr = centers.get(i).get
+ val tempDist = p.squaredDist(vCurr)
+ if (tempDist < closest) {
+ closest = tempDist
+ bestIndex = i
+ }
+ }
+ return bestIndex
+ }
+ def main(args: Array[String]) {
+ val data = generateData
+ var points = new HashSet[Vector]
+ var kPoints = new HashMap[Int, Vector]
+ var tempDist = 1.0
+ while (points.size < K) {
+ points.add(data(rand.nextInt(N)))
+ }
+ val iter = points.iterator
+ for (i <- 1 to points.size) {
+ kPoints.put(i, iter.next())
+ }
+ println("Initial centers: " + kPoints)
+ while(tempDist > convergeDist) {
+ var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
+ var mappings = closest.groupBy[Int] (x => x._1)
+ var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))})
+ var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
+ tempDist = 0.0
+ for (mapping <- newPoints) {
+ tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
+ }
+ for (newP <- newPoints) {
+ kPoints.put(newP._1, newP._2)
+ }
+ }
+ println("Final centers: " + kPoints)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import java.util.Random
+import org.apache.spark.util.Vector
+ * Logistic regression based classification.
+ */
+object LocalLR {
+ val N = 10000 // Number of data points
+ val D = 10 // Number of dimensions
+ val R = 0.7 // Scaling factor
+ val ITERATIONS = 5
+ val rand = new Random(42)
+ case class DataPoint(x: Vector, y: Double)
+ def generateData = {
+ def generatePoint(i: Int) = {
+ val y = if(i % 2 == 0) -1 else 1
+ val x = Vector(D, _ => rand.nextGaussian + y * R)
+ DataPoint(x, y)
+ }
+ Array.tabulate(N)(generatePoint)
+ }
+ def main(args: Array[String]) {
+ val data = generateData
+ // Initialize w to a random value
+ var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+ println("Initial w: " + w)
+ for (i <- 1 to ITERATIONS) {
+ println("On iteration " + i)
+ var gradient = Vector.zeros(D)
+ for (p <- data) {
+ val scale = (1 / (1 + math.exp(-p.y * (w dot p.x))) - 1) * p.y
+ gradient += scale * p.x
+ }
+ w -= gradient
+ }
+ println("Final w: " + w)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala b/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import scala.math.random
+import org.apache.spark._
+import SparkContext._
+object LocalPi {
+ def main(args: Array[String]) {
+ var count = 0
+ for (i <- 1 to 100000) {
+ val x = random * 2 - 1
+ val y = random * 2 - 1
+ if (x*x + y*y < 1) count += 1
+ }
+ println("Pi is roughly " + 4 * count / 100000.0)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+ * Executes a roll up-style query against Apache logs.
+ */
+object LogQuery {
+ val exampleApacheLogs = List(
+ """ - "FRED" [18/Jan/2013:17:56:07 +1100] "GET http://images.com/2013/Generic.jpg
+ | HTTP/1.1" 304 315 "http://referall.com/" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
+ | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
+ | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
+ | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.350 "-" - "" 265 923 934 ""
+ | images.com 1358492167 - Whatup""".stripMargin.replace("\n", ""),
+ """ - "FRED" [18/Jan/2013:18:02:37 +1100] "GET http://images.com/2013/Generic.jpg
+ | HTTP/1.1" 304 306 "http:/referall.com" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
+ | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
+ | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
+ | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.352 "-" - "" 256 977 988 ""
+ | 0 images.com 1358492557 - Whatup""".stripMargin.replace("\n", "")
+ )
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: LogQuery <master> [logFile]")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "Log Query",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val dataSet =
+ if (args.length == 2) sc.textFile(args(1))
+ else sc.parallelize(exampleApacheLogs)
+ val apacheLogRegex =
+ """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r
+ /** Tracks the total query count and number of aggregate bytes for a particular group. */
+ class Stats(val count: Int, val numBytes: Int) extends Serializable {
+ def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes)
+ override def toString = "bytes=%s\tn=%s".format(numBytes, count)
+ }
+ def extractKey(line: String): (String, String, String) = {
+ apacheLogRegex.findFirstIn(line) match {
+ case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
+ if (user != "\"-\"") (ip, user, query)
+ else (null, null, null)
+ case _ => (null, null, null)
+ }
+ }
+ def extractStats(line: String): Stats = {
+ apacheLogRegex.findFirstIn(line) match {
+ case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
+ new Stats(1, bytes.toInt)
+ case _ => new Stats(1, 0)
+ }
+ }
+ dataSet.map(line => (extractKey(line), extractStats(line)))
+ .reduceByKey((a, b) => a.merge(b))
+ .collect().foreach{
+ case (user, query) => println("%s\t%s".format(user, query))}
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import org.apache.spark.SparkContext
+object MultiBroadcastTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: BroadcastTest <master> [<slices>] [numElem]")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "Broadcast Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val slices = if (args.length > 1) args(1).toInt else 2
+ val num = if (args.length > 2) args(2).toInt else 1000000
+ var arr1 = new Array[Int](num)
+ for (i <- 0 until arr1.length) {
+ arr1(i) = i
+ }
+ var arr2 = new Array[Int](num)
+ for (i <- 0 until arr2.length) {
+ arr2(i) = i
+ }
+ val barr1 = sc.broadcast(arr1)
+ val barr2 = sc.broadcast(arr2)
+ sc.parallelize(1 to 10, slices).foreach {
+ i => println(barr1.value.size + barr2.value.size)
+ }
+ System.exit(0)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import java.util.Random
+object SimpleSkewedGroupByTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SimpleSkewedGroupByTest <master> " +
+ "[numMappers] [numKVPairs] [valSize] [numReducers] [ratio]")
+ System.exit(1)
+ }
+ var numMappers = if (args.length > 1) args(1).toInt else 2
+ var numKVPairs = if (args.length > 2) args(2).toInt else 1000
+ var valSize = if (args.length > 3) args(3).toInt else 1000
+ var numReducers = if (args.length > 4) args(4).toInt else numMappers
+ var ratio = if (args.length > 5) args(5).toInt else 5.0
+ val sc = new SparkContext(args(0), "GroupBy Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+ val ranGen = new Random
+ var result = new Array[(Int, Array[Byte])](numKVPairs)
+ for (i <- 0 until numKVPairs) {
+ val byteArr = new Array[Byte](valSize)
+ ranGen.nextBytes(byteArr)
+ val offset = ranGen.nextInt(1000) * numReducers
+ if (ranGen.nextDouble < ratio / (numReducers + ratio - 1)) {
+ // give ratio times higher chance of generating key 0 (for reducer 0)
+ result(i) = (offset, byteArr)
+ } else {
+ // generate a key for one of the other reducers
+ val key = 1 + ranGen.nextInt(numReducers-1) + offset
+ result(i) = (key, byteArr)
+ }
+ }
+ result
+ }.cache
+ // Enforce that everything has been calculated and in cache
+ pairs1.count
+ println("RESULT: " + pairs1.groupByKey(numReducers).count)
+ // Print how many keys each reducer got (for debugging)
+ //println("RESULT: " + pairs1.groupByKey(numReducers)
+ // .map{case (k,v) => (k, v.size)}
+ // .collectAsMap)
+ System.exit(0)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import java.util.Random
+object SkewedGroupByTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
+ System.exit(1)
+ }
+ var numMappers = if (args.length > 1) args(1).toInt else 2
+ var numKVPairs = if (args.length > 2) args(2).toInt else 1000
+ var valSize = if (args.length > 3) args(3).toInt else 1000
+ var numReducers = if (args.length > 4) args(4).toInt else numMappers
+ val sc = new SparkContext(args(0), "GroupBy Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+ val ranGen = new Random
+ // map output sizes lineraly increase from the 1st to the last
+ numKVPairs = (1.0 * (p + 1) / numMappers * numKVPairs).toInt
+ var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
+ for (i <- 0 until numKVPairs) {
+ val byteArr = new Array[Byte](valSize)
+ ranGen.nextBytes(byteArr)
+ arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
+ }
+ arr1
+ }.cache()
+ // Enforce that everything has been calculated and in cache
+ pairs1.count()
+ println(pairs1.groupByKey(numReducers).count())
+ System.exit(0)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import scala.math.sqrt
+import cern.jet.math._
+import cern.colt.matrix._
+import cern.colt.matrix.linalg._
+import org.apache.spark._
+ * Alternating least squares matrix factorization.
+ */
+object SparkALS {
+ // Parameters set through command line arguments
+ var M = 0 // Number of movies
+ var U = 0 // Number of users
+ var F = 0 // Number of features
+ var ITERATIONS = 0
+ val LAMBDA = 0.01 // Regularization coefficient
+ // Some COLT objects
+ val factory2D = DoubleFactory2D.dense
+ val factory1D = DoubleFactory1D.dense
+ val algebra = Algebra.DEFAULT
+ val blas = SeqBlas.seqBlas
+ def generateR(): DoubleMatrix2D = {
+ val mh = factory2D.random(M, F)
+ val uh = factory2D.random(U, F)
+ return algebra.mult(mh, algebra.transpose(uh))
+ }
+ def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
+ us: Array[DoubleMatrix1D]): Double =
+ {
+ val r = factory2D.make(M, U)
+ for (i <- 0 until M; j <- 0 until U) {
+ r.set(i, j, blas.ddot(ms(i), us(j)))
+ }
+ //println("R: " + r)
+ blas.daxpy(-1, targetR, r)
+ val sumSqs = r.aggregate(Functions.plus, Functions.square)
+ return sqrt(sumSqs / (M * U))
+ }
+ def update(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
+ R: DoubleMatrix2D) : DoubleMatrix1D =
+ {
+ val U = us.size
+ val F = us(0).size
+ val XtX = factory2D.make(F, F)
+ val Xty = factory1D.make(F)
+ // For each user that rated the movie
+ for (j <- 0 until U) {
+ val u = us(j)
+ // Add u * u^t to XtX
+ blas.dger(1, u, u, XtX)
+ // Add u * rating to Xty
+ blas.daxpy(R.get(i, j), u, Xty)
+ }
+ // Add regularization coefs to diagonal terms
+ for (d <- 0 until F) {
+ XtX.set(d, d, XtX.get(d, d) + LAMBDA * U)
+ }
+ // Solve it with Cholesky
+ val ch = new CholeskyDecomposition(XtX)
+ val Xty2D = factory2D.make(Xty.toArray, F)
+ val solved2D = ch.solve(Xty2D)
+ return solved2D.viewColumn(0)
+ }
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SparkALS <master> [<M> <U> <F> <iters> <slices>]")
+ System.exit(1)
+ }
+ var host = ""
+ var slices = 0
+ val options = (0 to 5).map(i => if (i < args.length) Some(args(i)) else None)
+ options.toArray match {
+ case Array(host_, m, u, f, iters, slices_) =>
+ host = host_.get
+ M = m.getOrElse("100").toInt
+ U = u.getOrElse("500").toInt
+ F = f.getOrElse("10").toInt
+ ITERATIONS = iters.getOrElse("5").toInt
+ slices = slices_.getOrElse("2").toInt
+ case _ =>
+ System.err.println("Usage: SparkALS <master> [<M> <U> <F> <iters> <slices>]")
+ System.exit(1)
+ }
+ printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)
+ val sc = new SparkContext(host, "SparkALS",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val R = generateR()
+ // Initialize m and u randomly
+ var ms = Array.fill(M)(factory1D.random(F))
+ var us = Array.fill(U)(factory1D.random(F))
+ // Iteratively update movies then users
+ val Rc = sc.broadcast(R)
+ var msb = sc.broadcast(ms)
+ var usb = sc.broadcast(us)
+ for (iter <- 1 to ITERATIONS) {
+ println("Iteration " + iter + ":")
+ ms = sc.parallelize(0 until M, slices)
+ .map(i => update(i, msb.value(i), usb.value, Rc.value))
+ .toArray
+ msb = sc.broadcast(ms) // Re-broadcast ms because it was updated
+ us = sc.parallelize(0 until U, slices)
+ .map(i => update(i, usb.value(i), msb.value, algebra.transpose(Rc.value)))
+ .toArray
+ usb = sc.broadcast(us) // Re-broadcast us because it was updated
+ println("RMSE = " + rmse(R, ms, us))
+ println()
+ }
+ System.exit(0)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import java.util.Random
+import scala.math.exp
+import org.apache.spark.util.Vector
+import org.apache.spark._
+import org.apache.spark.scheduler.InputFormatInfo
+ * Logistic regression based classification.
+ */
+object SparkHdfsLR {
+ val D = 10 // Numer of dimensions
+ val rand = new Random(42)
+ case class DataPoint(x: Vector, y: Double)
+ def parsePoint(line: String): DataPoint = {
+ //val nums = line.split(' ').map(_.toDouble)
+ //return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
+ val tok = new java.util.StringTokenizer(line, " ")
+ var y = tok.nextToken.toDouble
+ var x = new Array[Double](D)
+ var i = 0
+ while (i < D) {
+ x(i) = tok.nextToken.toDouble; i += 1
+ }
+ return DataPoint(new Vector(x), y)
+ }
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: SparkHdfsLR <master> <file> <iters>")
+ System.exit(1)
+ }
+ val inputPath = args(1)
+ val conf = SparkEnv.get.hadoop.newConfiguration()
+ val sc = new SparkContext(args(0), "SparkHdfsLR",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(),
+ InputFormatInfo.computePreferredLocations(
+ Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))))
+ val lines = sc.textFile(inputPath)
+ val points = lines.map(parsePoint _).cache()
+ val ITERATIONS = args(2).toInt
+ // Initialize w to a random value
+ var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+ println("Initial w: " + w)
+ for (i <- 1 to ITERATIONS) {
+ println("On iteration " + i)
+ val gradient = points.map { p =>
+ (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
+ }.reduce(_ + _)
+ w -= gradient
+ }
+ println("Final w: " + w)
+ System.exit(0)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import java.util.Random
+import org.apache.spark.SparkContext
+import org.apache.spark.util.Vector
+import org.apache.spark.SparkContext._
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+ * K-means clustering.
+ */
+object SparkKMeans {
+ val R = 1000 // Scaling factor
+ val rand = new Random(42)
+ def parseVector(line: String): Vector = {
+ return new Vector(line.split(' ').map(_.toDouble))
+ }
+ def closestPoint(p: Vector, centers: Array[Vector]): Int = {
+ var index = 0
+ var bestIndex = 0
+ var closest = Double.PositiveInfinity
+ for (i <- 0 until centers.length) {
+ val tempDist = p.squaredDist(centers(i))
+ if (tempDist < closest) {
+ closest = tempDist
+ bestIndex = i
+ }
+ }
+ return bestIndex
+ }
+ def main(args: Array[String]) {
+ if (args.length < 4) {
+ System.err.println("Usage: SparkLocalKMeans <master> <file> <k> <convergeDist>")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "SparkLocalKMeans",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val lines = sc.textFile(args(1))
+ val data = lines.map(parseVector _).cache()
+ val K = args(2).toInt
+ val convergeDist = args(3).toDouble
+ var kPoints = data.takeSample(false, K, 42).toArray
+ var tempDist = 1.0
+ while(tempDist > convergeDist) {
+ var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
+ var pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
+ var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap()
+ tempDist = 0.0
+ for (i <- 0 until K) {
+ tempDist += kPoints(i).squaredDist(newPoints(i))
+ }
+ for (newP <- newPoints) {
+ kPoints(newP._1) = newP._2
+ }
+ println("Finished iteration (delta = " + tempDist + ")")
+ }
+ println("Final centers:")
+ kPoints.foreach(println)
+ System.exit(0)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import java.util.Random
+import scala.math.exp
+import org.apache.spark.util.Vector
+import org.apache.spark._
+ * Logistic regression based classification.
+ */
+object SparkLR {
+ val N = 10000 // Number of data points
+ val D = 10 // Numer of dimensions
+ val R = 0.7 // Scaling factor
+ val ITERATIONS = 5
+ val rand = new Random(42)
+ case class DataPoint(x: Vector, y: Double)
+ def generateData = {
+ def generatePoint(i: Int) = {
+ val y = if(i % 2 == 0) -1 else 1
+ val x = Vector(D, _ => rand.nextGaussian + y * R)
+ DataPoint(x, y)
+ }
+ Array.tabulate(N)(generatePoint)
+ }
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SparkLR <master> [<slices>]")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "SparkLR",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val numSlices = if (args.length > 1) args(1).toInt else 2
+ val points = sc.parallelize(generateData, numSlices).cache()
+ // Initialize w to a random value
+ var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+ println("Initial w: " + w)
+ for (i <- 1 to ITERATIONS) {
+ println("On iteration " + i)
+ val gradient = points.map { p =>
+ (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
+ }.reduce(_ + _)
+ w -= gradient
+ }
+ println("Final w: " + w)
+ System.exit(0)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
+package org.apache.spark.examples
+import org.apache.spark.SparkContext._
+import org.apache.spark.SparkContext
+ * Computes the PageRank of URLs from an input file. Input file should
+ * be in format of:
+ * URL neighbor URL
+ * URL neighbor URL
+ * URL neighbor URL
+ * ...
+ * where URL and their neighbors are separated by space(s).
+ */
+object SparkPageRank {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: PageRank <master> <file> <number_of_iterations>")
+ System.exit(1)
+ }
+ var iters = args(2).toInt
+ val ctx = new SparkContext(args(0), "PageRank",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val lines = ctx.textFile(args(1), 1)
+ val links = lines.map{ s =>
+ val parts = s.split("\\s+")
+ (parts(0), parts(1))
+ }.distinct().groupByKey().cache()
+ var ranks = links.mapValues(v => 1.0)
+ for (i <- 1 to iters) {
+ val contribs = links.join(ranks).values.flatMap{ case (urls, rank) =>
+ val size = urls.size
+ urls.map(url => (url, rank / size))
+ }
+ ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
+ }
+ val output = ranks.collect()
+ output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))
+ System.exit(0)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import scala.math.random
+import org.apache.spark._
+import SparkContext._
+/** Computes an approximation to pi */
+object SparkPi {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SparkPi <master> [<slices>]")
+ System.exit(1)
+ }
+ val spark = new SparkContext(args(0), "SparkPi",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val slices = if (args.length > 1) args(1).toInt else 2
+ val n = 100000 * slices
+ val count = spark.parallelize(1 to n, slices).map { i =>
+ val x = random * 2 - 1
+ val y = random * 2 - 1
+ if (x*x + y*y < 1) 1 else 0
+ }.reduce(_ + _)
+ println("Pi is roughly " + 4.0 * count / n)
+ System.exit(0)
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples
+import org.apache.spark._
+import SparkContext._
+import scala.util.Random
+import scala.collection.mutable
+ * Transitive closure on a graph.
+ */
+object SparkTC {
+ val numEdges = 200
+ val numVertices = 100
+ val rand = new Random(42)
+ def generateGraph = {
+ val edges: mutable.Set[(Int, Int)] = mutable.Set.empty
+ while (edges.size < numEdges) {
+ val from = rand.nextInt(numVertices)
+ val to = rand.nextInt(numVertices)
+ if (from != to) edges.+=((from, to))
+ }
+ edges.toSeq
+ }
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SparkTC <master> [<slices>]")
+ System.exit(1)
+ }
+ val spark = new SparkContext(args(0), "SparkTC",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val slices = if (args.length > 1) args(1).toInt else 2
+ var tc = spark.parallelize(generateGraph, slices).cache()
+ // Linear transitive closure: each round grows paths by one edge,
+ // by joining the graph's edges with the already-discovered paths.
+ // e.g. join the path (y, z) from the TC with the edge (x, y) from
+ // the graph to obtain the path (x, z).
+ // Because join() joins on keys, the edges are stored in reversed order.
+ val edges = tc.map(x => (x._2, x._1))
+ // This join is iterated until a fixed point is reached.
+ var oldCount = 0L
+ var nextCount = tc.count()
+ do {
+ oldCount = nextCount
+ // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
+ // then project the result to obtain the new (x, z) paths.
+ tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache();
+ nextCount = tc.count()
+ } while (nextCount != oldCount)
+ println("TC has " + tc.count() + " edges.")
+ System.exit(0)
+ }
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.bagel
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.bagel._
+import org.apache.spark.bagel.Bagel._
+import scala.collection.mutable.ArrayBuffer
+import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream}
+import com.esotericsoftware.kryo._
+class PageRankUtils extends Serializable {
+ def computeWithCombiner(numVertices: Long, epsilon: Double)(
+ self: PRVertex, messageSum: Option[Double], superstep: Int
+ ): (PRVertex, Array[PRMessage]) = {
+ val newValue = messageSum match {
+ case Some(msgSum) if msgSum != 0 =>
+ 0.15 / numVertices + 0.85 * msgSum
+ case _ => self.value
+ }
+ val terminate = superstep >= 10
+ val outbox: Array[PRMessage] =
+ if (!terminate)
+ self.outEdges.map(targetId =>
+ new PRMessage(targetId, newValue / self.outEdges.size))
+ else
+ Array[PRMessage]()
+ (new PRVertex(newValue, self.outEdges, !terminate), outbox)
+ }
+ def computeNoCombiner(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int): (PRVertex, Array[PRMessage]) =
+ computeWithCombiner(numVertices, epsilon)(self, messages match {
+ case Some(msgs) => Some(msgs.map(_.value).sum)
+ case None => None
+ }, superstep)
+class PRCombiner extends Combiner[PRMessage, Double] with Serializable {
+ def createCombiner(msg: PRMessage): Double =
+ msg.value
+ def mergeMsg(combiner: Double, msg: PRMessage): Double =
+ combiner + msg.value
+ def mergeCombiners(a: Double, b: Double): Double =
+ a + b
+class PRVertex() extends Vertex with Serializable {
+ var value: Double = _
+ var outEdges: Array[String] = _
+ var active: Boolean = _
+ def this(value: Double, outEdges: Array[String], active: Boolean = true) {
+ this()
+ this.value = value
+ this.outEdges = outEdges
+ this.active = active
+ }
+ override def toString(): String = {
+ "PRVertex(value=%f, outEdges.length=%d, active=%s)".format(value, outEdges.length, active.toString)
+ }
+class PRMessage() extends Message[String] with Serializable {
+ var targetId: String = _
+ var value: Double = _
+ def this(targetId: String, value: Double) {
+ this()
+ this.targetId = targetId
+ this.value = value
+ }
+class PRKryoRegistrator extends KryoRegistrator {
+ def registerClasses(kryo: Kryo) {
+ kryo.register(classOf[PRVertex])
+ kryo.register(classOf[PRMessage])
+ }
+class CustomPartitioner(partitions: Int) extends Partitioner {
+ def numPartitions = partitions
+ def getPartition(key: Any): Int = {
+ val hash = key match {
+ case k: Long => (k & 0x00000000FFFFFFFFL).toInt
+ case _ => key.hashCode
+ }
+ val mod = key.hashCode % partitions
+ if (mod < 0) mod + partitions else mod
+ }
+ override def equals(other: Any): Boolean = other match {
+ case c: CustomPartitioner =>
+ c.numPartitions == numPartitions
+ case _ => false
+ }
+package org.apache.spark.examples.bagel
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.bagel
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.bagel._
+import org.apache.spark.bagel.Bagel._
+import scala.xml.{XML,NodeSeq}
+ * Run PageRank on XML Wikipedia dumps from http://wiki.freebase.com/wiki/WEX. Uses the "articles"
+ * files from there, which contains one line per wiki article in a tab-separated format
+ * (http://wiki.freebase.com/wiki/WEX/Documentation#articles).
+ */
+object WikipediaPageRank {
+ def main(args: Array[String]) {
+ if (args.length < 5) {
+ System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>")
+ System.exit(-1)
+ }
+ System.setProperty("spark.serializer", "org.apache.spark.KryoSerializer")
+ System.setProperty("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
+ val inputFile = args(0)
+ val threshold = args(1).toDouble
+ val numPartitions = args(2).toInt
+ val host = args(3)
+ val usePartitioner = args(4).toBoolean
+ val sc = new SparkContext(host, "WikipediaPageRank")
+ // Parse the Wikipedia page data into a graph
+ val input = sc.textFile(inputFile)
+ println("Counting vertices...")
+ val numVertices = input.count()
+ println("Done counting vertices.")
+ println("Parsing input file...")
+ var vertices = input.map(line => {
+ val fields = line.split("\t")
+ val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
+ val links =
+ if (body == "\\N")
+ NodeSeq.Empty
+ else
+ try {
+ XML.loadString(body) \\ "link" \ "target"
+ } catch {
+ case e: org.xml.sax.SAXParseException =>
+ System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body)
+ NodeSeq.Empty
+ }
+ val outEdges = links.map(link => new String(link.text)).toArray
+ val id = new String(title)
+ (id, new PRVertex(1.0 / numVertices, outEdges))
+ })
+ if (usePartitioner)
+ vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache
+ else
+ vertices = vertices.cache
+ println("Done parsing input file.")
+ // Do the computation
+ val epsilon = 0.01 / numVertices
+ val messages = sc.parallelize(Array[(String, PRMessage)]())
+ val utils = new PageRankUtils
+ val result =
+ Bagel.run(
+ sc, vertices, messages, combiner = new PRCombiner(),
+ numPartitions = numPartitions)(
+ utils.computeWithCombiner(numVertices, epsilon))
+ // Print the result
+ System.err.println("Articles with PageRank >= "+threshold+":")
+ val top =
+ (result
+ .filter { case (id, vertex) => vertex.value >= threshold }
+ .map { case (id, vertex) => "%s\t%s\n".format(id, vertex.value) }
+ .collect.mkString)
+ println(top)
+ }
+package org.apache.spark.examples.bagel
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.bagel
+import org.apache.spark._
+import serializer.{DeserializationStream, SerializationStream, SerializerInstance}
+import org.apache.spark.SparkContext._
+import org.apache.spark.bagel._
+import org.apache.spark.bagel.Bagel._
+import scala.xml.{XML,NodeSeq}
+import scala.collection.mutable.ArrayBuffer
+import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream}
+import java.nio.ByteBuffer
+object WikipediaPageRankStandalone {
+ def main(args: Array[String]) {
+ if (args.length < 5) {
+ System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> <numIterations> <host> <usePartitioner>")
+ System.exit(-1)
+ }
+ System.setProperty("spark.serializer", "spark.bagel.examples.WPRSerializer")
+ val inputFile = args(0)
+ val threshold = args(1).toDouble
+ val numIterations = args(2).toInt
+ val host = args(3)
+ val usePartitioner = args(4).toBoolean
+ val sc = new SparkContext(host, "WikipediaPageRankStandalone")
+ val input = sc.textFile(inputFile)
+ val partitioner = new HashPartitioner(sc.defaultParallelism)
+ val links =
+ if (usePartitioner)
+ input.map(parseArticle _).partitionBy(partitioner).cache()
+ else
+ input.map(parseArticle _).cache()
+ val n = links.count()
+ val defaultRank = 1.0 / n
+ val a = 0.15
+ // Do the computation
+ val startTime = System.currentTimeMillis
+ val ranks =
+ pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner, sc.defaultParallelism)
+ // Print the result
+ System.err.println("Articles with PageRank >= "+threshold+":")
+ val top =
+ (ranks
+ .filter { case (id, rank) => rank >= threshold }
+ .map { case (id, rank) => "%s\t%s\n".format(id, rank) }
+ .collect().mkString)
+ println(top)
+ val time = (System.currentTimeMillis - startTime) / 1000.0
+ println("Completed %d iterations in %f seconds: %f seconds per iteration"
+ .format(numIterations, time, time / numIterations))
+ System.exit(0)
+ }
+ def parseArticle(line: String): (String, Array[String]) = {
+ val fields = line.split("\t")
+ val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
+ val id = new String(title)
+ val links =
+ if (body == "\\N")
+ NodeSeq.Empty
+ else
+ try {
+ XML.loadString(body) \\ "link" \ "target"
+ } catch {
+ case e: org.xml.sax.SAXParseException =>
+ System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body)
+ NodeSeq.Empty
+ }
+ val outEdges = links.map(link => new String(link.text)).toArray
+ (id, outEdges)
+ }
+ def pageRank(
+ links: RDD[(String, Array[String])],
+ numIterations: Int,
+ defaultRank: Double,
+ a: Double,
+ n: Long,
+ partitioner: Partitioner,
+ usePartitioner: Boolean,
+ numPartitions: Int
+ ): RDD[(String, Double)] = {
+ var ranks = links.mapValues { edges => defaultRank }
+ for (i <- 1 to numIterations) {
+ val contribs = links.groupWith(ranks).flatMap {
+ case (id, (linksWrapper, rankWrapper)) =>
+ if (linksWrapper.length > 0) {
+ if (rankWrapper.length > 0) {
+ linksWrapper(0).map(dest => (dest, rankWrapper(0) / linksWrapper(0).size))
+ } else {
+ linksWrapper(0).map(dest => (dest, defaultRank / linksWrapper(0).size))
+ }
+ } else {
+ Array[(String, Double)]()
+ }
+ }
+ ranks = (contribs.combineByKey((x: Double) => x,
+ (x: Double, y: Double) => x + y,
+ (x: Double, y: Double) => x + y,
+ partitioner)
+ .mapValues(sum => a/n + (1-a)*sum))
+ }
+ ranks
+ }
+class WPRSerializer extends org.apache.spark.serializer.Serializer {
+ def newInstance(): SerializerInstance = new WPRSerializerInstance()
+class WPRSerializerInstance extends SerializerInstance {
+ def serialize[T](t: T): ByteBuffer = {
+ throw new UnsupportedOperationException()
+ }
+ def deserialize[T](bytes: ByteBuffer): T = {
+ throw new UnsupportedOperationException()
+ }
+ def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
+ throw new UnsupportedOperationException()
+ }
+ def serializeStream(s: OutputStream): SerializationStream = {
+ new WPRSerializationStream(s)
+ }
+ def deserializeStream(s: InputStream): DeserializationStream = {
+ new WPRDeserializationStream(s)
+ }
+class WPRSerializationStream(os: OutputStream) extends SerializationStream {
+ val dos = new DataOutputStream(os)
+ def writeObject[T](t: T): SerializationStream = t match {
+ case (id: String, wrapper: ArrayBuffer[_]) => wrapper(0) match {
+ case links: Array[String] => {
+ dos.writeInt(0) // links
+ dos.writeUTF(id)
+ dos.writeInt(links.length)
+ for (link <- links) {
+ dos.writeUTF(link)
+ }
+ this
+ }
+ case rank: Double => {
+ dos.writeInt(1) // rank
+ dos.writeUTF(id)
+ dos.writeDouble(rank)
+ this
+ }
+ }
+ case (id: String, rank: Double) => {
+ dos.writeInt(2) // rank without wrapper
+ dos.writeUTF(id)
+ dos.writeDouble(rank)
+ this
+ }
+ }
+ def flush() { dos.flush() }
+ def close() { dos.close() }
+class WPRDeserializationStream(is: InputStream) extends DeserializationStream {
+ val dis = new DataInputStream(is)
+ def readObject[T](): T = {
+ val typeId = dis.readInt()
+ typeId match {
+ case 0 => {
+ val id = dis.readUTF()
+ val numLinks = dis.readInt()
+ val links = new Array[String](numLinks)
+ for (i <- 0 until numLinks) {
+ val link = dis.readUTF()
+ links(i) = link
+ }
+ (id, ArrayBuffer(links)).asInstanceOf[T]
+ }
+ case 1 => {
+ val id = dis.readUTF()
+ val rank = dis.readDouble()
+ (id, ArrayBuffer(rank)).asInstanceOf[T]
+ }
+ case 2 => {
+ val id = dis.readUTF()
+ val rank = dis.readDouble()
+ (id, rank).asInstanceOf[T]
+ }
+ }
+ }
+ def close() { dis.close() }
+package org.apache.spark.streaming.examples
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.examples
+import scala.collection.mutable.LinkedList
+import scala.util.Random
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.actor.actorRef2Scala
+import org.apache.spark.streaming.Seconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.receivers.Receiver
+import org.apache.spark.util.AkkaUtils
+case class SubscribeReceiver(receiverActor: ActorRef)
+case class UnsubscribeReceiver(receiverActor: ActorRef)
+ * Sends the random content to every receiver subscribed with 1/2
+ * second delay.
+ */
+class FeederActor extends Actor {
+ val rand = new Random()
+ var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
+ val strings: Array[String] = Array("words ", "may ", "count ")
+ def makeMessage(): String = {
+ val x = rand.nextInt(3)
+ strings(x) + strings(2 - x)
+ }
+ /*
+ * A thread to generate random messages
+ */
+ new Thread() {
+ override def run() {
+ while (true) {
+ Thread.sleep(500)
+ receivers.foreach(_ ! makeMessage)
+ }
+ }
+ }.start()
+ def receive: Receive = {
+ case SubscribeReceiver(receiverActor: ActorRef) =>
+ println("received subscribe from %s".format(receiverActor.toString))
+ receivers = LinkedList(receiverActor) ++ receivers
+ case UnsubscribeReceiver(receiverActor: ActorRef) =>
+ println("received unsubscribe from %s".format(receiverActor.toString))
+ receivers = receivers.dropWhile(x => x eq receiverActor)
+ }
+ * A sample actor as receiver, is also simplest. This receiver actor
+ * goes and subscribe to a typical publisher/feeder actor and receives
+ * data.
+ *
+ * @see [[org.apache.spark.streaming.examples.FeederActor]]
+ */
+class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
+extends Actor with Receiver {
+ lazy private val remotePublisher = context.actorFor(urlOfPublisher)
+ override def preStart = remotePublisher ! SubscribeReceiver(context.self)
+ def receive = {
+ case msg ⇒ context.parent ! pushBlock(msg.asInstanceOf[T])
+ }
+ override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
+ * A sample feeder actor
+ *
+ * Usage: FeederActor <hostname> <port>
+ * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder would start on.
+ */
+object FeederActor {
+ def main(args: Array[String]) {
+ if(args.length < 2){
+ System.err.println(
+ "Usage: FeederActor <hostname> <port>\n"
+ )
+ System.exit(1)
+ }
+ val Seq(host, port) = args.toSeq
+ val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1
+ val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
+ println("Feeder started as:" + feeder)
+ actorSystem.awaitTermination();
+ }
+ * A sample word count program demonstrating the use of plugging in
+ * Actor as Receiver
+ * Usage: ActorWordCount <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
+ *
+ * To run this example locally, you may run Feeder Actor as
+ * `$ ./run-example spark.streaming.examples.FeederActor 9999`
+ * and then run the example
+ * `$ ./run-example spark.streaming.examples.ActorWordCount local[2] 9999`
+ */
+object ActorWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println(
+ "Usage: ActorWordCount <master> <hostname> <port>" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+ val Seq(master, host, port) = args.toSeq
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ /*
+ * Following is the use of actorStream to plug in custom actor as receiver
+ *
+ * An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e type of data received and InputDstream
+ * should be same.
+ *
+ * For example: Both actorStream and SampleActorReceiver are parameterized
+ * to same type to ensure type safety.
+ */
+ val lines = ssc.actorStream[String](
+ Props(new SampleActorReceiver[String]("akka://test@%s:%s/user/FeederActor".format(
+ host, port.toInt))), "SampleReceiver")
+ //compute wordcount
+ lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
+ ssc.start()
+ }
+package org.apache.spark.streaming.examples
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.examples
+import org.apache.spark.util.IntParam
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+ * Produces a count of events received from Flume.
+ *
+ * This should be used in conjunction with an AvroSink in Flume. It will start
+ * an Avro server on at the request host:port address and listen for requests.
+ * Your Flume AvroSink should be pointed to this address.
+ *
+ * Usage: FlumeEventCount <master> <host> <port>
+ *
+ * <master> is a Spark master URL
+ * <host> is the host the Flume receiver will be started on - a receiver
+ * creates a server and listens for flume events.
+ * <port> is the port the Flume receiver will listen on.
+ */
+object FlumeEventCount {
+ def main(args: Array[String]) {
+ if (args.length != 3) {
+ System.err.println(
+ "Usage: FlumeEventCount <master> <host> <port>")
+ System.exit(1)
+ }
+ val Array(master, host, IntParam(port)) = args
+ val batchInterval = Milliseconds(2000)
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval,
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ // Create a flume stream
+ val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)
+ // Print out the count of events received from this server in each batch
+ stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
+ ssc.start()
+ }
+package org.apache.spark.streaming.examples
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.examples
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+ * Counts words in new text files created in the given directory
+ * Usage: HdfsWordCount <master> <directory>
+ * <master> is the Spark master URL.
+ * <directory> is the directory that Spark Streaming will use to find and read new text files.
+ *
+ * To run this on your local machine on directory `localdir`, run this example
+ * `$ ./run-example spark.streaming.examples.HdfsWordCount local[2] localdir`
+ * Then create a text file in `localdir` and the words in the file will get counted.
+ */
+object HdfsWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println("Usage: HdfsWordCount <master> <directory>")
+ System.exit(1)
+ }
+ // Create the context
+ val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ // Create the FileInputDStream on the directory and use the
+ // stream to count words in new files created
+ val lines = ssc.textFileStream(args(1))
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
+ ssc.start()
+ }
+package org.apache.spark.streaming.examples
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.examples
+import java.util.Properties
+import kafka.message.Message
+import kafka.producer.SyncProducerConfig
+import kafka.producer._
+import org.apache.spark.SparkContext
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.util.RawTextHelper._
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ * <group> is the name of kafka consumer group
+ * <topics> is a list of one or more kafka topics to consume from
+ * <numThreads> is the number of threads the kafka consumer should use
+ *
+ * Example:
+ * `./run-example spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
+ */
+object KafkaWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 5) {
+ System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
+ System.exit(1)
+ }
+ val Array(master, zkQuorum, group, topics, numThreads) = args
+ val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ ssc.checkpoint("checkpoint")
+ val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
+ val lines = ssc.kafkaStream(zkQuorum, group, topicpMap)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
+ wordCounts.print()
+ ssc.start()
+ }
+// Produces some random words between 1 and 100.
+object KafkaWordCountProducer {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>")
+ System.exit(1)
+ }
+ val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args
+ // Zookeper connection properties
+ val props = new Properties()
+ props.put("zk.connect", zkQuorum)
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+ val config = new ProducerConfig(props)
+ val producer = new Producer[String, String](config)
+ // Send some messages
+ while(true) {
+ val messages = (1 to messagesPerSec.toInt).map { messageNum =>
+ (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
+ }.toArray
+ println(messages.mkString(","))
+ val data = new ProducerData[String, String](topic, messages)
+ producer.send(data)
+ Thread.sleep(100)
+ }
+ }
+package org.apache.spark.streaming.examples
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.examples
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Usage: NetworkWordCount <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ ./run-example spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
+ */
+object NetworkWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+ // Create the context with a 1 second batch size
+ val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ // Create a NetworkInputDStream on target ip:port and count the
+ // words in input stream of \n delimited test (eg. generated by 'nc')
+ val lines = ssc.socketTextStream(args(1), args(2).toInt)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
+ ssc.start()
+ }
+package org.apache.spark.streaming.examples
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.examples
+import org.apache.spark.RDD
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import scala.collection.mutable.SynchronizedQueue
+object QueueStream {
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ System.err.println("Usage: QueueStream <master>")
+ System.exit(1)
+ }
+ // Create the context
+ val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ // Create the queue through which RDDs can be pushed to
+ // a QueueInputDStream
+ val rddQueue = new SynchronizedQueue[RDD[Int]]()
+ // Create the QueueInputDStream and use it do some processing
+ val inputStream = ssc.queueStream(rddQueue)
+ val mappedStream = inputStream.map(x => (x % 10, 1))
+ val reducedStream = mappedStream.reduceByKey(_ + _)
+ reducedStream.print()
+ ssc.start()
+ // Create and push some RDDs into
+ for (i <- 1 to 30) {
+ rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
+ Thread.sleep(1000)
+ }
+ ssc.stop()
+ System.exit(0)
+ }
+package org.apache.spark.streaming.examples
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.examples
+import org.apache.spark.util.IntParam
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.util.RawTextHelper
+ * Receives text from multiple rawNetworkStreams and counts how many '\n' delimited
+ * lines have the word 'the' in them. This is useful for benchmarking purposes. This
+ * will only work with spark.streaming.util.RawTextSender running on all worker nodes
+ * and with Spark using Kryo serialization (set Java property "spark.serializer" to
+ * "org.apache.spark.KryoSerializer").
+ * Usage: RawNetworkGrep <master> <numStreams> <host> <port> <batchMillis>
+ * <master> is the Spark master URL
+ * <numStream> is the number rawNetworkStreams, which should be same as number
+ * of work nodes in the cluster
+ * <host> is "localhost".
+ * <port> is the port on which RawTextSender is running in the worker nodes.
+ * <batchMillise> is the Spark Streaming batch duration in milliseconds.
+ */
+object RawNetworkGrep {
+ def main(args: Array[String]) {
+ if (args.length != 5) {
+ System.err.println("Usage: RawNetworkGrep <master> <numStreams> <host> <port> <batchMillis>")
+ System.exit(1)
+ }
+ val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
+ // Create the context
+ val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ // Warm up the JVMs on master and slave for JIT compilation to kick in
+ RawTextHelper.warmUp(ssc.sparkContext)
+ val rawStreams = (1 to numStreams).map(_ =>
+ ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
+ val union = ssc.union(rawStreams)
+ union.filter(_.contains("the")).count().foreach(r =>
+ println("Grep count: " + r.collect().mkString))
+ ssc.start()
+ }
+package org.apache.spark.streaming.examples
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.examples
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
+ * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Usage: StatefulNetworkWordCount <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ ./run-example spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999`
+ */
+object StatefulNetworkWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: StatefulNetworkWordCount <master> <hostname> <port>\n" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+ val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+ val currentCount = values.foldLeft(0)(_ + _)
+ val previousCount = state.getOrElse(0)
+ Some(currentCount + previousCount)
+ }
+ // Create the context with a 1 second batch size
+ val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(1),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ ssc.checkpoint(".")
+ // Create a NetworkInputDStream on target ip:port and count the
+ // words in input stream of \n delimited test (eg. generated by 'nc')
+ val lines = ssc.socketTextStream(args(1), args(2).toInt)
+ val words = lines.flatMap(_.split(" "))
+ val wordDstream = words.map(x => (x, 1))
+ // Update the cumulative count using updateStateByKey
+ // This will give a Dstream made of state (which is the cumulative count of the words)
+ val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
+ stateDstream.print()
+ ssc.start()
+ }
+package org.apache.spark.streaming.examples
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.examples
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.storage.StorageLevel
+import com.twitter.algebird._
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.SparkContext._
+ * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
+ * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
+ * <br>
+ * <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
+ * the example operates on Long IDs. Once the implementation supports other inputs (such as String),
+ * the same approach could be used for computing popular topics for example.
+ * <p>
+ * <p>
+ * <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure
+ * for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc),
+ * that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
+ * estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold
+ * percentage of the overall total count.
+ * <p><p>
+ * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the reduce operation.
+ */
+object TwitterAlgebirdCMS {
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ System.err.println("Usage: TwitterAlgebirdCMS <master>" +
+ " [filter1] [filter2] ... [filter n]")
+ System.exit(1)
+ }
+ // CMS parameters
+ val DELTA = 1E-3
+ val EPS = 0.01
+ val SEED = 1
+ val PERC = 0.001
+ // K highest frequency elements to take
+ val TOPK = 10
+ val (master, filters) = (args.head, args.tail)
+ val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
+ val users = stream.map(status => status.getUser.getId)
+ val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
+ var globalCMS = cms.zero
+ val mm = new MapMonoid[Long, Int]()
+ var globalExact = Map[Long, Int]()
+ val approxTopUsers = users.mapPartitions(ids => {
+ ids.map(id => cms.create(id))
+ }).reduce(_ ++ _)
+ val exactTopUsers = users.map(id => (id, 1))
+ .reduceByKey((a, b) => a + b)
+ approxTopUsers.foreach(rdd => {
+ if (rdd.count() != 0) {
+ val partial = rdd.first()
+ val partialTopK = partial.heavyHitters.map(id =>
+ (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+ globalCMS ++= partial
+ val globalTopK = globalCMS.heavyHitters.map(id =>
+ (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+ println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
+ partialTopK.mkString("[", ",", "]")))
+ println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
+ globalTopK.mkString("[", ",", "]")))
+ }
+ })
+ exactTopUsers.foreach(rdd => {
+ if (rdd.count() != 0) {
+ val partialMap = rdd.collect().toMap
+ val partialTopK = rdd.map(
+ {case (id, count) => (count, id)})
+ .sortByKey(ascending = false).take(TOPK)
+ globalExact = mm.plus(globalExact.toMap, partialMap)
+ val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+ println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
+ println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
+ }
+ })
+ ssc.start()
+ }
+package org.apache.spark.streaming.examples
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.examples
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.storage.StorageLevel
+import com.twitter.algebird.HyperLogLog._
+import com.twitter.algebird.HyperLogLogMonoid
+import org.apache.spark.streaming.dstream.TwitterInputDStream
+ * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
+ * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
+ * <p>
+ * <p>
+ * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ * blog post</a> and this
+ * <a href="http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">blog post</a>
+ * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for estimating
+ * the cardinality of a data stream, i.e. the number of unique elements.
+ * <p><p>
+ * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the reduce operation.
+ */
+object TwitterAlgebirdHLL {
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ System.err.println("Usage: TwitterAlgebirdHLL <master>" +
+ " [filter1] [filter2] ... [filter n]")
+ System.exit(1)
+ }
+ /** Bit size parameter for HyperLogLog, trades off accuracy vs size */
+ val BIT_SIZE = 12
+ val (master, filters) = (args.head, args.tail)
+ val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
+ val users = stream.map(status => status.getUser.getId)
+ val hll = new HyperLogLogMonoid(BIT_SIZE)
+ var globalHll = hll.zero
+ var userSet: Set[Long] = Set()
+ val approxUsers = users.mapPartitions(ids => {
+ ids.map(id => hll(id))
+ }).reduce(_ + _)
+ val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
+ approxUsers.foreach(rdd => {
+ if (rdd.count() != 0) {
+ val partial = rdd.first()
+ globalHll += partial
+ println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
+ println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
+ }
+ })
+ exactUsers.foreach(rdd => {
+ if (rdd.count() != 0) {
+ val partial = rdd.first()
+ userSet ++= partial
+ println("Exact distinct users this batch: %d".format(partial.size))
+ println("Exact distinct users overall: %d".format(userSet.size))
+ println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100))
+ }
+ })
+ ssc.start()
+ }
+package org.apache.spark.streaming.examples
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.examples
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import StreamingContext._
+import org.apache.spark.SparkContext._
+ * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
+ * stream. The stream is instantiated with credentials and optionally filters supplied by the
+ * command line arguments.
+ *
+ */
+object TwitterPopularTags {
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ System.err.println("Usage: TwitterPopularTags <master>" +
+ " [filter1] [filter2] ... [filter n]")
+ System.exit(1)
+ }
+ val (master, filters) = (args.head, args.tail)
+ val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val stream = ssc.twitterStream(None, filters)
+ val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
+ val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
+ .map{case (topic, count) => (count, topic)}
+ .transform(_.sortByKey(false))
+ val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
+ .map{case (topic, count) => (count, topic)}
+ .transform(_.sortByKey(false))
+ // Print popular hashtags
+ topCounts60.foreach(rdd => {
+ val topList = rdd.take(5)
+ println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
+ topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
+ })
+ topCounts10.foreach(rdd => {
+ val topList = rdd.take(5)
+ println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
+ topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
+ })
+ ssc.start()
+ }
+package org.apache.spark.streaming.examples
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.examples
+import akka.actor.ActorSystem
+import akka.actor.actorRef2Scala
+import akka.zeromq._
+import org.apache.spark.streaming.{ Seconds, StreamingContext }
+import org.apache.spark.streaming.StreamingContext._
+import akka.zeromq.Subscribe
+ * A simple publisher for demonstration purposes, repeatedly publishes random Messages
+ * every one second.
+ */
+object SimpleZeroMQPublisher {
+ def main(args: Array[String]) = {
+ if (args.length < 2) {
+ System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
+ System.exit(1)
+ }
+ val Seq(url, topic) = args.toSeq
+ val acs: ActorSystem = ActorSystem()
+ val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
+ val messages: Array[String] = Array("words ", "may ", "count ")
+ while (true) {
+ Thread.sleep(1000)
+ pubSocket ! ZMQMessage(Frame(topic) :: messages.map(x => Frame(x.getBytes)).toList)
+ }
+ acs.awaitTermination()
+ }
+ * A sample wordcount with ZeroMQStream stream
+ *
+ * To work with zeroMQ, some native libraries have to be installed.
+ * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide](http://www.zeromq.org/intro:get-the-software)
+ *
+ * Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>
+ * In local mode, <master> should be 'local[n]' with n > 1
+ * <zeroMQurl> and <topic> describe where zeroMq publisher is running.
+ *
+ * To run this example locally, you may run publisher as
+ * `$ ./run-example spark.streaming.examples.SimpleZeroMQPublisher tcp:// foo.bar`
+ * and run the example as
+ * `$ ./run-example spark.streaming.examples.ZeroMQWordCount local[2] tcp:// foo`
+ */
+object ZeroMQWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println(
+ "Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+ val Seq(master, url, topic) = args.toSeq
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator
+ //For this stream, a zeroMQ publisher should be running.
+ val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
+ ssc.start()
+ }
+package org.apache.spark.streaming.examples.clickstream
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.examples.clickstream
+import java.net.{InetAddress,ServerSocket,Socket,SocketException}
+import java.io.{InputStreamReader, BufferedReader, PrintWriter}
+import util.Random
+/** Represents a page view on a website with associated dimension data.*/
+class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) {
+ override def toString() : String = {
+ "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
+ }
+object PageView {
+ def fromString(in : String) : PageView = {
+ val parts = in.split("\t")
+ new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
+ }
+/** Generates streaming events to simulate page views on a website.
+ *
+ * This should be used in tandem with PageViewStream.scala. Example:
+ * $ ./run-example spark.streaming.examples.clickstream.PageViewGenerator 44444 10
+ * $ ./run-example spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ * */
+object PageViewGenerator {
+ val pages = Map("http://foo.com/" -> .7,
+ "http://foo.com/news" -> 0.2,
+ "http://foo.com/contact" -> .1)
+ val httpStatus = Map(200 -> .95,
+ 404 -> .05)
+ val userZipCode = Map(94709 -> .5,
+ 94117 -> .5)
+ val userID = Map((1 to 100).map(_ -> .01):_*)
+ def pickFromDistribution[T](inputMap : Map[T, Double]) : T = {
+ val rand = new Random().nextDouble()
+ var total = 0.0
+ for ((item, prob) <- inputMap) {
+ total = total + prob
+ if (total > rand) {
+ return item
+ }
+ }
+ return inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
+ }
+ def getNextClickEvent() : String = {
+ val id = pickFromDistribution(userID)
+ val page = pickFromDistribution(pages)
+ val status = pickFromDistribution(httpStatus)
+ val zipCode = pickFromDistribution(userZipCode)
+ new PageView(page, status, zipCode, id).toString()
+ }
+ def main(args : Array[String]) {
+ if (args.length != 2) {
+ System.err.println("Usage: PageViewGenerator <port> <viewsPerSecond>")
+ System.exit(1)
+ }
+ val port = args(0).toInt
+ val viewsPerSecond = args(1).toFloat
+ val sleepDelayMs = (1000.0 / viewsPerSecond).toInt
+ val listener = new ServerSocket(port)
+ println("Listening on port: " + port)
+ while (true) {
+ val socket = listener.accept()
+ new Thread() {
+ override def run = {
+ println("Got client connected from: " + socket.getInetAddress)
+ val out = new PrintWriter(socket.getOutputStream(), true)
+ while (true) {
+ Thread.sleep(sleepDelayMs)
+ out.write(getNextClickEvent())
+ out.flush()
+ }
+ socket.close()
+ }
+ }.start()
+ }
+ }
+package org.apache.spark.streaming.examples.clickstream
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.examples.clickstream
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.SparkContext._
+/** Analyses a streaming dataset of web page views. This class demonstrates several types of
+ * operators available in Spark streaming.
+ *
+ * This should be used in tandem with PageViewStream.scala. Example:
+ * $ ./run-example spark.streaming.examples.clickstream.PageViewGenerator 44444 10
+ * $ ./run-example spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ */
+object PageViewStream {
+ def main(args: Array[String]) {
+ if (args.length != 3) {
+ System.err.println("Usage: PageViewStream <metric> <host> <port>")
+ System.err.println("<metric> must be one of pageCounts, slidingPageCounts," +
+ " errorRatePerZipCode, activeUserCount, popularUsersSeen")
+ System.exit(1)
+ }
+ val metric = args(0)
+ val host = args(1)
+ val port = args(2).toInt
+ // Create the context
+ val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ // Create a NetworkInputDStream on target host:port and convert each line to a PageView
+ val pageViews = ssc.socketTextStream(host, port)
+ .flatMap(_.split("\n"))
+ .map(PageView.fromString(_))
+ // Return a count of views per URL seen in each batch
+ val pageCounts = pageViews.map(view => view.url).countByValue()
+ // Return a sliding window of page views per URL in the last ten seconds
+ val slidingPageCounts = pageViews.map(view => view.url)
+ .countByValueAndWindow(Seconds(10), Seconds(2))
+ // Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds
+ val statusesPerZipCode = pageViews.window(Seconds(30), Seconds(2))
+ .map(view => ((view.zipCode, view.status)))
+ .groupByKey()
+ val errorRatePerZipCode = statusesPerZipCode.map{
+ case(zip, statuses) =>
+ val normalCount = statuses.filter(_ == 200).size
+ val errorCount = statuses.size - normalCount
+ val errorRatio = errorCount.toFloat / statuses.size
+ if (errorRatio > 0.05) {"%s: **%s**".format(zip, errorRatio)}
+ else {"%s: %s".format(zip, errorRatio)}
+ }
+ // Return the number unique users in last 15 seconds
+ val activeUserCount = pageViews.window(Seconds(15), Seconds(2))
+ .map(view => (view.userID, 1))
+ .groupByKey()
+ .count()
+ .map("Unique active users: " + _)
+ // An external dataset we want to join to this stream
+ val userList = ssc.sparkContext.parallelize(
+ Map(1 -> "Patrick Wendell", 2->"Reynold Xin", 3->"Matei Zaharia").toSeq)
+ metric match {
+ case "pageCounts" => pageCounts.print()
+ case "slidingPageCounts" => slidingPageCounts.print()
+ case "errorRatePerZipCode" => errorRatePerZipCode.print()
+ case "activeUserCount" => activeUserCount.print()
+ case "popularUsersSeen" =>
+ // Look for users in our existing dataset and print it out if we have a match
+ pageViews.map(view => (view.userID, 1))
+ .foreach((rdd, time) => rdd.join(userList)
+ .map(_._2._2)
+ .take(10)
+ .foreach(u => println("Saw user %s at time %s".format(u, time))))
+ case _ => println("Invalid metric entered: " + metric)
+ }
+ ssc.start()
+ }