From 1db119a08f07b8707b901e92b03138b27e887844 Mon Sep 17 00:00:00 2001 From: seanm Date: Fri, 18 Jan 2013 20:22:23 -0700 Subject: Kafka jar wasn't being included by the run script --- run | 3 +++ 1 file changed, 3 insertions(+) (limited to 'run') diff --git a/run b/run index 2f61cb2a87..494f04c3ac 100755 --- a/run +++ b/run @@ -76,6 +76,9 @@ CLASSPATH+=":$CORE_DIR/src/main/resources" CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/classes" +for jar in `find "$STREAMING_DIR/lib" -name '*jar'`; do + CLASSPATH+=":$jar" +done if [ -e "$FWDIR/lib_managed" ]; then for jar in `find "$FWDIR/lib_managed/jars" -name '*jar'`; do CLASSPATH+=":$jar" -- cgit v1.2.3 From 858784459f27da6b969022339dcda4cb9970de1b Mon Sep 17 00:00:00 2001 From: "haitao.yao" Date: Sat, 16 Feb 2013 14:42:06 +0800 Subject: Support customized Java options for master, worker, executor, repl shell --- run | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) (limited to 'run') diff --git a/run b/run index 82b1da005a..ed5fe3ba38 100755 --- a/run +++ b/run @@ -25,6 +25,26 @@ if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default fi + +# Add java opts for master, worker, executor. The opts may be null case "$1" in + 'spark.deploy.master.Master') + SPARK_JAVA_OPTS+=" $SPARK_MASTER_OPTS" + ;; + 'spark.deploy.worker.Worker') + SPARK_JAVA_OPTS+=" $SPARK_WORKER_OPTS" + ;; + 'spark.executor.StandaloneExecutorBackend') + SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS" + ;; + 'spark.executor.MesosExecutorBackend') + SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS" + ;; + 'spark.repl.Main') + SPARK_JAVA_OPTS+=" $SPARK_REPL_OPTS" + ;; +esac + if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then if [ `command -v scala` ]; then RUNNER="scala" -- cgit v1.2.3 From 5ab37be9831e8a70b2502b14aed1c87cb002a189 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 24 Feb 2013 16:24:52 -0800 Subject: Fixed class paths and dependencies based on Matei's comments. --- docs/custom-streaming-receiver.md | 101 ++++++++++++++++++++++++++++++++++++ docs/plugin-custom-receiver.md | 101 ------------------------------------ docs/streaming-custom-receivers.md | 101 ++++++++++++++++++++++++++++++++++++ docs/streaming-programming-guide.md | 6 +-- pom.xml | 6 +-- project/SparkBuild.scala | 5 +- run | 5 +- run2.cmd | 3 ++ streaming/pom.xml | 11 ---- 9 files changed, 214 insertions(+), 125 deletions(-) create mode 100644 docs/custom-streaming-receiver.md delete mode 100644 docs/plugin-custom-receiver.md create mode 100644 docs/streaming-custom-receivers.md (limited to 'run') diff --git a/docs/custom-streaming-receiver.md b/docs/custom-streaming-receiver.md new file mode 100644 index 0000000000..0eb4246158 --- /dev/null +++ b/docs/custom-streaming-receiver.md @@ -0,0 +1,101 @@ +--- +layout: global +title: Tutorial - Spark Streaming, Plugging in a custom receiver. +--- + +A Spark Streaming receiver can be a simple network stream, a stream of messages from a message queue, files, etc. A receiver can also assume roles beyond just receiving data, such as filtering and preprocessing, to name a few of the possibilities. The API for plugging in any user-defined custom receiver is thus provided to encourage development of receivers that may be well suited to one's specific needs.
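(Editor's note on the Java-options patch above: the four new variables are read by the run script, so they would typically be set in conf/spark-env.sh, which run sources at startup. A minimal sketch; the flag values are hypothetical, only the variable names come from the patch:

    # conf/spark-env.sh
    SPARK_MASTER_OPTS="-verbose:gc"   # extra JVM options for the standalone master
    SPARK_WORKER_OPTS="-verbose:gc"   # extra JVM options for standalone workers
    SPARK_EXECUTOR_OPTS="-Xmx2g"      # extra JVM options for executors
    SPARK_REPL_OPTS="-Xmx1g"          # extra JVM options for the repl shell
)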
+ +This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application. + + +## A quick and naive walk-through + +### Write a simple receiver + +This starts with implementing an [Actor](#References). + +Following is a simple socket text-stream receiver, which is admittedly oversimplified, built using Akka's IO API. + +{% highlight scala %} + + class SocketTextStreamReceiver (host:String, + port:Int, + bytesToString: ByteString => String) extends Actor with Receiver { + + override def preStart = IOManager(context.system).connect(host, port) + + def receive = { + case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes)) + } + + } + + +{% endhighlight %} + +All we did here is mix in the Receiver trait and call the pushBlock API method to push our blocks of data. Please refer to the scaladocs of Receiver for more details. + +### A sample Spark application + +* First, create a Spark Streaming context with the master URL and batch duration. + +{% highlight scala %} + + val ssc = new StreamingContext(master, "WordCountCustomStreamSource", + Seconds(batchDuration)) + +{% endhighlight %} + +* Plug the actor configuration into the Spark Streaming context and create a DStream. + +{% highlight scala %} + + val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8445, z => z.utf8String)),"SocketReceiver") + +{% endhighlight %} + +* Process it. + +{% highlight scala %} + + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + + wordCounts.print() + ssc.start() + + +{% endhighlight %} + +* After processing it, the stream can be tested using the netcat utility. + + $ nc -l localhost 8445 + hello world + hello hello + + +## Multiple homogeneous/heterogeneous receivers. + +A DStream union operation is provided for taking the union of multiple input streams. + +{% highlight scala %} + + val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8445, z => z.utf8String)),"SocketReceiver") + + // Another socket stream receiver + val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8446, z => z.utf8String)),"SocketReceiver") + + val union = lines.union(lines2) + +{% endhighlight %} + +The above stream can then be easily processed as described earlier. + +_A more comprehensive example is provided in the Spark Streaming examples._ + +## References + +1. [Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html) diff --git a/docs/plugin-custom-receiver.md b/docs/plugin-custom-receiver.md deleted file mode 100644 index 0eb4246158..0000000000 --- a/docs/plugin-custom-receiver.md +++ /dev/null @@ -1,101 +0,0 @@ ---- -layout: global -title: Tutorial - Spark streaming, Plugging in a custom receiver. ---- - -A "Spark streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to ones specific need. - -This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application.
- - -## A quick and naive walk-through - -### Write a simple receiver - -This starts with implementing [Actor](#References) - -Following is a simple socket text-stream receiver, which is appearently overly simplified using Akka's socket.io api. - -{% highlight scala %} - - class SocketTextStreamReceiver (host:String, - port:Int, - bytesToString: ByteString => String) extends Actor with Receiver { - - override def preStart = IOManager(context.system).connect(host, port) - - def receive = { - case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes)) - } - - } - - -{% endhighlight %} - -All we did here is mixed in trait Receiver and called pushBlock api method to push our blocks of data. Please refer to scala-docs of Receiver for more details. - -### A sample spark application - -* First create a Spark streaming context with master url and batchduration. - -{% highlight scala %} - - val ssc = new StreamingContext(master, "WordCountCustomStreamSource", - Seconds(batchDuration)) - -{% endhighlight %} - -* Plug-in the actor configuration into the spark streaming context and create a DStream. - -{% highlight scala %} - - val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( - "localhost",8445, z => z.utf8String)),"SocketReceiver") - -{% endhighlight %} - -* Process it. - -{% highlight scala %} - - val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) - - wordCounts.print() - ssc.start() - - -{% endhighlight %} - -* After processing it, stream can be tested using the netcat utility. - - $ nc -l localhost 8445 - hello world - hello hello - - -## Multiple homogeneous/heterogeneous receivers. - -A DStream union operation is provided for taking union on multiple input streams. - -{% highlight scala %} - - val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( - "localhost",8445, z => z.utf8String)),"SocketReceiver") - - // Another socket stream receiver - val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver( - "localhost",8446, z => z.utf8String)),"SocketReceiver") - - val union = lines.union(lines2) - -{% endhighlight %} - -Above stream can be easily process as described earlier. - -_A more comprehensive example is provided in the spark streaming examples_ - -## References - -1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html) diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md new file mode 100644 index 0000000000..0eb4246158 --- /dev/null +++ b/docs/streaming-custom-receivers.md @@ -0,0 +1,101 @@ +--- +layout: global +title: Tutorial - Spark Streaming, Plugging in a custom receiver. +--- + +A Spark Streaming receiver can be a simple network stream, a stream of messages from a message queue, files, etc. A receiver can also assume roles beyond just receiving data, such as filtering and preprocessing, to name a few of the possibilities. The API for plugging in any user-defined custom receiver is thus provided to encourage development of receivers that may be well suited to one's specific needs. + +This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application. + + +## A quick and naive walk-through + +### Write a simple receiver + +This starts with implementing an [Actor](#References). + +Following is a simple socket text-stream receiver, which is admittedly oversimplified, built using Akka's IO API.
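(Editor's note: the receiver and driver code below assume a handful of imports that the walkthrough never states. A sketch, where the Akka 2.0 package layout and the location of this tree's Receiver trait are assumptions:

    import akka.actor.{Actor, IO, IOManager, Props}
    import akka.util.ByteString
    import spark.streaming.{Seconds, StreamingContext}
    import spark.streaming.receivers.Receiver
)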
+ +{% highlight scala %} + + class SocketTextStreamReceiver (host:String, + port:Int, + bytesToString: ByteString => String) extends Actor with Receiver { + + override def preStart = IOManager(context.system).connect(host, port) + + def receive = { + case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes)) + } + + } + + +{% endhighlight %} + +All we did here is mix in the Receiver trait and call the pushBlock API method to push our blocks of data. Please refer to the scaladocs of Receiver for more details. + +### A sample Spark application + +* First, create a Spark Streaming context with the master URL and batch duration. + +{% highlight scala %} + + val ssc = new StreamingContext(master, "WordCountCustomStreamSource", + Seconds(batchDuration)) + +{% endhighlight %} + +* Plug the actor configuration into the Spark Streaming context and create a DStream. + +{% highlight scala %} + + val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8445, z => z.utf8String)),"SocketReceiver") + +{% endhighlight %} + +* Process it. + +{% highlight scala %} + + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + + wordCounts.print() + ssc.start() + + +{% endhighlight %} + +* After processing it, the stream can be tested using the netcat utility. + + $ nc -l localhost 8445 + hello world + hello hello + + +## Multiple homogeneous/heterogeneous receivers. + +A DStream union operation is provided for taking the union of multiple input streams. + +{% highlight scala %} + + val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8445, z => z.utf8String)),"SocketReceiver") + + // Another socket stream receiver + val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8446, z => z.utf8String)),"SocketReceiver") + + val union = lines.union(lines2) + +{% endhighlight %} + +The above stream can then be easily processed as described earlier. + +_A more comprehensive example is provided in the Spark Streaming examples._ + +## References + +1. [Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index ded43e67cd..0e618a06c7 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -365,14 +365,14 @@ There are two failure behaviors based on which input sources are used. Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be the same even if there was a worker node failure. However, output operations (like `foreach`) have _at-least-once_ semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to HDFS using the `saveAs*Files` operations (as the file will simply get over-written by the same data), additional transaction-like mechanisms may be necessary to achieve exactly-once semantics for output operations. -## Failure of a Driver Node -A system that is required to operate 24/7 needs to be able tolerate the failure of the drive node as well. Spark Streaming does this by saving the state of the DStream computation periodically to a HDFS file, that can be used to restart the streaming computation in the event of a failure of the driver node.
To elaborate, the following state is periodically saved to a file. +## Failure of the Driver Node +A system that is required to operate 24/7 needs to be able to tolerate the failure of the driver node as well. Spark Streaming does this by saving the state of the DStream computation periodically to an HDFS file that can be used to restart the streaming computation in the event of a failure of the driver node. This checkpointing is enabled by setting an HDFS directory for checkpointing using `ssc.checkpoint()` as described [earlier](#rdd-checkpointing-within-dstreams). To elaborate, the following state is periodically saved to a file. 1. The DStream operator graph (input streams, output streams, etc.) 1. The configuration of each DStream (checkpoint interval, etc.) 1. The RDD checkpoint files of each DStream -All this is periodically saved in the file `<checkpoint dir>/graph` where `<checkpoint dir>` is the HDFS path set using `ssc.checkpoint(...)` as described earlier. To recover, a new Streaming Context can be created with this directory by using +All this is periodically saved in the file `<checkpoint dir>/graph`. To recover, a new Streaming Context can be created with this directory by using {% highlight scala %} val ssc = new StreamingContext(checkpointDirectory) {% endhighlight %} diff --git a/pom.xml b/pom.xml index 7e06cae052..99eb17856a 100644 --- a/pom.xml +++ b/pom.xml @@ -84,9 +84,9 @@ - <id>typesafe-repo</id> - <name>Typesafe Repository</name> - <url>http://repo.typesafe.com/typesafe/releases/</url> + <id>akka-repo</id> + <name>Akka Repository</name> + <url>http://repo.akka.io/releases/</url> <releases> <enabled>true</enabled> </releases> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 22bdc93602..b0b6e21681 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -114,7 +114,7 @@ object SparkBuild extends Build { def coreSettings = sharedSettings ++ Seq( name := "spark-core", resolvers ++= Seq( - "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/", + "Akka Repository" at "http://repo.akka.io/releases/", "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/", "Spray Repository" at "http://repo.spray.cc/", "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/", @@ -162,9 +162,6 @@ object SparkBuild extends Build { def streamingSettings = sharedSettings ++ Seq( name := "spark-streaming", - resolvers ++= Seq( - "Akka Repository" at "http://repo.akka.io/releases" - ), libraryDependencies ++= Seq( "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile", "com.github.sgroschupf" % "zkclient" % "0.1", diff --git a/run b/run index 6b2d84d48d..ecbf7673c6 100755 --- a/run +++ b/run @@ -111,14 +111,13 @@ CLASSPATH+=":$FWDIR/conf" CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/classes" if [ -n "$SPARK_TESTING" ] ; then CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/test-classes" + CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes" fi CLASSPATH+=":$CORE_DIR/src/main/resources" CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/classes" -for jar in `find "$STREAMING_DIR/lib" -name '*jar'`; do - CLASSPATH+=":$jar" -done +CLASSPATH+=":$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar if [ -e "$FWDIR/lib_managed" ]; then CLASSPATH+=":$FWDIR/lib_managed/jars/*" CLASSPATH+=":$FWDIR/lib_managed/bundles/*" diff --git a/run2.cmd b/run2.cmd index c913a5195e..705a4d1ff6 100644 --- a/run2.cmd +++ b/run2.cmd @@ -47,11 +47,14 @@ set CORE_DIR=%FWDIR%core
set REPL_DIR=%FWDIR%repl set EXAMPLES_DIR=%FWDIR%examples set BAGEL_DIR=%FWDIR%bagel +set STREAMING_DIR=%FWDIR%streaming set PYSPARK_DIR=%FWDIR%python rem Build up classpath set CLASSPATH=%SPARK_CLASSPATH%;%MESOS_CLASSPATH%;%FWDIR%conf;%CORE_DIR%\target\scala-%SCALA_VERSION%\classes set CLASSPATH=%CLASSPATH%;%CORE_DIR%\target\scala-%SCALA_VERSION%\test-classes;%CORE_DIR%\src\main\resources +set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\classes;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\test-classes +set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\lib\org\apache\kafka\kafka\0.7.2-spark\* set CLASSPATH=%CLASSPATH%;%REPL_DIR%\target\scala-%SCALA_VERSION%\classes;%EXAMPLES_DIR%\target\scala-%SCALA_VERSION%\classes set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\jars\* set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\bundles\* diff --git a/streaming/pom.xml b/streaming/pom.xml index 92b17fc3af..15523eadcb 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -20,17 +20,6 @@ <id>lib</id> <url>file://${project.basedir}/lib</url> - <repository> - <id>akka-repo</id> - <name>Akka Repository</name> - <url>http://repo.akka.io/releases</url> - <releases> - <enabled>true</enabled> - </releases> - <snapshots> - <enabled>false</enabled> - </snapshots> - </repository> -- cgit v1.2.3 From 25f737804ac1f4a925aaf6b1a147e63bc08a75f4 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 25 Feb 2013 11:53:55 -0800 Subject: Change tabs to spaces --- run | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) (limited to 'run') diff --git a/run b/run index ecbf7673c6..fd06fbe7c7 100755 --- a/run +++ b/run @@ -28,21 +28,21 @@ fi # Add java opts for master, worker, executor. The opts may be null case "$1" in - 'spark.deploy.master.Master') - SPARK_JAVA_OPTS+=" $SPARK_MASTER_OPTS" - ;; - 'spark.deploy.worker.Worker') - SPARK_JAVA_OPTS+=" $SPARK_WORKER_OPTS" - ;; - 'spark.executor.StandaloneExecutorBackend') - SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS" - ;; - 'spark.executor.MesosExecutorBackend') - SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS" - ;; - 'spark.repl.Main') - SPARK_JAVA_OPTS+=" $SPARK_REPL_OPTS" - ;; + 'spark.deploy.master.Master') + SPARK_JAVA_OPTS+=" $SPARK_MASTER_OPTS" + ;; + 'spark.deploy.worker.Worker') + SPARK_JAVA_OPTS+=" $SPARK_WORKER_OPTS" + ;; + 'spark.executor.StandaloneExecutorBackend') + SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS" + ;; + 'spark.executor.MesosExecutorBackend') + SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS" + ;; + 'spark.repl.Main') + SPARK_JAVA_OPTS+=" $SPARK_REPL_OPTS" + ;; esac if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then -- cgit v1.2.3 From 5d7b591cfe14177f083814fe3e81745c5d279810 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 25 Feb 2013 19:34:32 -0800 Subject: Pass a code JAR to SparkContext in our examples. Fixes SPARK-594.
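Every example below gains the same two constructor arguments, the Spark home and the JAR(s) containing the job's code, so that the code is shipped to cluster nodes rather than assumed present. The recurring Scala form, taken from the hunks that follow (the Java examples pass the JAR as a plain string):

    val sc = new SparkContext(master, "AppName",
      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))

SPARK_EXAMPLES_JAR itself is located and exported by the run and run2.cmd changes at the end of this patch.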
--- .../src/main/java/spark/examples/JavaHdfsLR.java | 6 ++- examples/src/main/java/spark/examples/JavaTC.java | 5 +- .../main/java/spark/examples/JavaWordCount.java | 5 +- .../streaming/examples/JavaFlumeEventCount.java | 3 +- .../streaming/examples/JavaNetworkWordCount.java | 4 +- .../spark/streaming/examples/JavaQueueStream.java | 3 +- .../main/scala/spark/examples/BroadcastTest.scala | 10 ++-- .../spark/examples/ExceptionHandlingTest.scala | 5 +- .../main/scala/spark/examples/GroupByTest.scala | 5 +- .../src/main/scala/spark/examples/HdfsTest.scala | 3 +- .../src/main/scala/spark/examples/LocalALS.scala | 4 +- .../main/scala/spark/examples/LocalKMeans.scala | 3 ++ .../src/main/scala/spark/examples/LocalLR.scala | 3 ++ .../src/main/scala/spark/examples/LogQuery.scala | 4 +- .../scala/spark/examples/MultiBroadcastTest.scala | 16 +++--- .../spark/examples/SimpleSkewedGroupByTest.scala | 3 +- .../scala/spark/examples/SkewedGroupByTest.scala | 11 ++-- .../src/main/scala/spark/examples/SparkALS.scala | 62 +++++++++++----------- .../main/scala/spark/examples/SparkHdfsLR.scala | 6 ++- .../main/scala/spark/examples/SparkKMeans.scala | 6 ++- .../src/main/scala/spark/examples/SparkLR.scala | 6 ++- .../src/main/scala/spark/examples/SparkPi.scala | 3 +- .../src/main/scala/spark/examples/SparkTC.scala | 4 +- .../spark/streaming/examples/ActorWordCount.scala | 3 +- .../spark/streaming/examples/FlumeEventCount.scala | 3 +- .../spark/streaming/examples/HdfsWordCount.scala | 3 +- .../spark/streaming/examples/KafkaWordCount.scala | 4 +- .../streaming/examples/NetworkWordCount.scala | 3 +- .../spark/streaming/examples/QueueStream.scala | 3 +- .../spark/streaming/examples/RawNetworkGrep.scala | 3 +- .../streaming/examples/TwitterAlgebirdCMS.scala | 3 +- .../streaming/examples/TwitterAlgebirdHLL.scala | 3 +- .../streaming/examples/TwitterPopularTags.scala | 3 +- .../spark/streaming/examples/ZeroMQWordCount.scala | 5 +- .../examples/clickstream/PageViewStream.scala | 3 +- run | 10 ++++ run2.cmd | 10 ++++ .../streaming/api/java/JavaStreamingContext.scala | 17 ++++++ 38 files changed, 174 insertions(+), 82 deletions(-) (limited to 'run') diff --git a/examples/src/main/java/spark/examples/JavaHdfsLR.java b/examples/src/main/java/spark/examples/JavaHdfsLR.java index 29839d5668..8b0a9b6808 100644 --- a/examples/src/main/java/spark/examples/JavaHdfsLR.java +++ b/examples/src/main/java/spark/examples/JavaHdfsLR.java @@ -10,6 +10,9 @@ import java.util.Arrays; import java.util.StringTokenizer; import java.util.Random; +/** + * Logistic regression based classification. 
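+ * (Editor's sketch of what these examples compute: each iteration evaluates, for every
+ * point (x, y), the gradient (1 / (1 + exp(-y * w.x)) - 1) * y * x and subtracts the
+ * summed gradients from the weight vector w.)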
+ */ public class JavaHdfsLR { static int D = 10; // Number of dimensions @@ -85,7 +88,8 @@ public class JavaHdfsLR { System.exit(1); } - JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR"); + JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR", + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); JavaRDD<String> lines = sc.textFile(args[1]); JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache(); int ITERATIONS = Integer.parseInt(args[2]); diff --git a/examples/src/main/java/spark/examples/JavaTC.java b/examples/src/main/java/spark/examples/JavaTC.java index e3bd881b8f..b319bdab44 100644 --- a/examples/src/main/java/spark/examples/JavaTC.java +++ b/examples/src/main/java/spark/examples/JavaTC.java @@ -28,7 +28,7 @@ public class JavaTC { Tuple2<Integer, Integer> e = new Tuple2<Integer, Integer>(from, to); if (from != to) edges.add(e); } - return new ArrayList(edges); + return new ArrayList<Tuple2<Integer, Integer>>(edges); } static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>, @@ -46,7 +46,8 @@ public class JavaTC { System.exit(1); } - JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC"); + JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC", + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2; JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache(); diff --git a/examples/src/main/java/spark/examples/JavaWordCount.java b/examples/src/main/java/spark/examples/JavaWordCount.java index a44cf8a120..9d4c7a252d 100644 --- a/examples/src/main/java/spark/examples/JavaWordCount.java +++ b/examples/src/main/java/spark/examples/JavaWordCount.java @@ -18,7 +18,8 @@ public class JavaWordCount { System.exit(1); } - JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount"); + JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount", + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); JavaRDD<String> lines = ctx.textFile(args[1], 1); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @@ -29,7 +30,7 @@ public class JavaWordCount { JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) { - return new Tuple2(s, 1); + return new Tuple2<String, Integer>(s, 1); } }); diff --git a/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java index cddce16e39..e24c6ddaa7 100644 --- a/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java +++ b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java @@ -32,7 +32,8 @@ public class JavaFlumeEventCount { Duration batchInterval = new Duration(2000); - JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval); + JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval, + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port); diff --git a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java index 0e9eadd01b..3e57580fd4 100644 --- a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java +++ b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java @@ -30,8 +30,8 @@ public class JavaNetworkWordCount { } // Create the context with a 1 second batch size - JavaStreamingContext ssc = new JavaStreamingContext( - args[0], "NetworkWordCount", new Duration(1000)); +
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount", + new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') diff --git a/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java index 43c3cd4dfa..15b82c8da1 100644 --- a/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java +++ b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java @@ -22,7 +22,8 @@ public class JavaQueueStream { } // Create the context - JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000)); + JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000), + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); // Create the queue through which RDDs can be pushed to // a QueueInputDStream diff --git a/examples/src/main/scala/spark/examples/BroadcastTest.scala b/examples/src/main/scala/spark/examples/BroadcastTest.scala index 230097c7db..ba59be1687 100644 --- a/examples/src/main/scala/spark/examples/BroadcastTest.scala +++ b/examples/src/main/scala/spark/examples/BroadcastTest.scala @@ -9,19 +9,21 @@ object BroadcastTest { System.exit(1) } - val spark = new SparkContext(args(0), "Broadcast Test") + val sc = new SparkContext(args(0), "Broadcast Test", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val slices = if (args.length > 1) args(1).toInt else 2 val num = if (args.length > 2) args(2).toInt else 1000000 var arr1 = new Array[Int](num) - for (i <- 0 until arr1.length) + for (i <- 0 until arr1.length) { arr1(i) = i + } for (i <- 0 until 2) { println("Iteration " + i) println("===========") - val barr1 = spark.broadcast(arr1) - spark.parallelize(1 to 10, slices).foreach { + val barr1 = sc.broadcast(arr1) + sc.parallelize(1 to 10, slices).foreach { i => println(barr1.value.size) } } diff --git a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala index c89f3dac0c..21a90f2e5a 100644 --- a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala +++ b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala @@ -9,9 +9,10 @@ object ExceptionHandlingTest { System.exit(1) } - val sc = new SparkContext(args(0), "ExceptionHandlingTest") + val sc = new SparkContext(args(0), "ExceptionHandlingTest", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) sc.parallelize(0 until sc.defaultParallelism).foreach { i => - if (Math.random > 0.75) + if (math.random > 0.75) throw new Exception("Testing exception handling") } diff --git a/examples/src/main/scala/spark/examples/GroupByTest.scala b/examples/src/main/scala/spark/examples/GroupByTest.scala index 86dfba3a40..a6603653f1 100644 --- a/examples/src/main/scala/spark/examples/GroupByTest.scala +++ b/examples/src/main/scala/spark/examples/GroupByTest.scala @@ -9,14 +9,15 @@ object GroupByTest { if (args.length == 0) { System.err.println("Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers]") System.exit(1) - } + } var numMappers = if (args.length > 1) args(1).toInt else 2 var numKVPairs = if (args.length > 2) args(2).toInt else 1000 var valSize = if (args.length > 3) args(3).toInt else 1000 var numReducers = if (args.length > 4) 
args(4).toInt else numMappers - val sc = new SparkContext(args(0), "GroupBy Test") + val sc = new SparkContext(args(0), "GroupBy Test", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random diff --git a/examples/src/main/scala/spark/examples/HdfsTest.scala b/examples/src/main/scala/spark/examples/HdfsTest.scala index 7a4530609d..dd61c467f7 100644 --- a/examples/src/main/scala/spark/examples/HdfsTest.scala +++ b/examples/src/main/scala/spark/examples/HdfsTest.scala @@ -4,7 +4,8 @@ import spark._ object HdfsTest { def main(args: Array[String]) { - val sc = new SparkContext(args(0), "HdfsTest") + val sc = new SparkContext(args(0), "HdfsTest", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val file = sc.textFile(args(1)) val mapped = file.map(s => s.length).cache() for (iter <- 1 to 10) { diff --git a/examples/src/main/scala/spark/examples/LocalALS.scala b/examples/src/main/scala/spark/examples/LocalALS.scala index 10e03359c9..2de810e062 100644 --- a/examples/src/main/scala/spark/examples/LocalALS.scala +++ b/examples/src/main/scala/spark/examples/LocalALS.scala @@ -1,11 +1,13 @@ package spark.examples -import java.util.Random import scala.math.sqrt import cern.jet.math._ import cern.colt.matrix._ import cern.colt.matrix.linalg._ +/** + * Alternating least squares matrix factorization. + */ object LocalALS { // Parameters set through command line arguments var M = 0 // Number of movies diff --git a/examples/src/main/scala/spark/examples/LocalKMeans.scala b/examples/src/main/scala/spark/examples/LocalKMeans.scala index b442c604cd..b07e799cef 100644 --- a/examples/src/main/scala/spark/examples/LocalKMeans.scala +++ b/examples/src/main/scala/spark/examples/LocalKMeans.scala @@ -6,6 +6,9 @@ import spark.SparkContext._ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet +/** + * K-means clustering. + */ object LocalKMeans { val N = 1000 val R = 1000 // Scaling factor diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala index 9553162004..cd73f553d6 100644 --- a/examples/src/main/scala/spark/examples/LocalLR.scala +++ b/examples/src/main/scala/spark/examples/LocalLR.scala @@ -3,6 +3,9 @@ package spark.examples import java.util.Random import spark.util.Vector +/** + * Logistic regression based classification. 
+ */ object LocalLR { val N = 10000 // Number of data points val D = 10 // Number of dimensions diff --git a/examples/src/main/scala/spark/examples/LogQuery.scala b/examples/src/main/scala/spark/examples/LogQuery.scala index 5330b8da94..6497596d35 100644 --- a/examples/src/main/scala/spark/examples/LogQuery.scala +++ b/examples/src/main/scala/spark/examples/LogQuery.scala @@ -26,7 +26,9 @@ object LogQuery { System.err.println("Usage: LogQuery [logFile]") System.exit(1) } - val sc = new SparkContext(args(0), "Log Query") + + val sc = new SparkContext(args(0), "Log Query", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val dataSet = if (args.length == 2) sc.textFile(args(1)) diff --git a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala index 83ae014e94..92cd81c487 100644 --- a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala +++ b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala @@ -9,21 +9,25 @@ object MultiBroadcastTest { System.exit(1) } - val spark = new SparkContext(args(0), "Broadcast Test") + val sc = new SparkContext(args(0), "Broadcast Test", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + val slices = if (args.length > 1) args(1).toInt else 2 val num = if (args.length > 2) args(2).toInt else 1000000 var arr1 = new Array[Int](num) - for (i <- 0 until arr1.length) + for (i <- 0 until arr1.length) { arr1(i) = i + } var arr2 = new Array[Int](num) - for (i <- 0 until arr2.length) + for (i <- 0 until arr2.length) { arr2(i) = i + } - val barr1 = spark.broadcast(arr1) - val barr2 = spark.broadcast(arr2) - spark.parallelize(1 to 10, slices).foreach { + val barr1 = sc.broadcast(arr1) + val barr2 = sc.broadcast(arr2) + sc.parallelize(1 to 10, slices).foreach { i => println(barr1.value.size + barr2.value.size) } diff --git a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala index 50b3a263b4..0d17bda004 100644 --- a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala +++ b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala @@ -18,7 +18,8 @@ object SimpleSkewedGroupByTest { var numReducers = if (args.length > 4) args(4).toInt else numMappers var ratio = if (args.length > 5) args(5).toInt else 5.0 - val sc = new SparkContext(args(0), "GroupBy Test") + val sc = new SparkContext(args(0), "GroupBy Test", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random diff --git a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala index d2117a263e..83be3fc27b 100644 --- a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala +++ b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala @@ -16,13 +16,14 @@ object SkewedGroupByTest { var valSize = if (args.length > 3) args(3).toInt else 1000 var numReducers = if (args.length > 4) args(4).toInt else numMappers - val sc = new SparkContext(args(0), "GroupBy Test") + val sc = new SparkContext(args(0), "GroupBy Test", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random // map output sizes lineraly increase from the 1st to the last - numKVPairs = (1. 
* (p + 1) / numMappers * numKVPairs).toInt + numKVPairs = (1.0 * (p + 1) / numMappers * numKVPairs).toInt var arr1 = new Array[(Int, Array[Byte])](numKVPairs) for (i <- 0 until numKVPairs) { @@ -31,11 +32,11 @@ object SkewedGroupByTest { arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr) } arr1 - }.cache + }.cache() // Enforce that everything has been calculated and in cache - pairs1.count + pairs1.count() - println(pairs1.groupByKey(numReducers).count) + println(pairs1.groupByKey(numReducers).count()) System.exit(0) } diff --git a/examples/src/main/scala/spark/examples/SparkALS.scala b/examples/src/main/scala/spark/examples/SparkALS.scala index 5e01885dbb..8fb3b0fb2a 100644 --- a/examples/src/main/scala/spark/examples/SparkALS.scala +++ b/examples/src/main/scala/spark/examples/SparkALS.scala @@ -1,14 +1,14 @@ package spark.examples -import java.io.Serializable -import java.util.Random import scala.math.sqrt import cern.jet.math._ import cern.colt.matrix._ import cern.colt.matrix.linalg._ import spark._ -import scala.Option +/** + * Alternating least squares matrix factorization. + */ object SparkALS { // Parameters set through command line arguments var M = 0 // Number of movies @@ -70,30 +70,32 @@ object SparkALS { } def main(args: Array[String]) { + if (args.length == 0) { + System.err.println("Usage: SparkALS [ ]") + System.exit(1) + } + var host = "" var slices = 0 - (0 to 5).map(i => { - i match { - case a if a < args.length => Some(args(a)) - case _ => None - } - }).toArray match { - case Array(host_, m, u, f, iters, slices_) => { - host = host_ getOrElse "local" - M = (m getOrElse "100").toInt - U = (u getOrElse "500").toInt - F = (f getOrElse "10").toInt - ITERATIONS = (iters getOrElse "5").toInt - slices = (slices_ getOrElse "2").toInt - } - case _ => { - System.err.println("Usage: SparkALS [ ]") + val options = (0 to 5).map(i => if (i < args.length) Some(args(i)) else None) + + options.toArray match { + case Array(host_, m, u, f, iters, slices_) => + host = host_.get + M = m.getOrElse("100").toInt + U = u.getOrElse("500").toInt + F = f.getOrElse("10").toInt + ITERATIONS = iters.getOrElse("5").toInt + slices = slices_.getOrElse("2").toInt + case _ => + System.err.println("Usage: SparkALS [ ]") System.exit(1) - } } printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS) - val spark = new SparkContext(host, "SparkALS") + + val sc = new SparkContext(host, "SparkALS", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val R = generateR() @@ -102,19 +104,19 @@ object SparkALS { var us = Array.fill(U)(factory1D.random(F)) // Iteratively update movies then users - val Rc = spark.broadcast(R) - var msc = spark.broadcast(ms) - var usc = spark.broadcast(us) + val Rc = sc.broadcast(R) + var msb = sc.broadcast(ms) + var usb = sc.broadcast(us) for (iter <- 1 to ITERATIONS) { println("Iteration " + iter + ":") - ms = spark.parallelize(0 until M, slices) - .map(i => update(i, msc.value(i), usc.value, Rc.value)) + ms = sc.parallelize(0 until M, slices) + .map(i => update(i, msb.value(i), usb.value, Rc.value)) .toArray - msc = spark.broadcast(ms) // Re-broadcast ms because it was updated - us = spark.parallelize(0 until U, slices) - .map(i => update(i, usc.value(i), msc.value, algebra.transpose(Rc.value))) + msb = sc.broadcast(ms) // Re-broadcast ms because it was updated + us = sc.parallelize(0 until U, slices) + .map(i => update(i, usb.value(i), msb.value, algebra.transpose(Rc.value))) .toArray - usc = spark.broadcast(us) // Re-broadcast us because it was 
updated + usb = sc.broadcast(us) // Re-broadcast us because it was updated println("RMSE = " + rmse(R, ms, us)) println() } diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala index 5b2bc84d69..0f42f405a0 100644 --- a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala @@ -5,6 +5,9 @@ import scala.math.exp import spark.util.Vector import spark._ +/** + * Logistic regression based classification. + */ object SparkHdfsLR { val D = 10 // Numer of dimensions val rand = new Random(42) @@ -29,7 +32,8 @@ object SparkHdfsLR { System.err.println("Usage: SparkHdfsLR ") System.exit(1) } - val sc = new SparkContext(args(0), "SparkHdfsLR") + val sc = new SparkContext(args(0), "SparkHdfsLR", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val lines = sc.textFile(args(1)) val points = lines.map(parsePoint _).cache() val ITERATIONS = args(2).toInt diff --git a/examples/src/main/scala/spark/examples/SparkKMeans.scala b/examples/src/main/scala/spark/examples/SparkKMeans.scala index 6375961390..7c21ea12fb 100644 --- a/examples/src/main/scala/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/spark/examples/SparkKMeans.scala @@ -7,6 +7,9 @@ import spark.SparkContext._ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet +/** + * K-means clustering. + */ object SparkKMeans { val R = 1000 // Scaling factor val rand = new Random(42) @@ -36,7 +39,8 @@ object SparkKMeans { System.err.println("Usage: SparkLocalKMeans ") System.exit(1) } - val sc = new SparkContext(args(0), "SparkLocalKMeans") + val sc = new SparkContext(args(0), "SparkLocalKMeans", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val lines = sc.textFile(args(1)) val data = lines.map(parseVector _).cache() val K = args(2).toInt diff --git a/examples/src/main/scala/spark/examples/SparkLR.scala b/examples/src/main/scala/spark/examples/SparkLR.scala index aaaf062c8f..2f41aeb376 100644 --- a/examples/src/main/scala/spark/examples/SparkLR.scala +++ b/examples/src/main/scala/spark/examples/SparkLR.scala @@ -5,6 +5,9 @@ import scala.math.exp import spark.util.Vector import spark._ +/** + * Logistic regression based classification. 
+ */ object SparkLR { val N = 10000 // Number of data points val D = 10 // Numer of dimensions @@ -28,7 +31,8 @@ object SparkLR { System.err.println("Usage: SparkLR []") System.exit(1) } - val sc = new SparkContext(args(0), "SparkLR") + val sc = new SparkContext(args(0), "SparkLR", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val numSlices = if (args.length > 1) args(1).toInt else 2 val points = sc.parallelize(generateData, numSlices).cache() diff --git a/examples/src/main/scala/spark/examples/SparkPi.scala b/examples/src/main/scala/spark/examples/SparkPi.scala index 2f226f1380..5a31d74444 100644 --- a/examples/src/main/scala/spark/examples/SparkPi.scala +++ b/examples/src/main/scala/spark/examples/SparkPi.scala @@ -10,7 +10,8 @@ object SparkPi { System.err.println("Usage: SparkPi []") System.exit(1) } - val spark = new SparkContext(args(0), "SparkPi") + val spark = new SparkContext(args(0), "SparkPi", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val slices = if (args.length > 1) args(1).toInt else 2 val n = 100000 * slices val count = spark.parallelize(1 to n, slices).map { i => diff --git a/examples/src/main/scala/spark/examples/SparkTC.scala b/examples/src/main/scala/spark/examples/SparkTC.scala index 90bae011ad..911ae8f168 100644 --- a/examples/src/main/scala/spark/examples/SparkTC.scala +++ b/examples/src/main/scala/spark/examples/SparkTC.scala @@ -9,7 +9,6 @@ import scala.collection.mutable * Transitive closure on a graph. */ object SparkTC { - val numEdges = 200 val numVertices = 100 val rand = new Random(42) @@ -29,7 +28,8 @@ object SparkTC { System.err.println("Usage: SparkTC []") System.exit(1) } - val spark = new SparkContext(args(0), "SparkTC") + val spark = new SparkContext(args(0), "SparkTC", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val slices = if (args.length > 1) args(1).toInt else 2 var tc = spark.parallelize(generateGraph, slices).cache() diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala index 76293fbb96..3b847fe603 100644 --- a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala @@ -131,7 +131,8 @@ object ActorWordCount { val Seq(master, host, port) = args.toSeq // Create the context and set the batch size - val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2)) + val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) /* * Following is the use of actorStream to plug in custom actor as receiver diff --git a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala index 461929fba2..39c76fd98a 100644 --- a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala @@ -30,7 +30,8 @@ object FlumeEventCount { val batchInterval = Milliseconds(2000) // Create the context and set the batch size - val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval) + val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval, + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create a flume stream val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY) diff --git 
a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala index 8530f5c175..9389f8a38d 100644 --- a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala @@ -22,7 +22,8 @@ object HdfsWordCount { } // Create the context - val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2)) + val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create the FileInputDStream on the directory and use the // stream to count words in new files created diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala index 9b135a5c54..c3a9e491ba 100644 --- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala @@ -32,8 +32,8 @@ object KafkaWordCount { val Array(master, zkQuorum, group, topics, numThreads) = args - val sc = new SparkContext(master, "KafkaWordCount") - val ssc = new StreamingContext(sc, Seconds(2)) + val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala index 5ac6d19b34..704540c2bf 100644 --- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala @@ -23,7 +23,8 @@ object NetworkWordCount { } // Create the context with a 1 second batch size - val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1)) + val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. 
generated by 'nc') diff --git a/examples/src/main/scala/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala index e9cb7b55ea..f450e21040 100644 --- a/examples/src/main/scala/spark/streaming/examples/QueueStream.scala +++ b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala @@ -15,7 +15,8 @@ object QueueStream { } // Create the context - val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1)) + val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create the queue through which RDDs can be pushed to // a QueueInputDStream diff --git a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala index 49b3223eec..175281e095 100644 --- a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala +++ b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala @@ -31,7 +31,8 @@ object RawNetworkGrep { val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args // Create the context - val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis)) + val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Warm up the JVMs on master and slave for JIT compilation to kick in RawTextHelper.warmUp(ssc.sparkContext) diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala index 39a1a702ee..483aae452b 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -43,7 +43,8 @@ object TwitterAlgebirdCMS { val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) - val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10)) + val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala index 914fba4ca2..f3288bfb85 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -32,7 +32,8 @@ object TwitterAlgebirdHLL { val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) - val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5)) + val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala index fdb3a4c73c..9d4494c6f2 100644 --- 
a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala @@ -21,7 +21,8 @@ object TwitterPopularTags { val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) - val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2)) + val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val stream = ssc.twitterStream(username, password, filters) val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) diff --git a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala index 5ed9b7cb76..74d0d338b7 100644 --- a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala @@ -58,7 +58,8 @@ object ZeroMQWordCount { val Seq(master, url, topic) = args.toSeq // Create the context and set the batch size - val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2)) + val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator @@ -70,4 +71,4 @@ object ZeroMQWordCount { ssc.start() } -} \ No newline at end of file +} diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala index 9a2ba30ee4..e226a4a73a 100644 --- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala @@ -24,7 +24,8 @@ object PageViewStream { val port = args(2).toInt // Create the context - val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1)) + val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create a NetworkInputDStream on target host:port and convert each line to a PageView val pageViews = ssc.socketTextStream(host, port) diff --git a/run b/run index fd06fbe7c7..2c780623c8 100755 --- a/run +++ b/run @@ -134,6 +134,16 @@ for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do done export CLASSPATH # Needed for spark-shell +# Figure out the JAR file that our examples were packaged into. +if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*".jar" ]; then + # Use the JAR from the SBT build + export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*".jar"` +fi +if [ -e "$EXAMPLES_DIR/target/spark-examples-"*hadoop*".jar" ]; then + # Use the JAR from the Maven build + export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples-"*hadoop*".jar"` +fi + # Figure out whether to run our class with java or with the scala launcher. 
# In most cases, we'd prefer to execute our process with java because scala # creates a shell script as the parent of its Java process, which makes it diff --git a/run2.cmd b/run2.cmd index 705a4d1ff6..f34869f1b1 100644 --- a/run2.cmd +++ b/run2.cmd @@ -62,6 +62,16 @@ set CLASSPATH=%CLASSPATH%;%FWDIR%repl\lib\* set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\* set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes +rem Figure out the JAR file that our examples were packaged into. +rem First search in the build path from SBT: +for /D %%d in ("%EXAMPLES_DIR%/target/scala-%SCALA_VERSION%/spark-examples*.jar") do ( + set SPARK_EXAMPLES_JAR=%%d +) +rem Then search in the build path from Maven: +for /D %%d in ("%EXAMPLES_DIR%/target/spark-examples*hadoop*.jar") do ( + set SPARK_EXAMPLES_JAR=%%d +) + rem Figure out whether to run our class with java or with the scala launcher. rem In most cases, we'd prefer to execute our process with java because scala rem creates a shell script as the parent of its Java process, which makes it diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index 755407aecc..3d149a742c 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -43,6 +43,23 @@ class JavaStreamingContext(val ssc: StreamingContext) { def this(master: String, appName: String, batchDuration: Duration) = this(new StreamingContext(master, appName, batchDuration, null, Nil, Map())) + /** + * Creates a StreamingContext. + * @param master Name of the Spark Master + * @param appName Name to be used when registering with the scheduler + * @param batchDuration The time interval at which streaming data will be divided into batches + * @param sparkHome The SPARK_HOME directory on the slave nodes + * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the local + * file system or an HDFS, HTTP, HTTPS, or FTP URL. + */ + def this( + master: String, + appName: String, + batchDuration: Duration, + sparkHome: String, + jarFile: String) = + this(new StreamingContext(master, appName, batchDuration, sparkHome, Seq(jarFile), Map())) + /** * Creates a StreamingContext. * @param master Name of the Spark Master -- cgit v1.2.3 From 434a1ce7739b4f1abe93408edaff9388e480d806 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Tue, 26 Feb 2013 12:24:18 -0800 Subject: Small hack to work around multiple JARs being built by sbt package --- run | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'run') diff --git a/run b/run index 2c780623c8..2c29cc4a66 100755 --- a/run +++ b/run @@ -134,14 +134,15 @@ for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do done export CLASSPATH # Needed for spark-shell -# Figure out the JAR file that our examples were packaged into. -if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*".jar" ]; then +# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack +# to avoid the -sources and -doc packages that are built by publish-local. 
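+# (Editor's note: the [0-9T] globs below match release jars, whose names end in a digit,
+# and -SNAPSHOT jars, which end in 'T', while -sources and -doc jars end in a letter.)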
+if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar ]; then # Use the JAR from the SBT build - export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*".jar"` + export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar` fi -if [ -e "$EXAMPLES_DIR/target/spark-examples-"*hadoop*".jar" ]; then +if [ -e "$EXAMPLES_DIR/target/spark-examples-"*hadoop[12].jar ]; then # Use the JAR from the Maven build - export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples-"*hadoop*".jar"` + export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples-"*hadoop[12].jar` fi # Figure out whether to run our class with java or with the scala launcher. -- cgit v1.2.3
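Taken together, these patches let an example be launched through the run script with no manual classpath or JAR wiring; a sketch of the resulting workflow (class name from the examples tree above, local master assumed):

    $ sbt/sbt package                        # produces the spark-examples jar that run auto-detects
    $ ./run spark.examples.SparkPi local[2]  # run exports SPARK_EXAMPLES_JAR before launching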