aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/scala
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-06-24 23:57:47 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2013-06-24 23:57:47 -0700
commitc89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6 (patch)
tree01f9f42f30a4aa2f73cb200c89a71254bf74d80e /examples/src/main/scala
parent48c7e373c62b2e8cf48157ceb0d92c38c3a40652 (diff)
parent78ffe164b33c6b11a2e511442605acd2f795a1b5 (diff)
downloadspark-c89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6.tar.gz
spark-c89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6.tar.bz2
spark-c89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6.zip
Merge branch 'master' into streaming
Conflicts: .gitignore
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r--examples/src/main/scala/spark/examples/BroadcastTest.scala10
-rw-r--r--examples/src/main/scala/spark/examples/CassandraTest.scala196
-rw-r--r--examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala5
-rw-r--r--examples/src/main/scala/spark/examples/GroupByTest.scala5
-rw-r--r--examples/src/main/scala/spark/examples/HBaseTest.scala35
-rw-r--r--examples/src/main/scala/spark/examples/HdfsTest.scala3
-rw-r--r--examples/src/main/scala/spark/examples/LocalALS.scala4
-rw-r--r--examples/src/main/scala/spark/examples/LocalKMeans.scala141
-rw-r--r--examples/src/main/scala/spark/examples/LocalLR.scala3
-rw-r--r--examples/src/main/scala/spark/examples/LogQuery.scala4
-rw-r--r--examples/src/main/scala/spark/examples/MultiBroadcastTest.scala22
-rw-r--r--examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala7
-rw-r--r--examples/src/main/scala/spark/examples/SkewedGroupByTest.scala15
-rw-r--r--examples/src/main/scala/spark/examples/SparkALS.scala62
-rw-r--r--examples/src/main/scala/spark/examples/SparkHdfsLR.scala14
-rw-r--r--examples/src/main/scala/spark/examples/SparkKMeans.scala7
-rw-r--r--examples/src/main/scala/spark/examples/SparkLR.scala6
-rw-r--r--examples/src/main/scala/spark/examples/SparkPi.scala4
-rw-r--r--examples/src/main/scala/spark/examples/SparkTC.scala4
-rw-r--r--examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala3
-rw-r--r--examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala3
-rw-r--r--examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala3
-rw-r--r--examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala4
-rw-r--r--examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala3
-rw-r--r--examples/src/main/scala/spark/streaming/examples/QueueStream.scala7
-rw-r--r--examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala5
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala5
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala3
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala3
-rw-r--r--examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala5
-rw-r--r--examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala5
31 files changed, 441 insertions, 155 deletions
diff --git a/examples/src/main/scala/spark/examples/BroadcastTest.scala b/examples/src/main/scala/spark/examples/BroadcastTest.scala
index 230097c7db..ba59be1687 100644
--- a/examples/src/main/scala/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/spark/examples/BroadcastTest.scala
@@ -9,19 +9,21 @@ object BroadcastTest {
System.exit(1)
}
- val spark = new SparkContext(args(0), "Broadcast Test")
+ val sc = new SparkContext(args(0), "Broadcast Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
var arr1 = new Array[Int](num)
- for (i <- 0 until arr1.length)
+ for (i <- 0 until arr1.length) {
arr1(i) = i
+ }
for (i <- 0 until 2) {
println("Iteration " + i)
println("===========")
- val barr1 = spark.broadcast(arr1)
- spark.parallelize(1 to 10, slices).foreach {
+ val barr1 = sc.broadcast(arr1)
+ sc.parallelize(1 to 10, slices).foreach {
i => println(barr1.value.size)
}
}
diff --git a/examples/src/main/scala/spark/examples/CassandraTest.scala b/examples/src/main/scala/spark/examples/CassandraTest.scala
new file mode 100644
index 0000000000..0fe1833e83
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/CassandraTest.scala
@@ -0,0 +1,196 @@
+package spark.examples
+
+import org.apache.hadoop.mapreduce.Job
+import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat
+import org.apache.cassandra.hadoop.ConfigHelper
+import org.apache.cassandra.hadoop.ColumnFamilyInputFormat
+import org.apache.cassandra.thrift._
+import spark.SparkContext
+import spark.SparkContext._
+import java.nio.ByteBuffer
+import java.util.SortedMap
+import org.apache.cassandra.db.IColumn
+import org.apache.cassandra.utils.ByteBufferUtil
+import scala.collection.JavaConversions._
+
+
+/*
+ * This example demonstrates using Spark with Cassandra with the New Hadoop API and Cassandra
+ * support for Hadoop.
+ *
+ * To run this example, run this file with the following command params -
+ * <spark_master> <cassandra_node> <cassandra_port>
+ *
+ * So if you want to run this on localhost this will be,
+ * local[3] localhost 9160
+ *
+ * The example makes some assumptions:
+ * 1. You have already created a keyspace called casDemo and it has a column family named Words
+ * 2. There are column family has a column named "para" which has test content.
+ *
+ * You can create the content by running the following script at the bottom of this file with
+ * cassandra-cli.
+ *
+ */
+object CassandraTest {
+
+ def main(args: Array[String]) {
+
+ // Get a SparkContext
+ val sc = new SparkContext(args(0), "casDemo")
+
+ // Build the job configuration with ConfigHelper provided by Cassandra
+ val job = new Job()
+ job.setInputFormatClass(classOf[ColumnFamilyInputFormat])
+
+ val host: String = args(1)
+ val port: String = args(2)
+
+ ConfigHelper.setInputInitialAddress(job.getConfiguration(), host)
+ ConfigHelper.setInputRpcPort(job.getConfiguration(), port)
+ ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host)
+ ConfigHelper.setOutputRpcPort(job.getConfiguration(), port)
+ ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words")
+ ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount")
+
+ val predicate = new SlicePredicate()
+ val sliceRange = new SliceRange()
+ sliceRange.setStart(Array.empty[Byte])
+ sliceRange.setFinish(Array.empty[Byte])
+ predicate.setSlice_range(sliceRange)
+ ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate)
+
+ ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
+ ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
+
+ // Make a new Hadoop RDD
+ val casRdd = sc.newAPIHadoopRDD(
+ job.getConfiguration(),
+ classOf[ColumnFamilyInputFormat],
+ classOf[ByteBuffer],
+ classOf[SortedMap[ByteBuffer, IColumn]])
+
+ // Let us first get all the paragraphs from the retrieved rows
+ val paraRdd = casRdd.map {
+ case (key, value) => {
+ ByteBufferUtil.string(value.get(ByteBufferUtil.bytes("para")).value())
+ }
+ }
+
+ // Lets get the word count in paras
+ val counts = paraRdd.flatMap(p => p.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
+
+ counts.collect().foreach {
+ case (word, count) => println(word + ":" + count)
+ }
+
+ counts.map {
+ case (word, count) => {
+ val colWord = new org.apache.cassandra.thrift.Column()
+ colWord.setName(ByteBufferUtil.bytes("word"))
+ colWord.setValue(ByteBufferUtil.bytes(word))
+ colWord.setTimestamp(System.currentTimeMillis)
+
+ val colCount = new org.apache.cassandra.thrift.Column()
+ colCount.setName(ByteBufferUtil.bytes("wcount"))
+ colCount.setValue(ByteBufferUtil.bytes(count.toLong))
+ colCount.setTimestamp(System.currentTimeMillis)
+
+ val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis)
+
+ val mutations: java.util.List[Mutation] = new Mutation() :: new Mutation() :: Nil
+ mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn())
+ mutations.get(0).column_or_supercolumn.setColumn(colWord)
+ mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn())
+ mutations.get(1).column_or_supercolumn.setColumn(colCount)
+ (outputkey, mutations)
+ }
+ }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]],
+ classOf[ColumnFamilyOutputFormat], job.getConfiguration)
+ }
+}
+
+/*
+create keyspace casDemo;
+use casDemo;
+
+create column family WordCount with comparator = UTF8Type;
+update column family WordCount with column_metadata =
+ [{column_name: word, validation_class: UTF8Type},
+ {column_name: wcount, validation_class: LongType}];
+
+create column family Words with comparator = UTF8Type;
+update column family Words with column_metadata =
+ [{column_name: book, validation_class: UTF8Type},
+ {column_name: para, validation_class: UTF8Type}];
+
+assume Words keys as utf8;
+
+set Words['3musk001']['book'] = 'The Three Musketeers';
+set Words['3musk001']['para'] = 'On the first Monday of the month of April, 1625, the market
+ town of Meung, in which the author of ROMANCE OF THE ROSE was born, appeared to
+ be in as perfect a state of revolution as if the Huguenots had just made
+ a second La Rochelle of it. Many citizens, seeing the women flying
+ toward the High Street, leaving their children crying at the open doors,
+ hastened to don the cuirass, and supporting their somewhat uncertain
+ courage with a musket or a partisan, directed their steps toward the
+ hostelry of the Jolly Miller, before which was gathered, increasing
+ every minute, a compact group, vociferous and full of curiosity.';
+
+set Words['3musk002']['book'] = 'The Three Musketeers';
+set Words['3musk002']['para'] = 'In those times panics were common, and few days passed without
+ some city or other registering in its archives an event of this kind. There were
+ nobles, who made war against each other; there was the king, who made
+ war against the cardinal; there was Spain, which made war against the
+ king. Then, in addition to these concealed or public, secret or open
+ wars, there were robbers, mendicants, Huguenots, wolves, and scoundrels,
+ who made war upon everybody. The citizens always took up arms readily
+ against thieves, wolves or scoundrels, often against nobles or
+ Huguenots, sometimes against the king, but never against cardinal or
+ Spain. It resulted, then, from this habit that on the said first Monday
+ of April, 1625, the citizens, on hearing the clamor, and seeing neither
+ the red-and-yellow standard nor the livery of the Duc de Richelieu,
+ rushed toward the hostel of the Jolly Miller. When arrived there, the
+ cause of the hubbub was apparent to all';
+
+set Words['3musk003']['book'] = 'The Three Musketeers';
+set Words['3musk003']['para'] = 'You ought, I say, then, to husband the means you have, however
+ large the sum may be; but you ought also to endeavor to perfect yourself in
+ the exercises becoming a gentleman. I will write a letter today to the
+ Director of the Royal Academy, and tomorrow he will admit you without
+ any expense to yourself. Do not refuse this little service. Our
+ best-born and richest gentlemen sometimes solicit it without being able
+ to obtain it. You will learn horsemanship, swordsmanship in all its
+ branches, and dancing. You will make some desirable acquaintances; and
+ from time to time you can call upon me, just to tell me how you are
+ getting on, and to say whether I can be of further service to you.';
+
+
+set Words['thelostworld001']['book'] = 'The Lost World';
+set Words['thelostworld001']['para'] = 'She sat with that proud, delicate profile of hers outlined
+ against the red curtain. How beautiful she was! And yet how aloof! We had been
+ friends, quite good friends; but never could I get beyond the same
+ comradeship which I might have established with one of my
+ fellow-reporters upon the Gazette,--perfectly frank, perfectly kindly,
+ and perfectly unsexual. My instincts are all against a woman being too
+ frank and at her ease with me. It is no compliment to a man. Where
+ the real sex feeling begins, timidity and distrust are its companions,
+ heritage from old wicked days when love and violence went often hand in
+ hand. The bent head, the averted eye, the faltering voice, the wincing
+ figure--these, and not the unshrinking gaze and frank reply, are the
+ true signals of passion. Even in my short life I had learned as much
+ as that--or had inherited it in that race memory which we call instinct.';
+
+set Words['thelostworld002']['book'] = 'The Lost World';
+set Words['thelostworld002']['para'] = 'I always liked McArdle, the crabbed, old, round-backed,
+ red-headed news editor, and I rather hoped that he liked me. Of course, Beaumont was
+ the real boss; but he lived in the rarefied atmosphere of some Olympian
+ height from which he could distinguish nothing smaller than an
+ international crisis or a split in the Cabinet. Sometimes we saw him
+ passing in lonely majesty to his inner sanctum, with his eyes staring
+ vaguely and his mind hovering over the Balkans or the Persian Gulf. He
+ was above and beyond us. But McArdle was his first lieutenant, and it
+ was he that we knew. The old man nodded as I entered the room, and he
+ pushed his spectacles far up on his bald forehead.';
+
+*/
diff --git a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
index c89f3dac0c..21a90f2e5a 100644
--- a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
+++ b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
@@ -9,9 +9,10 @@ object ExceptionHandlingTest {
System.exit(1)
}
- val sc = new SparkContext(args(0), "ExceptionHandlingTest")
+ val sc = new SparkContext(args(0), "ExceptionHandlingTest",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
- if (Math.random > 0.75)
+ if (math.random > 0.75)
throw new Exception("Testing exception handling")
}
diff --git a/examples/src/main/scala/spark/examples/GroupByTest.scala b/examples/src/main/scala/spark/examples/GroupByTest.scala
index 86dfba3a40..a6603653f1 100644
--- a/examples/src/main/scala/spark/examples/GroupByTest.scala
+++ b/examples/src/main/scala/spark/examples/GroupByTest.scala
@@ -9,14 +9,15 @@ object GroupByTest {
if (args.length == 0) {
System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
System.exit(1)
- }
+ }
var numMappers = if (args.length > 1) args(1).toInt else 2
var numKVPairs = if (args.length > 2) args(2).toInt else 1000
var valSize = if (args.length > 3) args(3).toInt else 1000
var numReducers = if (args.length > 4) args(4).toInt else numMappers
- val sc = new SparkContext(args(0), "GroupBy Test")
+ val sc = new SparkContext(args(0), "GroupBy Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
diff --git a/examples/src/main/scala/spark/examples/HBaseTest.scala b/examples/src/main/scala/spark/examples/HBaseTest.scala
new file mode 100644
index 0000000000..6e910154d4
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/HBaseTest.scala
@@ -0,0 +1,35 @@
+package spark.examples
+
+import spark._
+import spark.rdd.NewHadoopRDD
+import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat
+
+object HBaseTest {
+ def main(args: Array[String]) {
+ val sc = new SparkContext(args(0), "HBaseTest",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
+ val conf = HBaseConfiguration.create()
+
+ // Other options for configuring scan behavior are available. More information available at
+ // http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html
+ conf.set(TableInputFormat.INPUT_TABLE, args(1))
+
+ // Initialize hBase table if necessary
+ val admin = new HBaseAdmin(conf)
+ if(!admin.isTableAvailable(args(1))) {
+ val tableDesc = new HTableDescriptor(args(1))
+ admin.createTable(tableDesc)
+ }
+
+ val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
+ classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
+ classOf[org.apache.hadoop.hbase.client.Result])
+
+ hBaseRDD.count()
+
+ System.exit(0)
+ }
+} \ No newline at end of file
diff --git a/examples/src/main/scala/spark/examples/HdfsTest.scala b/examples/src/main/scala/spark/examples/HdfsTest.scala
index 7a4530609d..dd61c467f7 100644
--- a/examples/src/main/scala/spark/examples/HdfsTest.scala
+++ b/examples/src/main/scala/spark/examples/HdfsTest.scala
@@ -4,7 +4,8 @@ import spark._
object HdfsTest {
def main(args: Array[String]) {
- val sc = new SparkContext(args(0), "HdfsTest")
+ val sc = new SparkContext(args(0), "HdfsTest",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val file = sc.textFile(args(1))
val mapped = file.map(s => s.length).cache()
for (iter <- 1 to 10) {
diff --git a/examples/src/main/scala/spark/examples/LocalALS.scala b/examples/src/main/scala/spark/examples/LocalALS.scala
index 10e03359c9..2de810e062 100644
--- a/examples/src/main/scala/spark/examples/LocalALS.scala
+++ b/examples/src/main/scala/spark/examples/LocalALS.scala
@@ -1,11 +1,13 @@
package spark.examples
-import java.util.Random
import scala.math.sqrt
import cern.jet.math._
import cern.colt.matrix._
import cern.colt.matrix.linalg._
+/**
+ * Alternating least squares matrix factorization.
+ */
object LocalALS {
// Parameters set through command line arguments
var M = 0 // Number of movies
diff --git a/examples/src/main/scala/spark/examples/LocalKMeans.scala b/examples/src/main/scala/spark/examples/LocalKMeans.scala
index b442c604cd..4849f216fb 100644
--- a/examples/src/main/scala/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/spark/examples/LocalKMeans.scala
@@ -6,74 +6,77 @@ import spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
+/**
+ * K-means clustering.
+ */
object LocalKMeans {
- val N = 1000
- val R = 1000 // Scaling factor
- val D = 10
- val K = 10
- val convergeDist = 0.001
- val rand = new Random(42)
-
- def generateData = {
- def generatePoint(i: Int) = {
- Vector(D, _ => rand.nextDouble * R)
- }
- Array.tabulate(N)(generatePoint)
- }
-
- def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
- var index = 0
- var bestIndex = 0
- var closest = Double.PositiveInfinity
-
- for (i <- 1 to centers.size) {
- val vCurr = centers.get(i).get
- val tempDist = p.squaredDist(vCurr)
- if (tempDist < closest) {
- closest = tempDist
- bestIndex = i
- }
- }
-
- return bestIndex
- }
-
- def main(args: Array[String]) {
- val data = generateData
- var points = new HashSet[Vector]
- var kPoints = new HashMap[Int, Vector]
- var tempDist = 1.0
-
- while (points.size < K) {
- points.add(data(rand.nextInt(N)))
- }
-
- val iter = points.iterator
- for (i <- 1 to points.size) {
- kPoints.put(i, iter.next())
- }
-
- println("Initial centers: " + kPoints)
-
- while(tempDist > convergeDist) {
- var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
-
- var mappings = closest.groupBy[Int] (x => x._1)
-
- var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))})
-
- var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
-
- tempDist = 0.0
- for (mapping <- newPoints) {
- tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
- }
-
- for (newP <- newPoints) {
- kPoints.put(newP._1, newP._2)
- }
- }
-
- println("Final centers: " + kPoints)
- }
+ val N = 1000
+ val R = 1000 // Scaling factor
+ val D = 10
+ val K = 10
+ val convergeDist = 0.001
+ val rand = new Random(42)
+
+ def generateData = {
+ def generatePoint(i: Int) = {
+ Vector(D, _ => rand.nextDouble * R)
+ }
+ Array.tabulate(N)(generatePoint)
+ }
+
+ def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
+ var index = 0
+ var bestIndex = 0
+ var closest = Double.PositiveInfinity
+
+ for (i <- 1 to centers.size) {
+ val vCurr = centers.get(i).get
+ val tempDist = p.squaredDist(vCurr)
+ if (tempDist < closest) {
+ closest = tempDist
+ bestIndex = i
+ }
+ }
+
+ return bestIndex
+ }
+
+ def main(args: Array[String]) {
+ val data = generateData
+ var points = new HashSet[Vector]
+ var kPoints = new HashMap[Int, Vector]
+ var tempDist = 1.0
+
+ while (points.size < K) {
+ points.add(data(rand.nextInt(N)))
+ }
+
+ val iter = points.iterator
+ for (i <- 1 to points.size) {
+ kPoints.put(i, iter.next())
+ }
+
+ println("Initial centers: " + kPoints)
+
+ while(tempDist > convergeDist) {
+ var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
+
+ var mappings = closest.groupBy[Int] (x => x._1)
+
+ var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))})
+
+ var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
+
+ tempDist = 0.0
+ for (mapping <- newPoints) {
+ tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
+ }
+
+ for (newP <- newPoints) {
+ kPoints.put(newP._1, newP._2)
+ }
+ }
+
+ println("Final centers: " + kPoints)
+ }
}
diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala
index 9553162004..cd73f553d6 100644
--- a/examples/src/main/scala/spark/examples/LocalLR.scala
+++ b/examples/src/main/scala/spark/examples/LocalLR.scala
@@ -3,6 +3,9 @@ package spark.examples
import java.util.Random
import spark.util.Vector
+/**
+ * Logistic regression based classification.
+ */
object LocalLR {
val N = 10000 // Number of data points
val D = 10 // Number of dimensions
diff --git a/examples/src/main/scala/spark/examples/LogQuery.scala b/examples/src/main/scala/spark/examples/LogQuery.scala
index 5330b8da94..6497596d35 100644
--- a/examples/src/main/scala/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/spark/examples/LogQuery.scala
@@ -26,7 +26,9 @@ object LogQuery {
System.err.println("Usage: LogQuery <master> [logFile]")
System.exit(1)
}
- val sc = new SparkContext(args(0), "Log Query")
+
+ val sc = new SparkContext(args(0), "Log Query",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val dataSet =
if (args.length == 2) sc.textFile(args(1))
diff --git a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
index 83ae014e94..a0aaf60918 100644
--- a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
+++ b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
@@ -8,25 +8,29 @@ object MultiBroadcastTest {
System.err.println("Usage: BroadcastTest <master> [<slices>] [numElem]")
System.exit(1)
}
-
- val spark = new SparkContext(args(0), "Broadcast Test")
+
+ val sc = new SparkContext(args(0), "Broadcast Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
var arr1 = new Array[Int](num)
- for (i <- 0 until arr1.length)
+ for (i <- 0 until arr1.length) {
arr1(i) = i
-
+ }
+
var arr2 = new Array[Int](num)
- for (i <- 0 until arr2.length)
+ for (i <- 0 until arr2.length) {
arr2(i) = i
+ }
- val barr1 = spark.broadcast(arr1)
- val barr2 = spark.broadcast(arr2)
- spark.parallelize(1 to 10, slices).foreach {
+ val barr1 = sc.broadcast(arr1)
+ val barr2 = sc.broadcast(arr2)
+ sc.parallelize(1 to 10, slices).foreach {
i => println(barr1.value.size + barr2.value.size)
}
-
+
System.exit(0)
}
}
diff --git a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
index 50b3a263b4..461b84a2c6 100644
--- a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
+++ b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
@@ -11,15 +11,16 @@ object SimpleSkewedGroupByTest {
"[numMappers] [numKVPairs] [valSize] [numReducers] [ratio]")
System.exit(1)
}
-
+
var numMappers = if (args.length > 1) args(1).toInt else 2
var numKVPairs = if (args.length > 2) args(2).toInt else 1000
var valSize = if (args.length > 3) args(3).toInt else 1000
var numReducers = if (args.length > 4) args(4).toInt else numMappers
var ratio = if (args.length > 5) args(5).toInt else 5.0
- val sc = new SparkContext(args(0), "GroupBy Test")
-
+ val sc = new SparkContext(args(0), "GroupBy Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
var result = new Array[(Int, Array[Byte])](numKVPairs)
diff --git a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
index d2117a263e..435675f9de 100644
--- a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
+++ b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
@@ -10,19 +10,20 @@ object SkewedGroupByTest {
System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
System.exit(1)
}
-
+
var numMappers = if (args.length > 1) args(1).toInt else 2
var numKVPairs = if (args.length > 2) args(2).toInt else 1000
var valSize = if (args.length > 3) args(3).toInt else 1000
var numReducers = if (args.length > 4) args(4).toInt else numMappers
- val sc = new SparkContext(args(0), "GroupBy Test")
-
+ val sc = new SparkContext(args(0), "GroupBy Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
// map output sizes lineraly increase from the 1st to the last
- numKVPairs = (1. * (p + 1) / numMappers * numKVPairs).toInt
+ numKVPairs = (1.0 * (p + 1) / numMappers * numKVPairs).toInt
var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
for (i <- 0 until numKVPairs) {
@@ -31,11 +32,11 @@ object SkewedGroupByTest {
arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
}
arr1
- }.cache
+ }.cache()
// Enforce that everything has been calculated and in cache
- pairs1.count
+ pairs1.count()
- println(pairs1.groupByKey(numReducers).count)
+ println(pairs1.groupByKey(numReducers).count())
System.exit(0)
}
diff --git a/examples/src/main/scala/spark/examples/SparkALS.scala b/examples/src/main/scala/spark/examples/SparkALS.scala
index 5e01885dbb..8fb3b0fb2a 100644
--- a/examples/src/main/scala/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/spark/examples/SparkALS.scala
@@ -1,14 +1,14 @@
package spark.examples
-import java.io.Serializable
-import java.util.Random
import scala.math.sqrt
import cern.jet.math._
import cern.colt.matrix._
import cern.colt.matrix.linalg._
import spark._
-import scala.Option
+/**
+ * Alternating least squares matrix factorization.
+ */
object SparkALS {
// Parameters set through command line arguments
var M = 0 // Number of movies
@@ -70,30 +70,32 @@ object SparkALS {
}
def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SparkALS <master> [<M> <U> <F> <iters> <slices>]")
+ System.exit(1)
+ }
+
var host = ""
var slices = 0
- (0 to 5).map(i => {
- i match {
- case a if a < args.length => Some(args(a))
- case _ => None
- }
- }).toArray match {
- case Array(host_, m, u, f, iters, slices_) => {
- host = host_ getOrElse "local"
- M = (m getOrElse "100").toInt
- U = (u getOrElse "500").toInt
- F = (f getOrElse "10").toInt
- ITERATIONS = (iters getOrElse "5").toInt
- slices = (slices_ getOrElse "2").toInt
- }
- case _ => {
- System.err.println("Usage: SparkALS [<master> <M> <U> <F> <iters> <slices>]")
+ val options = (0 to 5).map(i => if (i < args.length) Some(args(i)) else None)
+
+ options.toArray match {
+ case Array(host_, m, u, f, iters, slices_) =>
+ host = host_.get
+ M = m.getOrElse("100").toInt
+ U = u.getOrElse("500").toInt
+ F = f.getOrElse("10").toInt
+ ITERATIONS = iters.getOrElse("5").toInt
+ slices = slices_.getOrElse("2").toInt
+ case _ =>
+ System.err.println("Usage: SparkALS <master> [<M> <U> <F> <iters> <slices>]")
System.exit(1)
- }
}
printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)
- val spark = new SparkContext(host, "SparkALS")
+
+ val sc = new SparkContext(host, "SparkALS",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val R = generateR()
@@ -102,19 +104,19 @@ object SparkALS {
var us = Array.fill(U)(factory1D.random(F))
// Iteratively update movies then users
- val Rc = spark.broadcast(R)
- var msc = spark.broadcast(ms)
- var usc = spark.broadcast(us)
+ val Rc = sc.broadcast(R)
+ var msb = sc.broadcast(ms)
+ var usb = sc.broadcast(us)
for (iter <- 1 to ITERATIONS) {
println("Iteration " + iter + ":")
- ms = spark.parallelize(0 until M, slices)
- .map(i => update(i, msc.value(i), usc.value, Rc.value))
+ ms = sc.parallelize(0 until M, slices)
+ .map(i => update(i, msb.value(i), usb.value, Rc.value))
.toArray
- msc = spark.broadcast(ms) // Re-broadcast ms because it was updated
- us = spark.parallelize(0 until U, slices)
- .map(i => update(i, usc.value(i), msc.value, algebra.transpose(Rc.value)))
+ msb = sc.broadcast(ms) // Re-broadcast ms because it was updated
+ us = sc.parallelize(0 until U, slices)
+ .map(i => update(i, usb.value(i), msb.value, algebra.transpose(Rc.value)))
.toArray
- usc = spark.broadcast(us) // Re-broadcast us because it was updated
+ usb = sc.broadcast(us) // Re-broadcast us because it was updated
println("RMSE = " + rmse(R, ms, us))
println()
}
diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
index 5b2bc84d69..3d080a0257 100644
--- a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
@@ -4,7 +4,12 @@ import java.util.Random
import scala.math.exp
import spark.util.Vector
import spark._
+import spark.deploy.SparkHadoopUtil
+import spark.scheduler.InputFormatInfo
+/**
+ * Logistic regression based classification.
+ */
object SparkHdfsLR {
val D = 10 // Numer of dimensions
val rand = new Random(42)
@@ -29,8 +34,13 @@ object SparkHdfsLR {
System.err.println("Usage: SparkHdfsLR <master> <file> <iters>")
System.exit(1)
}
- val sc = new SparkContext(args(0), "SparkHdfsLR")
- val lines = sc.textFile(args(1))
+ val inputPath = args(1)
+ val conf = SparkHadoopUtil.newConfiguration()
+ val sc = new SparkContext(args(0), "SparkHdfsLR",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(),
+ InputFormatInfo.computePreferredLocations(
+ Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))))
+ val lines = sc.textFile(inputPath)
val points = lines.map(parsePoint _).cache()
val ITERATIONS = args(2).toInt
diff --git a/examples/src/main/scala/spark/examples/SparkKMeans.scala b/examples/src/main/scala/spark/examples/SparkKMeans.scala
index 6375961390..4161c59fea 100644
--- a/examples/src/main/scala/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/spark/examples/SparkKMeans.scala
@@ -7,6 +7,9 @@ import spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
+/**
+ * K-means clustering.
+ */
object SparkKMeans {
val R = 1000 // Scaling factor
val rand = new Random(42)
@@ -36,7 +39,8 @@ object SparkKMeans {
System.err.println("Usage: SparkLocalKMeans <master> <file> <k> <convergeDist>")
System.exit(1)
}
- val sc = new SparkContext(args(0), "SparkLocalKMeans")
+ val sc = new SparkContext(args(0), "SparkLocalKMeans",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val lines = sc.textFile(args(1))
val data = lines.map(parseVector _).cache()
val K = args(2).toInt
@@ -60,6 +64,7 @@ object SparkKMeans {
for (newP <- newPoints) {
kPoints(newP._1) = newP._2
}
+ println("Finished iteration (delta = " + tempDist + ")")
}
println("Final centers:")
diff --git a/examples/src/main/scala/spark/examples/SparkLR.scala b/examples/src/main/scala/spark/examples/SparkLR.scala
index aaaf062c8f..2f41aeb376 100644
--- a/examples/src/main/scala/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkLR.scala
@@ -5,6 +5,9 @@ import scala.math.exp
import spark.util.Vector
import spark._
+/**
+ * Logistic regression based classification.
+ */
object SparkLR {
val N = 10000 // Number of data points
val D = 10 // Numer of dimensions
@@ -28,7 +31,8 @@ object SparkLR {
System.err.println("Usage: SparkLR <master> [<slices>]")
System.exit(1)
}
- val sc = new SparkContext(args(0), "SparkLR")
+ val sc = new SparkContext(args(0), "SparkLR",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val numSlices = if (args.length > 1) args(1).toInt else 2
val points = sc.parallelize(generateData, numSlices).cache()
diff --git a/examples/src/main/scala/spark/examples/SparkPi.scala b/examples/src/main/scala/spark/examples/SparkPi.scala
index 2f226f1380..f598d2ff9c 100644
--- a/examples/src/main/scala/spark/examples/SparkPi.scala
+++ b/examples/src/main/scala/spark/examples/SparkPi.scala
@@ -4,13 +4,15 @@ import scala.math.random
import spark._
import SparkContext._
+/** Computes an approximation to pi */
object SparkPi {
def main(args: Array[String]) {
if (args.length == 0) {
System.err.println("Usage: SparkPi <master> [<slices>]")
System.exit(1)
}
- val spark = new SparkContext(args(0), "SparkPi")
+ val spark = new SparkContext(args(0), "SparkPi",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val slices = if (args.length > 1) args(1).toInt else 2
val n = 100000 * slices
val count = spark.parallelize(1 to n, slices).map { i =>
diff --git a/examples/src/main/scala/spark/examples/SparkTC.scala b/examples/src/main/scala/spark/examples/SparkTC.scala
index 90bae011ad..911ae8f168 100644
--- a/examples/src/main/scala/spark/examples/SparkTC.scala
+++ b/examples/src/main/scala/spark/examples/SparkTC.scala
@@ -9,7 +9,6 @@ import scala.collection.mutable
* Transitive closure on a graph.
*/
object SparkTC {
-
val numEdges = 200
val numVertices = 100
val rand = new Random(42)
@@ -29,7 +28,8 @@ object SparkTC {
System.err.println("Usage: SparkTC <master> [<slices>]")
System.exit(1)
}
- val spark = new SparkContext(args(0), "SparkTC")
+ val spark = new SparkContext(args(0), "SparkTC",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val slices = if (args.length > 1) args(1).toInt else 2
var tc = spark.parallelize(generateGraph, slices).cache()
diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
index 76293fbb96..3b847fe603 100644
--- a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
@@ -131,7 +131,8 @@ object ActorWordCount {
val Seq(master, host, port) = args.toSeq
// Create the context and set the batch size
- val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2))
+ val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
/*
* Following is the use of actorStream to plug in custom actor as receiver
diff --git a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
index 461929fba2..39c76fd98a 100644
--- a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
@@ -30,7 +30,8 @@ object FlumeEventCount {
val batchInterval = Milliseconds(2000)
// Create the context and set the batch size
- val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval)
+ val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval,
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create a flume stream
val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)
diff --git a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
index 8530f5c175..9389f8a38d 100644
--- a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
@@ -22,7 +22,8 @@ object HdfsWordCount {
}
// Create the context
- val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2))
+ val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index e0c3555f21..9202e65e09 100644
--- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -32,8 +32,8 @@ object KafkaWordCount {
val Array(master, zkQuorum, group, topics, numThreads) = args
- val sc = new SparkContext(master, "KafkaWordCount")
- val ssc = new StreamingContext(sc, Seconds(2))
+ val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
index 5ac6d19b34..704540c2bf 100644
--- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
@@ -23,7 +23,8 @@ object NetworkWordCount {
}
// Create the context with a 1 second batch size
- val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
+ val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
diff --git a/examples/src/main/scala/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala
index 2a265d021d..f450e21040 100644
--- a/examples/src/main/scala/spark/streaming/examples/QueueStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala
@@ -15,7 +15,8 @@ object QueueStream {
}
// Create the context
- val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1))
+ val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
@@ -30,10 +31,10 @@ object QueueStream {
// Create and push some RDDs into
for (i <- 1 to 30) {
- rddQueue += ssc.sc.makeRDD(1 to 1000, 10)
+ rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
Thread.sleep(1000)
}
ssc.stop()
System.exit(0)
}
-} \ No newline at end of file
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
index 66e709b7a3..175281e095 100644
--- a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
@@ -31,10 +31,11 @@ object RawNetworkGrep {
val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
// Create the context
- val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis))
+ val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Warm up the JVMs on master and slave for JIT compilation to kick in
- RawTextHelper.warmUp(ssc.sc)
+ RawTextHelper.warmUp(ssc.sparkContext)
val rawStreams = (1 to numStreams).map(_ =>
ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 39a1a702ee..a9642100e3 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -43,12 +43,13 @@ object TwitterAlgebirdCMS {
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
- val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10))
+ val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
- val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC)
+ val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
var globalCMS = cms.zero
val mm = new MapMonoid[Long, Int]()
var globalExact = Map[Long, Int]()
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
index 914fba4ca2..f3288bfb85 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -32,7 +32,8 @@ object TwitterAlgebirdHLL {
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
- val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5))
+ val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
index fdb3a4c73c..9d4494c6f2 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
@@ -21,7 +21,8 @@ object TwitterPopularTags {
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
- val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2))
+ val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val stream = ssc.twitterStream(username, password, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
diff --git a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
index 5ed9b7cb76..74d0d338b7 100644
--- a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
@@ -58,7 +58,8 @@ object ZeroMQWordCount {
val Seq(master, url, topic) = args.toSeq
// Create the context and set the batch size
- val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2))
+ val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator
@@ -70,4 +71,4 @@ object ZeroMQWordCount {
ssc.start()
}
-} \ No newline at end of file
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index fba72519a9..e226a4a73a 100644
--- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -24,7 +24,8 @@ object PageViewStream {
val port = args(2).toInt
// Create the context
- val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
+ val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create a NetworkInputDStream on target host:port and convert each line to a PageView
val pageViews = ssc.socketTextStream(host, port)
@@ -60,7 +61,7 @@ object PageViewStream {
.map("Unique active users: " + _)
// An external dataset we want to join to this stream
- val userList = ssc.sc.parallelize(
+ val userList = ssc.sparkContext.parallelize(
Map(1 -> "Patrick Wendell", 2->"Reynold Xin", 3->"Matei Zaharia").toSeq)
metric match {