From 490f056cddc3dc02066a1e2414be6576d6441d51 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 25 Feb 2013 15:13:30 -0800 Subject: Allow passing sparkHome and JARs to StreamingContext constructor Also warns if spark.cleaner.ttl is not set in the version where you pass your own SparkContext. --- .../main/scala/spark/streaming/Checkpoint.scala | 1 + .../scala/spark/streaming/StreamingContext.scala | 40 ++++++++++++++++------ .../streaming/api/java/JavaStreamingContext.scala | 36 +++++++++++++++++++ .../streaming/dstream/KafkaInputDStream.scala | 1 + 4 files changed, 68 insertions(+), 10 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index e7a392fbbf..e303e33e5e 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -17,6 +17,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val framework = ssc.sc.appName val sparkHome = ssc.sc.sparkHome val jars = ssc.sc.jars + val environment = ssc.sc.environment val graph = ssc.graph val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 25c67b279b..31b5d2c8bc 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -6,7 +6,7 @@ import akka.zeromq.Subscribe import spark.streaming.dstream._ -import spark.{RDD, Logging, SparkEnv, SparkContext} +import spark._ import spark.streaming.receivers.ActorReceiver import spark.streaming.receivers.ReceiverSupervisorStrategy import spark.streaming.receivers.ZeroMQReceiver @@ -14,18 +14,18 @@ import spark.storage.StorageLevel import spark.util.MetadataCleaner import spark.streaming.receivers.ActorReceiver - import scala.collection.mutable.Queue +import scala.collection.Map import java.io.InputStream import java.util.concurrent.atomic.AtomicInteger +import java.util.UUID import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path -import java.util.UUID import twitter4j.Status /** @@ -44,7 +44,9 @@ class StreamingContext private ( * @param sparkContext Existing SparkContext * @param batchDuration The time interval at which streaming data will be divided into batches */ - def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration) + def this(sparkContext: SparkContext, batchDuration: Duration) = { + this(sparkContext, null, batchDuration) + } /** * Create a StreamingContext by providing the details necessary for creating a new SparkContext. 
@@ -52,8 +54,17 @@ class StreamingContext private ( * @param appName A name for your job, to display on the cluster web UI * @param batchDuration The time interval at which streaming data will be divided into batches */ - def this(master: String, appName: String, batchDuration: Duration) = - this(StreamingContext.createNewSparkContext(master, appName), null, batchDuration) + def this( + master: String, + appName: String, + batchDuration: Duration, + sparkHome: String = null, + jars: Seq[String] = Nil, + environment: Map[String, String] = Map()) = { + this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment), + null, batchDuration) + } + /** * Re-create a StreamingContext from a checkpoint file. @@ -65,15 +76,20 @@ class StreamingContext private ( initLogging() if (sc_ == null && cp_ == null) { - throw new Exception("Streaming Context cannot be initilalized with " + + throw new Exception("Spark Streaming cannot be initialized with " + "both SparkContext and checkpoint as null") } + if (MetadataCleaner.getDelaySeconds < 0) { + throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; " + + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)") + } + protected[streaming] val isCheckpointPresent = (cp_ != null) protected[streaming] val sc: SparkContext = { if (isCheckpointPresent) { - new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars) + new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars, cp_.environment) } else { sc_ } @@ -478,8 +494,12 @@ object StreamingContext { new PairDStreamFunctions[K, V](stream) } - protected[streaming] def createNewSparkContext(master: String, appName: String): SparkContext = { - + protected[streaming] def createNewSparkContext( + master: String, + appName: String, + sparkHome: String, + jars: Seq[String], + environment: Map[String, String]): SparkContext = { // Set the default cleaner delay to an hour if not already set. // This should be sufficient for even 1 second interval. if (MetadataCleaner.getDelaySeconds < 0) { diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index f3b40b5b88..b528ebbc19 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -45,6 +45,42 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Creates a StreamingContext. + * @param master Name of the Spark Master + * @param appName Name to be used when registering with the scheduler + * @param batchDuration The time interval at which streaming data will be divided into batches + * @param sparkHome The SPARK_HOME directory on the slave nodes + * @param jars Collection of JARs to send to the cluster. These can be paths on the local file + * system or HDFS, HTTP, HTTPS, or FTP URLs. + */ + def this( + master: String, + appName: String, + batchDuration: Duration, + sparkHome: String, + jars: Array[String]) = + this(new StreamingContext(master, appName, batchDuration, sparkHome, jars)) + + /** + * Creates a StreamingContext. 
+ * @param master Name of the Spark Master + * @param appName Name to be used when registering with the scheduler + * @param batchDuration The time interval at which streaming data will be divided into batches + * @param sparkHome The SPARK_HOME directory on the slave nodes + * @param jars Collection of JARs to send to the cluster. These can be paths on the local file + * system or HDFS, HTTP, HTTPS, or FTP URLs. + * @param environment Environment variables to set on worker nodes + */ + def this( + master: String, + appName: String, + batchDuration: Duration, + sparkHome: String, + jars: Array[String], + environment: JMap[String, String]) = + this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, environment)) + + /** + * Creates a StreamingContext using an existing SparkContext. * @param sparkContext The underlying JavaSparkContext to use * @param batchDuration The time interval at which streaming data will be divided into batches */ diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index dc7139cc27..ddd9becf32 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -13,6 +13,7 @@ import kafka.serializer.StringDecoder import kafka.utils.{Utils, ZKGroupTopicDirs} import kafka.utils.ZkUtils._ +import scala.collection.Map import scala.collection.mutable.HashMap import scala.collection.JavaConversions._ -- cgit v1.2.3 From 4d480ec59e8cf268054ed805abcd1e84eca17b41 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 25 Feb 2013 15:53:43 -0800 Subject: Fixed something that was reported as a compile error in ScalaDoc. For some reason, ScalaDoc complained about no such constructor for StreamingContext; it doesn't seem like an actual Scala error but it prevented sbt publish and from working because docs weren't built. --- .../main/scala/spark/streaming/api/java/JavaStreamingContext.scala | 4 ++-- streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index b528ebbc19..755407aecc 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -41,7 +41,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param batchDuration The time interval at which streaming data will be divided into batches */ def this(master: String, appName: String, batchDuration: Duration) = - this(new StreamingContext(master, appName, batchDuration)) + this(new StreamingContext(master, appName, batchDuration, null, Nil, Map())) /** * Creates a StreamingContext. @@ -58,7 +58,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { batchDuration: Duration, sparkHome: String, jars: Array[String]) = - this(new StreamingContext(master, appName, batchDuration, sparkHome, jars)) + this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, Map())) /** * Creates a StreamingContext. 
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala index bdd9f4d753..f673e5be15 100644 --- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala @@ -159,7 +159,7 @@ object MasterFailureTest extends Logging { // Setup the streaming computation with the given operation System.clearProperty("spark.driver.port") - var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration) + var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map()) ssc.checkpoint(checkpointDir.toString) val inputStream = ssc.textFileStream(testDir.toString) val operatedStream = operation(inputStream) -- cgit v1.2.3 From bc4a6eb850c84793aa65c0dd2e27f379876b1969 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 25 Feb 2013 18:04:21 -0800 Subject: Changed Flume test to use the same port as other tests, so that can be controlled centrally. --- streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'streaming') diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 1024d3ac97..ebcb6d0092 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -94,7 +94,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { test("flume input stream") { // Set up the streaming context and input streams val ssc = new StreamingContext(master, framework, batchDuration) - val flumeStream = ssc.flumeStream("localhost", 33333, StorageLevel.MEMORY_AND_DISK) + val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] with SynchronizedBuffer[Seq[SparkFlumeEvent]] val outputStream = new TestOutputStream(flumeStream, outputBuffer) @@ -104,7 +104,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val input = Seq(1, 2, 3, 4, 5) Thread.sleep(1000) - val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 33333)); + val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)); val client = SpecificRequestor.getClient( classOf[AvroSourceProtocol], transceiver); -- cgit v1.2.3 From 5d7b591cfe14177f083814fe3e81745c5d279810 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 25 Feb 2013 19:34:32 -0800 Subject: Pass a code JAR to SparkContext in our examples. Fixes SPARK-594. 
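For reference, the calling pattern this commit introduces across the examples looks roughly like the following. This is only a sketch: the object name and the work done on the RDD are placeholders, while the SPARK_HOME and SPARK_EXAMPLES_JAR environment variables match the diff below.

    import spark.SparkContext

    object ExampleApp {
      def main(args: Array[String]) {
        // Hand the packaged examples JAR (and SPARK_HOME) to the SparkContext so that
        // worker nodes can load the job's classes when running against a real cluster.
        val sc = new SparkContext(args(0), "ExampleApp",
          System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
        println(sc.parallelize(1 to 1000).count())
      }
    }
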
--- .../src/main/java/spark/examples/JavaHdfsLR.java | 6 ++- examples/src/main/java/spark/examples/JavaTC.java | 5 +- .../main/java/spark/examples/JavaWordCount.java | 5 +- .../streaming/examples/JavaFlumeEventCount.java | 3 +- .../streaming/examples/JavaNetworkWordCount.java | 4 +- .../spark/streaming/examples/JavaQueueStream.java | 3 +- .../main/scala/spark/examples/BroadcastTest.scala | 10 ++-- .../spark/examples/ExceptionHandlingTest.scala | 5 +- .../main/scala/spark/examples/GroupByTest.scala | 5 +- .../src/main/scala/spark/examples/HdfsTest.scala | 3 +- .../src/main/scala/spark/examples/LocalALS.scala | 4 +- .../main/scala/spark/examples/LocalKMeans.scala | 3 ++ .../src/main/scala/spark/examples/LocalLR.scala | 3 ++ .../src/main/scala/spark/examples/LogQuery.scala | 4 +- .../scala/spark/examples/MultiBroadcastTest.scala | 16 +++--- .../spark/examples/SimpleSkewedGroupByTest.scala | 3 +- .../scala/spark/examples/SkewedGroupByTest.scala | 11 ++-- .../src/main/scala/spark/examples/SparkALS.scala | 62 +++++++++++----------- .../main/scala/spark/examples/SparkHdfsLR.scala | 6 ++- .../main/scala/spark/examples/SparkKMeans.scala | 6 ++- .../src/main/scala/spark/examples/SparkLR.scala | 6 ++- .../src/main/scala/spark/examples/SparkPi.scala | 3 +- .../src/main/scala/spark/examples/SparkTC.scala | 4 +- .../spark/streaming/examples/ActorWordCount.scala | 3 +- .../spark/streaming/examples/FlumeEventCount.scala | 3 +- .../spark/streaming/examples/HdfsWordCount.scala | 3 +- .../spark/streaming/examples/KafkaWordCount.scala | 4 +- .../streaming/examples/NetworkWordCount.scala | 3 +- .../spark/streaming/examples/QueueStream.scala | 3 +- .../spark/streaming/examples/RawNetworkGrep.scala | 3 +- .../streaming/examples/TwitterAlgebirdCMS.scala | 3 +- .../streaming/examples/TwitterAlgebirdHLL.scala | 3 +- .../streaming/examples/TwitterPopularTags.scala | 3 +- .../spark/streaming/examples/ZeroMQWordCount.scala | 5 +- .../examples/clickstream/PageViewStream.scala | 3 +- run | 10 ++++ run2.cmd | 10 ++++ .../streaming/api/java/JavaStreamingContext.scala | 17 ++++++ 38 files changed, 174 insertions(+), 82 deletions(-) (limited to 'streaming') diff --git a/examples/src/main/java/spark/examples/JavaHdfsLR.java b/examples/src/main/java/spark/examples/JavaHdfsLR.java index 29839d5668..8b0a9b6808 100644 --- a/examples/src/main/java/spark/examples/JavaHdfsLR.java +++ b/examples/src/main/java/spark/examples/JavaHdfsLR.java @@ -10,6 +10,9 @@ import java.util.Arrays; import java.util.StringTokenizer; import java.util.Random; +/** + * Logistic regression based classification. 
+ */ public class JavaHdfsLR { static int D = 10; // Number of dimensions @@ -85,7 +88,8 @@ public class JavaHdfsLR { System.exit(1); } - JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR"); + JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR", + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); JavaRDD lines = sc.textFile(args[1]); JavaRDD points = lines.map(new ParsePoint()).cache(); int ITERATIONS = Integer.parseInt(args[2]); diff --git a/examples/src/main/java/spark/examples/JavaTC.java b/examples/src/main/java/spark/examples/JavaTC.java index e3bd881b8f..b319bdab44 100644 --- a/examples/src/main/java/spark/examples/JavaTC.java +++ b/examples/src/main/java/spark/examples/JavaTC.java @@ -28,7 +28,7 @@ public class JavaTC { Tuple2 e = new Tuple2(from, to); if (from != to) edges.add(e); } - return new ArrayList(edges); + return new ArrayList>(edges); } static class ProjectFn extends PairFunction>, @@ -46,7 +46,8 @@ public class JavaTC { System.exit(1); } - JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC"); + JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC", + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2; JavaPairRDD tc = sc.parallelizePairs(generateGraph(), slices).cache(); diff --git a/examples/src/main/java/spark/examples/JavaWordCount.java b/examples/src/main/java/spark/examples/JavaWordCount.java index a44cf8a120..9d4c7a252d 100644 --- a/examples/src/main/java/spark/examples/JavaWordCount.java +++ b/examples/src/main/java/spark/examples/JavaWordCount.java @@ -18,7 +18,8 @@ public class JavaWordCount { System.exit(1); } - JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount"); + JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount", + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); JavaRDD lines = ctx.textFile(args[1], 1); JavaRDD words = lines.flatMap(new FlatMapFunction() { @@ -29,7 +30,7 @@ public class JavaWordCount { JavaPairRDD ones = words.map(new PairFunction() { public Tuple2 call(String s) { - return new Tuple2(s, 1); + return new Tuple2(s, 1); } }); diff --git a/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java index cddce16e39..e24c6ddaa7 100644 --- a/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java +++ b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java @@ -32,7 +32,8 @@ public class JavaFlumeEventCount { Duration batchInterval = new Duration(2000); - JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval); + JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval, + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); JavaDStream flumeStream = sc.flumeStream("localhost", port); diff --git a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java index 0e9eadd01b..3e57580fd4 100644 --- a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java +++ b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java @@ -30,8 +30,8 @@ public class JavaNetworkWordCount { } // Create the context with a 1 second batch size - JavaStreamingContext ssc = new JavaStreamingContext( - args[0], "NetworkWordCount", new Duration(1000)); + 
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount", + new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') diff --git a/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java index 43c3cd4dfa..15b82c8da1 100644 --- a/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java +++ b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java @@ -22,7 +22,8 @@ public class JavaQueueStream { } // Create the context - JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000)); + JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000), + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); // Create the queue through which RDDs can be pushed to // a QueueInputDStream diff --git a/examples/src/main/scala/spark/examples/BroadcastTest.scala b/examples/src/main/scala/spark/examples/BroadcastTest.scala index 230097c7db..ba59be1687 100644 --- a/examples/src/main/scala/spark/examples/BroadcastTest.scala +++ b/examples/src/main/scala/spark/examples/BroadcastTest.scala @@ -9,19 +9,21 @@ object BroadcastTest { System.exit(1) } - val spark = new SparkContext(args(0), "Broadcast Test") + val sc = new SparkContext(args(0), "Broadcast Test", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val slices = if (args.length > 1) args(1).toInt else 2 val num = if (args.length > 2) args(2).toInt else 1000000 var arr1 = new Array[Int](num) - for (i <- 0 until arr1.length) + for (i <- 0 until arr1.length) { arr1(i) = i + } for (i <- 0 until 2) { println("Iteration " + i) println("===========") - val barr1 = spark.broadcast(arr1) - spark.parallelize(1 to 10, slices).foreach { + val barr1 = sc.broadcast(arr1) + sc.parallelize(1 to 10, slices).foreach { i => println(barr1.value.size) } } diff --git a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala index c89f3dac0c..21a90f2e5a 100644 --- a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala +++ b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala @@ -9,9 +9,10 @@ object ExceptionHandlingTest { System.exit(1) } - val sc = new SparkContext(args(0), "ExceptionHandlingTest") + val sc = new SparkContext(args(0), "ExceptionHandlingTest", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) sc.parallelize(0 until sc.defaultParallelism).foreach { i => - if (Math.random > 0.75) + if (math.random > 0.75) throw new Exception("Testing exception handling") } diff --git a/examples/src/main/scala/spark/examples/GroupByTest.scala b/examples/src/main/scala/spark/examples/GroupByTest.scala index 86dfba3a40..a6603653f1 100644 --- a/examples/src/main/scala/spark/examples/GroupByTest.scala +++ b/examples/src/main/scala/spark/examples/GroupByTest.scala @@ -9,14 +9,15 @@ object GroupByTest { if (args.length == 0) { System.err.println("Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers]") System.exit(1) - } + } var numMappers = if (args.length > 1) args(1).toInt else 2 var numKVPairs = if (args.length > 2) args(2).toInt else 1000 var valSize = if (args.length > 3) args(3).toInt else 1000 var numReducers = if (args.length > 4) 
args(4).toInt else numMappers - val sc = new SparkContext(args(0), "GroupBy Test") + val sc = new SparkContext(args(0), "GroupBy Test", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random diff --git a/examples/src/main/scala/spark/examples/HdfsTest.scala b/examples/src/main/scala/spark/examples/HdfsTest.scala index 7a4530609d..dd61c467f7 100644 --- a/examples/src/main/scala/spark/examples/HdfsTest.scala +++ b/examples/src/main/scala/spark/examples/HdfsTest.scala @@ -4,7 +4,8 @@ import spark._ object HdfsTest { def main(args: Array[String]) { - val sc = new SparkContext(args(0), "HdfsTest") + val sc = new SparkContext(args(0), "HdfsTest", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val file = sc.textFile(args(1)) val mapped = file.map(s => s.length).cache() for (iter <- 1 to 10) { diff --git a/examples/src/main/scala/spark/examples/LocalALS.scala b/examples/src/main/scala/spark/examples/LocalALS.scala index 10e03359c9..2de810e062 100644 --- a/examples/src/main/scala/spark/examples/LocalALS.scala +++ b/examples/src/main/scala/spark/examples/LocalALS.scala @@ -1,11 +1,13 @@ package spark.examples -import java.util.Random import scala.math.sqrt import cern.jet.math._ import cern.colt.matrix._ import cern.colt.matrix.linalg._ +/** + * Alternating least squares matrix factorization. + */ object LocalALS { // Parameters set through command line arguments var M = 0 // Number of movies diff --git a/examples/src/main/scala/spark/examples/LocalKMeans.scala b/examples/src/main/scala/spark/examples/LocalKMeans.scala index b442c604cd..b07e799cef 100644 --- a/examples/src/main/scala/spark/examples/LocalKMeans.scala +++ b/examples/src/main/scala/spark/examples/LocalKMeans.scala @@ -6,6 +6,9 @@ import spark.SparkContext._ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet +/** + * K-means clustering. + */ object LocalKMeans { val N = 1000 val R = 1000 // Scaling factor diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala index 9553162004..cd73f553d6 100644 --- a/examples/src/main/scala/spark/examples/LocalLR.scala +++ b/examples/src/main/scala/spark/examples/LocalLR.scala @@ -3,6 +3,9 @@ package spark.examples import java.util.Random import spark.util.Vector +/** + * Logistic regression based classification. 
+ */ object LocalLR { val N = 10000 // Number of data points val D = 10 // Number of dimensions diff --git a/examples/src/main/scala/spark/examples/LogQuery.scala b/examples/src/main/scala/spark/examples/LogQuery.scala index 5330b8da94..6497596d35 100644 --- a/examples/src/main/scala/spark/examples/LogQuery.scala +++ b/examples/src/main/scala/spark/examples/LogQuery.scala @@ -26,7 +26,9 @@ object LogQuery { System.err.println("Usage: LogQuery [logFile]") System.exit(1) } - val sc = new SparkContext(args(0), "Log Query") + + val sc = new SparkContext(args(0), "Log Query", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val dataSet = if (args.length == 2) sc.textFile(args(1)) diff --git a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala index 83ae014e94..92cd81c487 100644 --- a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala +++ b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala @@ -9,21 +9,25 @@ object MultiBroadcastTest { System.exit(1) } - val spark = new SparkContext(args(0), "Broadcast Test") + val sc = new SparkContext(args(0), "Broadcast Test", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + val slices = if (args.length > 1) args(1).toInt else 2 val num = if (args.length > 2) args(2).toInt else 1000000 var arr1 = new Array[Int](num) - for (i <- 0 until arr1.length) + for (i <- 0 until arr1.length) { arr1(i) = i + } var arr2 = new Array[Int](num) - for (i <- 0 until arr2.length) + for (i <- 0 until arr2.length) { arr2(i) = i + } - val barr1 = spark.broadcast(arr1) - val barr2 = spark.broadcast(arr2) - spark.parallelize(1 to 10, slices).foreach { + val barr1 = sc.broadcast(arr1) + val barr2 = sc.broadcast(arr2) + sc.parallelize(1 to 10, slices).foreach { i => println(barr1.value.size + barr2.value.size) } diff --git a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala index 50b3a263b4..0d17bda004 100644 --- a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala +++ b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala @@ -18,7 +18,8 @@ object SimpleSkewedGroupByTest { var numReducers = if (args.length > 4) args(4).toInt else numMappers var ratio = if (args.length > 5) args(5).toInt else 5.0 - val sc = new SparkContext(args(0), "GroupBy Test") + val sc = new SparkContext(args(0), "GroupBy Test", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random diff --git a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala index d2117a263e..83be3fc27b 100644 --- a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala +++ b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala @@ -16,13 +16,14 @@ object SkewedGroupByTest { var valSize = if (args.length > 3) args(3).toInt else 1000 var numReducers = if (args.length > 4) args(4).toInt else numMappers - val sc = new SparkContext(args(0), "GroupBy Test") + val sc = new SparkContext(args(0), "GroupBy Test", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random // map output sizes lineraly increase from the 1st to the last - numKVPairs = (1. 
* (p + 1) / numMappers * numKVPairs).toInt + numKVPairs = (1.0 * (p + 1) / numMappers * numKVPairs).toInt var arr1 = new Array[(Int, Array[Byte])](numKVPairs) for (i <- 0 until numKVPairs) { @@ -31,11 +32,11 @@ object SkewedGroupByTest { arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr) } arr1 - }.cache + }.cache() // Enforce that everything has been calculated and in cache - pairs1.count + pairs1.count() - println(pairs1.groupByKey(numReducers).count) + println(pairs1.groupByKey(numReducers).count()) System.exit(0) } diff --git a/examples/src/main/scala/spark/examples/SparkALS.scala b/examples/src/main/scala/spark/examples/SparkALS.scala index 5e01885dbb..8fb3b0fb2a 100644 --- a/examples/src/main/scala/spark/examples/SparkALS.scala +++ b/examples/src/main/scala/spark/examples/SparkALS.scala @@ -1,14 +1,14 @@ package spark.examples -import java.io.Serializable -import java.util.Random import scala.math.sqrt import cern.jet.math._ import cern.colt.matrix._ import cern.colt.matrix.linalg._ import spark._ -import scala.Option +/** + * Alternating least squares matrix factorization. + */ object SparkALS { // Parameters set through command line arguments var M = 0 // Number of movies @@ -70,30 +70,32 @@ object SparkALS { } def main(args: Array[String]) { + if (args.length == 0) { + System.err.println("Usage: SparkALS [ ]") + System.exit(1) + } + var host = "" var slices = 0 - (0 to 5).map(i => { - i match { - case a if a < args.length => Some(args(a)) - case _ => None - } - }).toArray match { - case Array(host_, m, u, f, iters, slices_) => { - host = host_ getOrElse "local" - M = (m getOrElse "100").toInt - U = (u getOrElse "500").toInt - F = (f getOrElse "10").toInt - ITERATIONS = (iters getOrElse "5").toInt - slices = (slices_ getOrElse "2").toInt - } - case _ => { - System.err.println("Usage: SparkALS [ ]") + val options = (0 to 5).map(i => if (i < args.length) Some(args(i)) else None) + + options.toArray match { + case Array(host_, m, u, f, iters, slices_) => + host = host_.get + M = m.getOrElse("100").toInt + U = u.getOrElse("500").toInt + F = f.getOrElse("10").toInt + ITERATIONS = iters.getOrElse("5").toInt + slices = slices_.getOrElse("2").toInt + case _ => + System.err.println("Usage: SparkALS [ ]") System.exit(1) - } } printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS) - val spark = new SparkContext(host, "SparkALS") + + val sc = new SparkContext(host, "SparkALS", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val R = generateR() @@ -102,19 +104,19 @@ object SparkALS { var us = Array.fill(U)(factory1D.random(F)) // Iteratively update movies then users - val Rc = spark.broadcast(R) - var msc = spark.broadcast(ms) - var usc = spark.broadcast(us) + val Rc = sc.broadcast(R) + var msb = sc.broadcast(ms) + var usb = sc.broadcast(us) for (iter <- 1 to ITERATIONS) { println("Iteration " + iter + ":") - ms = spark.parallelize(0 until M, slices) - .map(i => update(i, msc.value(i), usc.value, Rc.value)) + ms = sc.parallelize(0 until M, slices) + .map(i => update(i, msb.value(i), usb.value, Rc.value)) .toArray - msc = spark.broadcast(ms) // Re-broadcast ms because it was updated - us = spark.parallelize(0 until U, slices) - .map(i => update(i, usc.value(i), msc.value, algebra.transpose(Rc.value))) + msb = sc.broadcast(ms) // Re-broadcast ms because it was updated + us = sc.parallelize(0 until U, slices) + .map(i => update(i, usb.value(i), msb.value, algebra.transpose(Rc.value))) .toArray - usc = spark.broadcast(us) // Re-broadcast us because it was 
updated + usb = sc.broadcast(us) // Re-broadcast us because it was updated println("RMSE = " + rmse(R, ms, us)) println() } diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala index 5b2bc84d69..0f42f405a0 100644 --- a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala @@ -5,6 +5,9 @@ import scala.math.exp import spark.util.Vector import spark._ +/** + * Logistic regression based classification. + */ object SparkHdfsLR { val D = 10 // Numer of dimensions val rand = new Random(42) @@ -29,7 +32,8 @@ object SparkHdfsLR { System.err.println("Usage: SparkHdfsLR ") System.exit(1) } - val sc = new SparkContext(args(0), "SparkHdfsLR") + val sc = new SparkContext(args(0), "SparkHdfsLR", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val lines = sc.textFile(args(1)) val points = lines.map(parsePoint _).cache() val ITERATIONS = args(2).toInt diff --git a/examples/src/main/scala/spark/examples/SparkKMeans.scala b/examples/src/main/scala/spark/examples/SparkKMeans.scala index 6375961390..7c21ea12fb 100644 --- a/examples/src/main/scala/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/spark/examples/SparkKMeans.scala @@ -7,6 +7,9 @@ import spark.SparkContext._ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet +/** + * K-means clustering. + */ object SparkKMeans { val R = 1000 // Scaling factor val rand = new Random(42) @@ -36,7 +39,8 @@ object SparkKMeans { System.err.println("Usage: SparkLocalKMeans ") System.exit(1) } - val sc = new SparkContext(args(0), "SparkLocalKMeans") + val sc = new SparkContext(args(0), "SparkLocalKMeans", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val lines = sc.textFile(args(1)) val data = lines.map(parseVector _).cache() val K = args(2).toInt diff --git a/examples/src/main/scala/spark/examples/SparkLR.scala b/examples/src/main/scala/spark/examples/SparkLR.scala index aaaf062c8f..2f41aeb376 100644 --- a/examples/src/main/scala/spark/examples/SparkLR.scala +++ b/examples/src/main/scala/spark/examples/SparkLR.scala @@ -5,6 +5,9 @@ import scala.math.exp import spark.util.Vector import spark._ +/** + * Logistic regression based classification. 
+ */ object SparkLR { val N = 10000 // Number of data points val D = 10 // Numer of dimensions @@ -28,7 +31,8 @@ object SparkLR { System.err.println("Usage: SparkLR []") System.exit(1) } - val sc = new SparkContext(args(0), "SparkLR") + val sc = new SparkContext(args(0), "SparkLR", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val numSlices = if (args.length > 1) args(1).toInt else 2 val points = sc.parallelize(generateData, numSlices).cache() diff --git a/examples/src/main/scala/spark/examples/SparkPi.scala b/examples/src/main/scala/spark/examples/SparkPi.scala index 2f226f1380..5a31d74444 100644 --- a/examples/src/main/scala/spark/examples/SparkPi.scala +++ b/examples/src/main/scala/spark/examples/SparkPi.scala @@ -10,7 +10,8 @@ object SparkPi { System.err.println("Usage: SparkPi []") System.exit(1) } - val spark = new SparkContext(args(0), "SparkPi") + val spark = new SparkContext(args(0), "SparkPi", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val slices = if (args.length > 1) args(1).toInt else 2 val n = 100000 * slices val count = spark.parallelize(1 to n, slices).map { i => diff --git a/examples/src/main/scala/spark/examples/SparkTC.scala b/examples/src/main/scala/spark/examples/SparkTC.scala index 90bae011ad..911ae8f168 100644 --- a/examples/src/main/scala/spark/examples/SparkTC.scala +++ b/examples/src/main/scala/spark/examples/SparkTC.scala @@ -9,7 +9,6 @@ import scala.collection.mutable * Transitive closure on a graph. */ object SparkTC { - val numEdges = 200 val numVertices = 100 val rand = new Random(42) @@ -29,7 +28,8 @@ object SparkTC { System.err.println("Usage: SparkTC []") System.exit(1) } - val spark = new SparkContext(args(0), "SparkTC") + val spark = new SparkContext(args(0), "SparkTC", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val slices = if (args.length > 1) args(1).toInt else 2 var tc = spark.parallelize(generateGraph, slices).cache() diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala index 76293fbb96..3b847fe603 100644 --- a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala @@ -131,7 +131,8 @@ object ActorWordCount { val Seq(master, host, port) = args.toSeq // Create the context and set the batch size - val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2)) + val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) /* * Following is the use of actorStream to plug in custom actor as receiver diff --git a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala index 461929fba2..39c76fd98a 100644 --- a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala @@ -30,7 +30,8 @@ object FlumeEventCount { val batchInterval = Milliseconds(2000) // Create the context and set the batch size - val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval) + val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval, + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create a flume stream val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY) diff --git 
a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala index 8530f5c175..9389f8a38d 100644 --- a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala @@ -22,7 +22,8 @@ object HdfsWordCount { } // Create the context - val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2)) + val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create the FileInputDStream on the directory and use the // stream to count words in new files created diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala index 9b135a5c54..c3a9e491ba 100644 --- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala @@ -32,8 +32,8 @@ object KafkaWordCount { val Array(master, zkQuorum, group, topics, numThreads) = args - val sc = new SparkContext(master, "KafkaWordCount") - val ssc = new StreamingContext(sc, Seconds(2)) + val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala index 5ac6d19b34..704540c2bf 100644 --- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala @@ -23,7 +23,8 @@ object NetworkWordCount { } // Create the context with a 1 second batch size - val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1)) + val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. 
generated by 'nc') diff --git a/examples/src/main/scala/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala index e9cb7b55ea..f450e21040 100644 --- a/examples/src/main/scala/spark/streaming/examples/QueueStream.scala +++ b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala @@ -15,7 +15,8 @@ object QueueStream { } // Create the context - val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1)) + val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create the queue through which RDDs can be pushed to // a QueueInputDStream diff --git a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala index 49b3223eec..175281e095 100644 --- a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala +++ b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala @@ -31,7 +31,8 @@ object RawNetworkGrep { val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args // Create the context - val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis)) + val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Warm up the JVMs on master and slave for JIT compilation to kick in RawTextHelper.warmUp(ssc.sparkContext) diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala index 39a1a702ee..483aae452b 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -43,7 +43,8 @@ object TwitterAlgebirdCMS { val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) - val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10)) + val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala index 914fba4ca2..f3288bfb85 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -32,7 +32,8 @@ object TwitterAlgebirdHLL { val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) - val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5)) + val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala index fdb3a4c73c..9d4494c6f2 100644 --- 
a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala @@ -21,7 +21,8 @@ object TwitterPopularTags { val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) - val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2)) + val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val stream = ssc.twitterStream(username, password, filters) val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) diff --git a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala index 5ed9b7cb76..74d0d338b7 100644 --- a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala @@ -58,7 +58,8 @@ object ZeroMQWordCount { val Seq(master, url, topic) = args.toSeq // Create the context and set the batch size - val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2)) + val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator @@ -70,4 +71,4 @@ object ZeroMQWordCount { ssc.start() } -} \ No newline at end of file +} diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala index 9a2ba30ee4..e226a4a73a 100644 --- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala @@ -24,7 +24,8 @@ object PageViewStream { val port = args(2).toInt // Create the context - val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1)) + val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create a NetworkInputDStream on target host:port and convert each line to a PageView val pageViews = ssc.socketTextStream(host, port) diff --git a/run b/run index fd06fbe7c7..2c780623c8 100755 --- a/run +++ b/run @@ -134,6 +134,16 @@ for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do done export CLASSPATH # Needed for spark-shell +# Figure out the JAR file that our examples were packaged into. +if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*".jar" ]; then + # Use the JAR from the SBT build + export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*".jar"` +fi +if [ -e "$EXAMPLES_DIR/target/spark-examples-"*hadoop*".jar" ]; then + # Use the JAR from the Maven build + export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples-"*hadoop*".jar"` +fi + # Figure out whether to run our class with java or with the scala launcher. 
# In most cases, we'd prefer to execute our process with java because scala # creates a shell script as the parent of its Java process, which makes it diff --git a/run2.cmd b/run2.cmd index 705a4d1ff6..f34869f1b1 100644 --- a/run2.cmd +++ b/run2.cmd @@ -62,6 +62,16 @@ set CLASSPATH=%CLASSPATH%;%FWDIR%repl\lib\* set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\* set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes +rem Figure out the JAR file that our examples were packaged into. +rem First search in the build path from SBT: +for /D %%d in ("%EXAMPLES_DIR%/target/scala-%SCALA_VERSION%/spark-examples*.jar") do ( + set SPARK_EXAMPLES_JAR=%%d +) +rem Then search in the build path from Maven: +for /D %%d in ("%EXAMPLES_DIR%/target/spark-examples*hadoop*.jar") do ( + set SPARK_EXAMPLES_JAR=%%d +) + rem Figure out whether to run our class with java or with the scala launcher. rem In most cases, we'd prefer to execute our process with java because scala rem creates a shell script as the parent of its Java process, which makes it diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index 755407aecc..3d149a742c 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -43,6 +43,23 @@ class JavaStreamingContext(val ssc: StreamingContext) { def this(master: String, appName: String, batchDuration: Duration) = this(new StreamingContext(master, appName, batchDuration, null, Nil, Map())) + /** + * Creates a StreamingContext. + * @param master Name of the Spark Master + * @param appName Name to be used when registering with the scheduler + * @param batchDuration The time interval at which streaming data will be divided into batches + * @param sparkHome The SPARK_HOME directory on the slave nodes + * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the local + * file system or an HDFS, HTTP, HTTPS, or FTP URL. + */ + def this( + master: String, + appName: String, + batchDuration: Duration, + sparkHome: String, + jarFile: String) = + this(new StreamingContext(master, appName, batchDuration, sparkHome, Seq(jarFile), Map())) + /** * Creates a StreamingContext. * @param master Name of the Spark Master -- cgit v1.2.3 From 284ba90958df2d6efc08e3f8381bb9ef09f8b322 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 25 Feb 2013 19:40:52 -0800 Subject: createNewSparkContext should use sparkHome/jars/environment. This fixes a bug introduced by Matei's recent change. 
--- streaming/src/main/scala/spark/streaming/StreamingContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 31b5d2c8bc..b8b60aab43 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -505,7 +505,7 @@ object StreamingContext { if (MetadataCleaner.getDelaySeconds < 0) { MetadataCleaner.setDelaySeconds(3600) } - new SparkContext(master, appName) + new SparkContext(master, appName, sparkHome, jars, environment) } protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = { -- cgit v1.2.3 From 8b06b359da38eb0e76f12f1db4b16edf12fccdda Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Thu, 28 Feb 2013 23:34:34 -0800 Subject: bump version to 0.7.1-SNAPSHOT in the subproject poms to keep the maven build building. --- bagel/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- repl-bin/pom.xml | 2 +- repl/pom.xml | 2 +- streaming/pom.xml | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) (limited to 'streaming') diff --git a/bagel/pom.xml b/bagel/pom.xml index a8256a6e8b..667d28c1a2 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -4,7 +4,7 @@ org.spark-project parent - 0.7.0-SNAPSHOT + 0.7.1-SNAPSHOT ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 66c62151fe..9d46d94c1c 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -4,7 +4,7 @@ org.spark-project parent - 0.7.0-SNAPSHOT + 0.7.1-SNAPSHOT ../pom.xml diff --git a/examples/pom.xml b/examples/pom.xml index 7d975875fa..2adeec8786 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -4,7 +4,7 @@ org.spark-project parent - 0.7.0-SNAPSHOT + 0.7.1-SNAPSHOT ../pom.xml diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index 0667b71cc7..a60028bb53 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -4,7 +4,7 @@ org.spark-project parent - 0.7.0-SNAPSHOT + 0.7.1-SNAPSHOT ../pom.xml diff --git a/repl/pom.xml b/repl/pom.xml index 4a296fa630..a1b3ccece8 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -4,7 +4,7 @@ org.spark-project parent - 0.7.0-SNAPSHOT + 0.7.1-SNAPSHOT ../pom.xml diff --git a/streaming/pom.xml b/streaming/pom.xml index 15523eadcb..d1a766aeac 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -4,7 +4,7 @@ org.spark-project parent - 0.7.0-SNAPSHOT + 0.7.1-SNAPSHOT ../pom.xml -- cgit v1.2.3 From b40907310266be1be5db5f773bc9bcbf2813c090 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Fri, 1 Mar 2013 15:05:07 -0800 Subject: Instead of failing to bind to a fixed, already-in-use port, let the OS choose an available port for TestServer. 
--- .../test/scala/spark/streaming/InputStreamsSuite.scala | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) (limited to 'streaming') diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index ebcb6d0092..4d33857b25 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -29,7 +29,7 @@ import java.nio.charset.Charset import com.google.common.io.Files class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { - + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") val testPort = 9999 @@ -44,12 +44,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { test("socket input stream") { // Start the server - val testServer = new TestServer(testPort) + val testServer = new TestServer() testServer.start() // Set up the streaming context and input streams val ssc = new StreamingContext(master, framework, batchDuration) - val networkStream = ssc.socketTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) + val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) @@ -193,8 +193,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { test("actor input stream") { // Start the server - val port = testPort - val testServer = new TestServer(port) + val testServer = new TestServer() + val port = testServer.port testServer.start() // Set up the streaming context and input streams @@ -244,11 +244,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { /** This is server to test the network input stream */ -class TestServer(port: Int) extends Logging { +class TestServer() extends Logging { val queue = new ArrayBlockingQueue[String](100) - val serverSocket = new ServerSocket(port) + val serverSocket = new ServerSocket(0) val servingThread = new Thread() { override def run() { @@ -290,11 +290,13 @@ class TestServer(port: Int) extends Logging { def send(msg: String) { queue.add(msg) } def stop() { servingThread.interrupt() } + + def port = serverSocket.getLocalPort } object TestServer { def main(args: Array[String]) { - val s = new TestServer(9999) + val s = new TestServer() s.start() while(true) { Thread.sleep(1000) -- cgit v1.2.3 From 9e68f4862556995ff4a02251eac3583542c11ad8 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Mon, 11 Mar 2013 23:59:17 -0500 Subject: More quickly call close in HadoopRDD. This also refactors out the common "gotNext" iterator pattern into a shared utility class. 
--- core/src/main/scala/spark/rdd/HadoopRDD.scala | 31 +++--------- .../main/scala/spark/serializer/Serializer.scala | 32 ++---------- core/src/main/scala/spark/util/NextIterator.scala | 58 ++++++++++++++++++++++ .../streaming/dstream/SocketInputDStream.scala | 44 ++++------------ 4 files changed, 78 insertions(+), 87 deletions(-) create mode 100644 core/src/main/scala/spark/util/NextIterator.scala (limited to 'streaming') diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala index 78097502bc..43c6749ddc 100644 --- a/core/src/main/scala/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala @@ -16,6 +16,7 @@ import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext} +import spark.util.NextIterator /** @@ -62,7 +63,7 @@ class HadoopRDD[K, V]( .asInstanceOf[InputFormat[K, V]] } - override def compute(theSplit: Partition, context: TaskContext) = new Iterator[(K, V)] { + override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] { val split = theSplit.asInstanceOf[HadoopPartition] var reader: RecordReader[K, V] = null @@ -75,34 +76,18 @@ class HadoopRDD[K, V]( val key: K = reader.createKey() val value: V = reader.createValue() - var gotNext = false - var finished = false - - override def hasNext: Boolean = { - if (!gotNext) { - try { - finished = !reader.next(key, value) - } catch { - case eof: EOFException => - finished = true - } - gotNext = true - } - !finished - } - override def next: (K, V) = { - if (!gotNext) { + override def getNext() = { + try { finished = !reader.next(key, value) + } catch { + case eof: EOFException => + finished = true } - if (finished) { - throw new NoSuchElementException("End of stream") - } - gotNext = false (key, value) } - private def close() { + override def close() { try { reader.close() } catch { diff --git a/core/src/main/scala/spark/serializer/Serializer.scala b/core/src/main/scala/spark/serializer/Serializer.scala index 50b086125a..d94ffa78f7 100644 --- a/core/src/main/scala/spark/serializer/Serializer.scala +++ b/core/src/main/scala/spark/serializer/Serializer.scala @@ -72,40 +72,14 @@ trait DeserializationStream { * Read the elements of this stream through an iterator. This can only be called once, as * reading each element will consume data from the input source. */ - def asIterator: Iterator[Any] = new Iterator[Any] { - var gotNext = false - var finished = false - var nextValue: Any = null - - private def getNext() { + def asIterator: Iterator[Any] = new spark.util.NextIterator[Any] { + override protected def getNext() = { try { - nextValue = readObject[Any]() + readObject[Any]() } catch { case eof: EOFException => finished = true } - gotNext = true - } - - override def hasNext: Boolean = { - if (!gotNext) { - getNext() - } - if (finished) { - close() - } - !finished - } - - override def next(): Any = { - if (!gotNext) { - getNext() - } - if (finished) { - throw new NoSuchElementException("End of stream") - } - gotNext = false - nextValue } } } diff --git a/core/src/main/scala/spark/util/NextIterator.scala b/core/src/main/scala/spark/util/NextIterator.scala new file mode 100644 index 0000000000..32fae42e73 --- /dev/null +++ b/core/src/main/scala/spark/util/NextIterator.scala @@ -0,0 +1,58 @@ +package spark.util + +/** Provides a basic/boilerplate Iterator implementation. 
*/ +private[spark] abstract class NextIterator[U] extends Iterator[U] { + + private var gotNext = false + private var nextValue: U = _ + protected var finished = false + + /** + * Method for subclasses to implement to provide the next element. + * + * If no next element is available, the subclass should set `finished` + * to `true` and may return any value (it will be ignored). + * + * This convention is required because `null` may be a valid value, + * and using `Option` seems like it might create unnecessary Some/None + * instances, given some iterators might be called in a tight loop. + * + * @return U, or set 'finished' when done + */ + protected def getNext(): U + + /** + * Method for subclasses to optionally implement when all elements + * have been successfully iterated, and the iteration is done. + * + * Note: `NextIterator` cannot guarantee that `close` will be + * called because it has no control over what happens when an exception + * happens in the user code that is calling hasNext/next. + * + * Ideally you should have another try/catch, as in HadoopRDD, that + * ensures any resources are closed should iteration fail. + */ + protected def close() { + } + + override def hasNext: Boolean = { + if (!finished) { + if (!gotNext) { + nextValue = getNext() + if (finished) { + close() + } + gotNext = true + } + } + !finished + } + + override def next(): U = { + if (!hasNext) { + throw new NoSuchElementException("End of stream") + } + gotNext = false + nextValue + } +} \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala index 4af839ad7f..38239b054a 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala @@ -2,6 +2,7 @@ package spark.streaming.dstream import spark.streaming.StreamingContext import spark.storage.StorageLevel +import spark.util.NextIterator import java.io._ import java.net.Socket @@ -59,45 +60,18 @@ object SocketReceiver { */ def bytesToLines(inputStream: InputStream): Iterator[String] = { val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")) - - val iterator = new Iterator[String] { - var gotNext = false - var finished = false - var nextValue: String = null - - private def getNext() { - try { - nextValue = dataInputStream.readLine() - if (nextValue == null) { - finished = true - } - } - gotNext = true - } - - override def hasNext: Boolean = { - if (!finished) { - if (!gotNext) { - getNext() - if (finished) { - dataInputStream.close() - } - } + new NextIterator[String] { + protected override def getNext() { + val nextValue = dataInputStream.readLine() + if (nextValue == null) { + finished = true } - !finished + nextValue } - override def next(): String = { - if (finished) { - throw new NoSuchElementException("End of stream") - } - if (!gotNext) { - getNext() - } - gotNext = false - nextValue + protected override def close() { + dataInputStream.close() } } - iterator } } -- cgit v1.2.3 From 0cf320485d60d96a6ae5f3ec6dc13aadecdef0eb Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Tue, 12 Mar 2013 00:05:35 -0500 Subject: Forgot equals. 
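The fix in the next hunk adds a single '=' to getNext() in SocketInputDStream. Without it, Scala's procedure syntax gives the method a Unit result type, so the override no longer conforms to NextIterator[String].getNext(): String and the line read from the socket is never handed back to the iterator. A self-contained illustration, not taken from the patch (the object and method names are illustrative):

// Illustrative only: the same method written without and with '='.
object ProcedureSyntaxSketch {
  // Procedure syntax -- the result type is Unit, so the String from readLine() is dropped.
  def readLineWrong(reader: java.io.BufferedReader) { reader.readLine() }

  // With '=' the result type is inferred as String, which is what an
  // implementation of NextIterator[String].getNext() has to return.
  def readLineRight(reader: java.io.BufferedReader) = reader.readLine()
}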
--- .../src/main/scala/spark/streaming/dstream/SocketInputDStream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala index 38239b054a..1408af0afa 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala @@ -61,7 +61,7 @@ object SocketReceiver { def bytesToLines(inputStream: InputStream): Iterator[String] = { val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")) new NextIterator[String] { - protected override def getNext() { + protected override def getNext() = { val nextValue = dataInputStream.readLine() if (nextValue == null) { finished = true -- cgit v1.2.3 From 7fd2708edaa863701ed8032e395e255df399d898 Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Fri, 15 Mar 2013 11:41:51 -0700 Subject: Add a log4j compile dependency to fix build in IntelliJ Also rename parent project to spark-parent (otherwise it shows up as "parent" in IntelliJ, which is very confusing). --- bagel/pom.xml | 2 +- core/pom.xml | 6 +++++- examples/pom.xml | 2 +- pom.xml | 9 ++++++++- repl-bin/pom.xml | 2 +- repl/pom.xml | 2 +- streaming/pom.xml | 2 +- 7 files changed, 18 insertions(+), 7 deletions(-) (limited to 'streaming') diff --git a/bagel/pom.xml b/bagel/pom.xml index 667d28c1a2..510cff4669 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.spark-project - parent + spark-parent 0.7.1-SNAPSHOT ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 9d46d94c1c..fe9c803728 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.spark-project - parent + spark-parent 0.7.1-SNAPSHOT ../pom.xml @@ -87,6 +87,10 @@ org.apache.mesos mesos + + log4j + log4j + org.scalatest diff --git a/examples/pom.xml b/examples/pom.xml index 2adeec8786..39cc47c709 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.spark-project - parent + spark-parent 0.7.1-SNAPSHOT ../pom.xml diff --git a/pom.xml b/pom.xml index 09ad903e6e..08d1fc12e0 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.spark-project - parent + spark-parent 0.7.1-SNAPSHOT pom Spark Project Parent POM @@ -58,6 +58,7 @@ 1.1.1 1.6.1 4.1.2 + 1.2.17 @@ -267,6 +268,12 @@ ${scala.version} + + log4j + log4j + ${log4j.version} + + org.scalatest scalatest_${scala.version} diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index a60028bb53..dd720e2291 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.spark-project - parent + spark-parent 0.7.1-SNAPSHOT ../pom.xml diff --git a/repl/pom.xml b/repl/pom.xml index a1b3ccece8..a3e4606edc 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.spark-project - parent + spark-parent 0.7.1-SNAPSHOT ../pom.xml diff --git a/streaming/pom.xml b/streaming/pom.xml index d1a766aeac..ec077e8089 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.spark-project - parent + spark-parent 0.7.1-SNAPSHOT ../pom.xml -- cgit v1.2.3 From 1f5381119f8c8afd0ba69bc7773c10972dd43bc1 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 24 Mar 2013 16:21:51 -0700 Subject: method first in trait IterableLike is deprecated: use `head' instead --- .../src/main/scala/spark/streaming/dstream/QueueInputDStream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'streaming') 
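The one-line change below swaps the deprecated IterableLike.first for head when draining the queue; both return the first element of the buffer, head is simply the non-deprecated name. A tiny standalone illustration, not from the patch (object name is illustrative):

// Illustrative only: 'head' on a non-empty buffer, as QueueInputDStream now uses.
object HeadSketch {
  def main(args: Array[String]) {
    val buffer = new scala.collection.mutable.ArrayBuffer[Int]()
    buffer ++= Seq(1, 2, 3)
    // buffer.first still compiles in Scala 2.9 but emits a deprecation warning;
    // buffer.head is the supported equivalent.
    println(buffer.head)   // prints 1
  }
}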
diff --git a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala index 6b310bc0b6..da224ad6f7 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala @@ -28,7 +28,7 @@ class QueueInputDStream[T: ClassManifest]( } if (buffer.size > 0) { if (oneAtATime) { - Some(buffer.first) + Some(buffer.head) } else { Some(new UnionRDD(ssc.sc, buffer.toSeq)) } -- cgit v1.2.3 From b569b3f20032cb581024f17dc0f45fbfb5f75f43 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Tue, 26 Mar 2013 18:14:04 -0700 Subject: Move streaming test initialization into 'before' blocks --- streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala | 6 ++++-- streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) (limited to 'streaming') diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index 8fce91853c..cf2ed8b1d4 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -6,10 +6,12 @@ import util.ManualClock class BasicOperationsSuite extends TestSuiteBase { - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") - override def framework() = "BasicOperationsSuite" + before { + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + } + after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 4d33857b25..67dca2ac31 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -30,12 +30,14 @@ import com.google.common.io.Files class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") - val testPort = 9999 override def checkpointDir = "checkpoint" + before { + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + } + after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") -- cgit v1.2.3 From bc8ba222ff29d46cd2e3331753c9c4ce681eccb4 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Thu, 28 Mar 2013 15:42:01 -0700 Subject: Bump development version to 0.8.0 --- bagel/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 4 ++-- examples/pom.xml | 2 +- pom.xml | 2 +- project/SparkBuild.scala | 2 +- repl-bin/pom.xml | 2 +- repl/pom.xml | 2 +- repl/src/main/scala/spark/repl/SparkILoop.scala | 2 +- streaming/pom.xml | 2 +- 10 files changed, 11 insertions(+), 11 deletions(-) (limited to 'streaming') diff --git a/bagel/pom.xml b/bagel/pom.xml index 510cff4669..be2e358091 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -4,7 +4,7 @@ org.spark-project spark-parent - 0.7.1-SNAPSHOT + 0.8.0-SNAPSHOT ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index fe9c803728..08717860a7 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -4,7 +4,7 @@ org.spark-project spark-parent - 0.7.1-SNAPSHOT + 
0.8.0-SNAPSHOT ../pom.xml diff --git a/docs/_config.yml b/docs/_config.yml index f99d5bb376..a6aa38a46d 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -3,8 +3,8 @@ markdown: kramdown # These allow the documentation to be updated with nerw releases # of Spark, Scala, and Mesos. -SPARK_VERSION: 0.7.1-SNAPSHOT -SPARK_VERSION_SHORT: 0.7.1 +SPARK_VERSION: 0.8.0-SNAPSHOT +SPARK_VERSION_SHORT: 0.8.0 SCALA_VERSION: 2.9.2 MESOS_VERSION: 0.9.0-incubating SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net diff --git a/examples/pom.xml b/examples/pom.xml index 39cc47c709..d014089fe4 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -4,7 +4,7 @@ org.spark-project spark-parent - 0.7.1-SNAPSHOT + 0.8.0-SNAPSHOT ../pom.xml diff --git a/pom.xml b/pom.xml index 08d1fc12e0..1174b475d3 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.spark-project spark-parent - 0.7.1-SNAPSHOT + 0.8.0-SNAPSHOT pom Spark Project Parent POM http://spark-project.org/ diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5f378b2398..250211fb0c 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -35,7 +35,7 @@ object SparkBuild extends Build { def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.spark-project", - version := "0.7.1-SNAPSHOT", + version := "0.8.0-SNAPSHOT", scalaVersion := "2.9.2", scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"), unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath }, diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index dd720e2291..fe526a7616 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -4,7 +4,7 @@ org.spark-project spark-parent - 0.7.1-SNAPSHOT + 0.8.0-SNAPSHOT ../pom.xml diff --git a/repl/pom.xml b/repl/pom.xml index a3e4606edc..0b5e400c3d 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -4,7 +4,7 @@ org.spark-project spark-parent - 0.7.1-SNAPSHOT + 0.8.0-SNAPSHOT ../pom.xml diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala index cd7b5128b2..39b213851f 100644 --- a/repl/src/main/scala/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/spark/repl/SparkILoop.scala @@ -200,7 +200,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master: ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ - /___/ .__/\_,_/_/ /_/\_\ version 0.7.1 + /___/ .__/\_,_/_/ /_/\_\ version 0.8.0-SNAPSHOT /_/ """) import Properties._ diff --git a/streaming/pom.xml b/streaming/pom.xml index ec077e8089..b0d0cd0ff3 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -4,7 +4,7 @@ org.spark-project spark-parent - 0.7.1-SNAPSHOT + 0.8.0-SNAPSHOT ../pom.xml -- cgit v1.2.3 From df47b40b764e25cbd10ce49d7152e1d33f51a263 Mon Sep 17 00:00:00 2001 From: shane-huang Date: Wed, 20 Feb 2013 11:51:13 +0800 Subject: Shuffle Performance fix: Use netty embeded OIO file server instead of ConnectionManager Shuffle Performance Optimization: do not send 0-byte block requests to reduce network messages change reference from io.Source to scala.io.Source to avoid looking into io.netty package Signed-off-by: shane-huang --- .../main/java/spark/network/netty/FileClient.java | 89 +++++++ .../netty/FileClientChannelInitializer.java | 29 +++ .../spark/network/netty/FileClientHandler.java | 38 +++ .../main/java/spark/network/netty/FileServer.java | 59 +++++ .../netty/FileServerChannelInitializer.java | 33 +++ .../spark/network/netty/FileServerHandler.java | 68 ++++++ 
.../java/spark/network/netty/PathResolver.java | 12 + .../scala/spark/network/netty/FileHeader.scala | 57 +++++ .../scala/spark/network/netty/ShuffleCopier.scala | 88 +++++++ .../scala/spark/network/netty/ShuffleSender.scala | 50 ++++ .../main/scala/spark/storage/BlockManager.scala | 272 +++++++++++++++++---- core/src/main/scala/spark/storage/DiskStore.scala | 51 +++- project/SparkBuild.scala | 3 +- .../scala/spark/streaming/util/RawTextSender.scala | 2 +- 14 files changed, 795 insertions(+), 56 deletions(-) create mode 100644 core/src/main/java/spark/network/netty/FileClient.java create mode 100644 core/src/main/java/spark/network/netty/FileClientChannelInitializer.java create mode 100644 core/src/main/java/spark/network/netty/FileClientHandler.java create mode 100644 core/src/main/java/spark/network/netty/FileServer.java create mode 100644 core/src/main/java/spark/network/netty/FileServerChannelInitializer.java create mode 100644 core/src/main/java/spark/network/netty/FileServerHandler.java create mode 100755 core/src/main/java/spark/network/netty/PathResolver.java create mode 100644 core/src/main/scala/spark/network/netty/FileHeader.scala create mode 100644 core/src/main/scala/spark/network/netty/ShuffleCopier.scala create mode 100644 core/src/main/scala/spark/network/netty/ShuffleSender.scala (limited to 'streaming') diff --git a/core/src/main/java/spark/network/netty/FileClient.java b/core/src/main/java/spark/network/netty/FileClient.java new file mode 100644 index 0000000000..d0c5081dd2 --- /dev/null +++ b/core/src/main/java/spark/network/netty/FileClient.java @@ -0,0 +1,89 @@ +package spark.network.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.AbstractChannel; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.oio.OioEventLoopGroup; +import io.netty.channel.socket.oio.OioSocketChannel; + +import java.util.Arrays; + +public class FileClient { + + private FileClientHandler handler = null; + private Channel channel = null; + private Bootstrap bootstrap = null; + + public FileClient(FileClientHandler handler){ + this.handler = handler; + } + + public void init(){ + bootstrap = new Bootstrap(); + bootstrap.group(new OioEventLoopGroup()) + .channel(OioSocketChannel.class) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new FileClientChannelInitializer(handler)); + } + + public static final class ChannelCloseListener implements ChannelFutureListener { + private FileClient fc = null; + public ChannelCloseListener(FileClient fc){ + this.fc = fc; + } + @Override + public void operationComplete(ChannelFuture future) { + if (fc.bootstrap!=null){ + fc.bootstrap.shutdown(); + fc.bootstrap = null; + } + } + } + + public void connect(String host, int port){ + try { + + // Start the connection attempt. 
+ channel = bootstrap.connect(host, port).sync().channel(); + // ChannelFuture cf = channel.closeFuture(); + //cf.addListener(new ChannelCloseListener(this)); + } catch (InterruptedException e) { + close(); + } + } + + public void waitForClose(){ + try { + channel.closeFuture().sync(); + } catch (InterruptedException e){ + e.printStackTrace(); + } + } + + public void sendRequest(String file){ + //assert(file == null); + //assert(channel == null); + channel.write(file+"\r\n"); + } + + public void close(){ + if(channel != null) { + channel.close(); + channel = null; + } + if ( bootstrap!=null) { + bootstrap.shutdown(); + bootstrap = null; + } + } + + +} + + diff --git a/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java new file mode 100644 index 0000000000..50e5704619 --- /dev/null +++ b/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java @@ -0,0 +1,29 @@ +package spark.network.netty; + +import io.netty.buffer.BufType; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.util.CharsetUtil; + +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.logging.LogLevel; + +public class FileClientChannelInitializer extends + ChannelInitializer { + + private FileClientHandler fhandler; + + public FileClientChannelInitializer(FileClientHandler handler) { + fhandler = handler; + } + + @Override + public void initChannel(SocketChannel channel) { + // file no more than 2G + channel.pipeline() + .addLast("encoder", new StringEncoder(BufType.BYTE)) + .addLast("handler", fhandler); + } +} diff --git a/core/src/main/java/spark/network/netty/FileClientHandler.java b/core/src/main/java/spark/network/netty/FileClientHandler.java new file mode 100644 index 0000000000..911c8b32b5 --- /dev/null +++ b/core/src/main/java/spark/network/netty/FileClientHandler.java @@ -0,0 +1,38 @@ +package spark.network.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundByteHandlerAdapter; +import io.netty.util.CharsetUtil; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; + +public abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter { + + private FileHeader currentHeader = null; + + public abstract void handle(ChannelHandlerContext ctx, ByteBuf in, FileHeader header); + + @Override + public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) { + // Use direct buffer if possible. 
+ return ctx.alloc().ioBuffer(); + } + + @Override + public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) { + // get header + if (currentHeader == null && in.readableBytes() >= FileHeader.HEADER_SIZE()) { + currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE())); + } + // get file + if(in.readableBytes() >= currentHeader.fileLen()){ + handle(ctx,in,currentHeader); + currentHeader = null; + ctx.close(); + } + } + +} + diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java new file mode 100644 index 0000000000..729e45f0a1 --- /dev/null +++ b/core/src/main/java/spark/network/netty/FileServer.java @@ -0,0 +1,59 @@ +package spark.network.netty; + +import java.io.File; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.Channel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.oio.OioEventLoopGroup; +import io.netty.channel.socket.oio.OioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; + +/** + * Server that accept the path of a file an echo back its content. + */ +public class FileServer { + + private ServerBootstrap bootstrap = null; + private Channel channel = null; + private PathResolver pResolver; + + public FileServer(PathResolver pResolver){ + this.pResolver = pResolver; + } + + public void run(int port) { + // Configure the server. + bootstrap = new ServerBootstrap(); + try { + bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup()) + .channel(OioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 100) + .option(ChannelOption.SO_RCVBUF, 1500) + .childHandler(new FileServerChannelInitializer(pResolver)); + // Start the server. 
+ channel = bootstrap.bind(port).sync().channel(); + channel.closeFuture().sync(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } finally{ + bootstrap.shutdown(); + } + } + + public void stop(){ + if (channel!=null){ + channel.close(); + } + if (bootstrap != null){ + bootstrap.shutdown(); + bootstrap = null; + } + } +} + + diff --git a/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java new file mode 100644 index 0000000000..9d0618ff1c --- /dev/null +++ b/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java @@ -0,0 +1,33 @@ +package spark.network.netty; + +import java.io.File; +import io.netty.buffer.BufType; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.Delimiters; +import io.netty.util.CharsetUtil; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.logging.LogLevel; + +public class FileServerChannelInitializer extends + ChannelInitializer { + + PathResolver pResolver; + + public FileServerChannelInitializer(PathResolver pResolver) { + this.pResolver = pResolver; + } + + @Override + public void initChannel(SocketChannel channel) { + channel.pipeline() + .addLast("framer", new DelimiterBasedFrameDecoder( + 8192, Delimiters.lineDelimiter())) + .addLast("strDecoder", new StringDecoder()) + .addLast("handler", new FileServerHandler(pResolver)); + + } +} diff --git a/core/src/main/java/spark/network/netty/FileServerHandler.java b/core/src/main/java/spark/network/netty/FileServerHandler.java new file mode 100644 index 0000000000..e1083e87a2 --- /dev/null +++ b/core/src/main/java/spark/network/netty/FileServerHandler.java @@ -0,0 +1,68 @@ +package spark.network.netty; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.DefaultFileRegion; +import io.netty.handler.stream.ChunkedFile; +import java.io.File; +import java.io.FileInputStream; + +public class FileServerHandler extends + ChannelInboundMessageHandlerAdapter { + + PathResolver pResolver; + + public FileServerHandler(PathResolver pResolver){ + this.pResolver = pResolver; + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, String blockId) { + String path = pResolver.getAbsolutePath(blockId); + // if getFilePath returns null, close the channel + if (path == null) { + //ctx.close(); + return; + } + File file = new File(path); + if (file.exists()) { + if (!file.isFile()) { + //logger.info("Not a file : " + file.getAbsolutePath()); + ctx.write(new FileHeader(0, blockId).buffer()); + ctx.flush(); + return; + } + long length = file.length(); + if (length > Integer.MAX_VALUE || length <= 0 ) { + //logger.info("too large file : " + file.getAbsolutePath() + " of size "+ length); + ctx.write(new FileHeader(0, blockId).buffer()); + ctx.flush(); + return; + } + int len = new Long(length).intValue(); + //logger.info("Sending block "+blockId+" filelen = "+len); + //logger.info("header = "+ (new FileHeader(len, blockId)).buffer()); + ctx.write((new FileHeader(len, blockId)).buffer()); + try { + ctx.sendFile(new DefaultFileRegion(new FileInputStream(file) + .getChannel(), 0, file.length())); + } catch (Exception e) { + // 
TODO Auto-generated catch block + //logger.warning("Exception when sending file : " + //+ file.getAbsolutePath()); + e.printStackTrace(); + } + } else { + //logger.warning("File not found: " + file.getAbsolutePath()); + ctx.write(new FileHeader(0, blockId).buffer()); + } + ctx.flush(); + } + + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/core/src/main/java/spark/network/netty/PathResolver.java b/core/src/main/java/spark/network/netty/PathResolver.java new file mode 100755 index 0000000000..5d5eda006e --- /dev/null +++ b/core/src/main/java/spark/network/netty/PathResolver.java @@ -0,0 +1,12 @@ +package spark.network.netty; + +public interface PathResolver { + /** + * Get the absolute path of the file + * + * @param fileId + * @return the absolute path of file + */ + public String getAbsolutePath(String fileId); + +} diff --git a/core/src/main/scala/spark/network/netty/FileHeader.scala b/core/src/main/scala/spark/network/netty/FileHeader.scala new file mode 100644 index 0000000000..aed4254234 --- /dev/null +++ b/core/src/main/scala/spark/network/netty/FileHeader.scala @@ -0,0 +1,57 @@ +package spark.network.netty + +import io.netty.buffer._ + +import spark.Logging + +private[spark] class FileHeader ( + val fileLen: Int, + val blockId: String) extends Logging { + + lazy val buffer = { + val buf = Unpooled.buffer() + buf.capacity(FileHeader.HEADER_SIZE) + buf.writeInt(fileLen) + buf.writeInt(blockId.length) + blockId.foreach((x: Char) => buf.writeByte(x)) + //padding the rest of header + if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) { + buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes) + } else { + throw new Exception("too long header " + buf.readableBytes) + logInfo("too long header") + } + buf + } + +} + +private[spark] object FileHeader { + + val HEADER_SIZE = 40 + + def getFileLenOffset = 0 + def getFileLenSize = Integer.SIZE/8 + + def create(buf: ByteBuf): FileHeader = { + val length = buf.readInt + val idLength = buf.readInt + val idBuilder = new StringBuilder(idLength) + for (i <- 1 to idLength) { + idBuilder += buf.readByte().asInstanceOf[Char] + } + val blockId = idBuilder.toString() + new FileHeader(length, blockId) + } + + + def main (args:Array[String]){ + + val header = new FileHeader(25,"block_0"); + val buf = header.buffer; + val newheader = FileHeader.create(buf); + System.out.println("id="+newheader.blockId+",size="+newheader.fileLen) + + } +} + diff --git a/core/src/main/scala/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/spark/network/netty/ShuffleCopier.scala new file mode 100644 index 0000000000..d8d35bfeec --- /dev/null +++ b/core/src/main/scala/spark/network/netty/ShuffleCopier.scala @@ -0,0 +1,88 @@ +package spark.network.netty + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelInboundByteHandlerAdapter +import io.netty.util.CharsetUtil + +import java.util.concurrent.atomic.AtomicInteger +import java.util.logging.Logger +import spark.Logging +import spark.network.ConnectionManagerId +import java.util.concurrent.Executors + +private[spark] class ShuffleCopier extends Logging { + + def getBlock(cmId: ConnectionManagerId, + blockId: String, + resultCollectCallback: (String, Long, ByteBuf) => Unit) = { + + val handler = new ShuffleClientHandler(resultCollectCallback) + val fc = new FileClient(handler) + fc.init() + fc.connect(cmId.host, cmId.port) + fc.sendRequest(blockId) + 
fc.waitForClose() + fc.close() + } + + def getBlocks(cmId: ConnectionManagerId, + blocks: Seq[(String, Long)], + resultCollectCallback: (String, Long, ByteBuf) => Unit) = { + + blocks.map { + case(blockId,size) => { + getBlock(cmId,blockId,resultCollectCallback) + } + } + } +} + +private[spark] class ShuffleClientHandler(val resultCollectCallBack: (String, Long, ByteBuf) => Unit ) extends FileClientHandler with Logging { + + def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader) { + logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)"); + resultCollectCallBack(header.blockId, header.fileLen.toLong, in.readBytes(header.fileLen)) + } +} + +private[spark] object ShuffleCopier extends Logging { + + def echoResultCollectCallBack(blockId: String, size: Long, content: ByteBuf) = { + logInfo("File: " + blockId + " content is : \" " + + content.toString(CharsetUtil.UTF_8) + "\"") + } + + def runGetBlock(host:String, port:Int, file:String){ + val handler = new ShuffleClientHandler(echoResultCollectCallBack) + val fc = new FileClient(handler) + fc.init(); + fc.connect(host, port) + fc.sendRequest(file) + fc.waitForClose(); + fc.close() + } + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: ShuffleCopier ") + System.exit(1) + } + val host = args(0) + val port = args(1).toInt + val file = args(2) + val threads = if (args.length>3) args(3).toInt else 10 + + val copiers = Executors.newFixedThreadPool(80) + for (i <- Range(0,threads)){ + val runnable = new Runnable() { + def run() { + runGetBlock(host,port,file) + } + } + copiers.execute(runnable) + } + copiers.shutdown + } + +} diff --git a/core/src/main/scala/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/spark/network/netty/ShuffleSender.scala new file mode 100644 index 0000000000..c1986812e9 --- /dev/null +++ b/core/src/main/scala/spark/network/netty/ShuffleSender.scala @@ -0,0 +1,50 @@ +package spark.network.netty + +import spark.Logging +import java.io.File + + +private[spark] class ShuffleSender(val port: Int, val pResolver:PathResolver) extends Logging { + val server = new FileServer(pResolver) + + Runtime.getRuntime().addShutdownHook( + new Thread() { + override def run() { + server.stop() + } + } + ) + + def start() { + server.run(port) + } +} + +private[spark] object ShuffleSender { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: ShuffleSender ") + System.exit(1) + } + val port = args(0).toInt + val subDirsPerLocalDir = args(1).toInt + val localDirs = args.drop(2) map {new File(_)} + val pResovler = new PathResolver { + def getAbsolutePath(blockId:String):String = { + if (!blockId.startsWith("shuffle_")) { + throw new Exception("Block " + blockId + " is not a shuffle block") + } + // Figure out which local directory it hashes to, and which subdirectory in that + val hash = math.abs(blockId.hashCode) + val dirId = hash % localDirs.length + val subDirId = (hash / localDirs.length) % subDirsPerLocalDir + val subDir = new File(localDirs(dirId), "%02x".format(subDirId)) + val file = new File(subDir, blockId) + return file.getAbsolutePath + } + } + val sender = new ShuffleSender(port, pResovler) + + sender.start() + } +} diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 210061e972..b8b68d4283 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -23,6 +23,8 @@ import 
spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStam import sun.nio.ch.DirectBuffer +import spark.network.netty.ShuffleCopier +import io.netty.buffer.ByteBuf private[spark] case class BlockException(blockId: String, message: String, ex: Exception = null) @@ -467,6 +469,21 @@ class BlockManager( getLocal(blockId).orElse(getRemote(blockId)) } + /** + * A request to fetch one or more blocks, complete with their sizes + */ + class FetchRequest(val address: BlockManagerId, val blocks: Seq[(String, Long)]) { + val size = blocks.map(_._2).sum + } + + /** + * A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize + * the block (since we want all deserializaton to happen in the calling thread); can also + * represent a fetch failure if size == -1. + */ + class FetchResult(val blockId: String, val size: Long, val deserialize: () => Iterator[Any]) { + def failed: Boolean = size == -1 + } /** * Get multiple blocks from local and remote block manager using their BlockManagerIds. Returns * an Iterator of (block ID, value) pairs so that clients may handle blocks in a pipelined @@ -475,7 +492,12 @@ class BlockManager( */ def getMultiple(blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])]) : BlockFetcherIterator = { - return new BlockFetcherIterator(this, blocksByAddress) + + if(System.getProperty("spark.shuffle.use.netty", "false").toBoolean){ + return new NettyBlockFetcherIterator(this, blocksByAddress) + } else { + return new BlockFetcherIterator(this, blocksByAddress) + } } def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean) @@ -908,7 +930,7 @@ class BlockFetcherIterator( if (blocksByAddress == null) { throw new IllegalArgumentException("BlocksByAddress is null") } - val totalBlocks = blocksByAddress.map(_._2.size).sum + var totalBlocks = blocksByAddress.map(_._2.size).sum logDebug("Getting " + totalBlocks + " blocks") var startTime = System.currentTimeMillis val localBlockIds = new ArrayBuffer[String]() @@ -974,68 +996,83 @@ class BlockFetcherIterator( } } - // Split local and remote blocks. Remote blocks are further split into FetchRequests of size - // at most maxBytesInFlight in order to limit the amount of data in flight. - val remoteRequests = new ArrayBuffer[FetchRequest] - for ((address, blockInfos) <- blocksByAddress) { - if (address == blockManagerId) { - localBlockIds ++= blockInfos.map(_._1) - } else { - remoteBlockIds ++= blockInfos.map(_._1) - // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them - // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5 - // nodes, rather than blocking on reading output from one node. - val minRequestSize = math.max(maxBytesInFlight / 5, 1L) - logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize) - val iterator = blockInfos.iterator - var curRequestSize = 0L - var curBlocks = new ArrayBuffer[(String, Long)] - while (iterator.hasNext) { - val (blockId, size) = iterator.next() - curBlocks += ((blockId, size)) - curRequestSize += size - if (curRequestSize >= minRequestSize) { - // Add this FetchRequest + def splitLocalRemoteBlocks():ArrayBuffer[FetchRequest] = { + // Split local and remote blocks. Remote blocks are further split into FetchRequests of size + // at most maxBytesInFlight in order to limit the amount of data in flight. 
+ val remoteRequests = new ArrayBuffer[FetchRequest] + for ((address, blockInfos) <- blocksByAddress) { + if (address == blockManagerId) { + localBlockIds ++= blockInfos.map(_._1) + } else { + remoteBlockIds ++= blockInfos.map(_._1) + // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them + // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5 + // nodes, rather than blocking on reading output from one node. + val minRequestSize = math.max(maxBytesInFlight / 5, 1L) + logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize) + val iterator = blockInfos.iterator + var curRequestSize = 0L + var curBlocks = new ArrayBuffer[(String, Long)] + while (iterator.hasNext) { + val (blockId, size) = iterator.next() + curBlocks += ((blockId, size)) + curRequestSize += size + if (curRequestSize >= minRequestSize) { + // Add this FetchRequest + remoteRequests += new FetchRequest(address, curBlocks) + curRequestSize = 0 + curBlocks = new ArrayBuffer[(String, Long)] + } + } + // Add in the final request + if (!curBlocks.isEmpty) { remoteRequests += new FetchRequest(address, curBlocks) - curRequestSize = 0 - curBlocks = new ArrayBuffer[(String, Long)] } } - // Add in the final request - if (!curBlocks.isEmpty) { - remoteRequests += new FetchRequest(address, curBlocks) - } } + remoteRequests } - // Add the remote requests into our queue in a random order - fetchRequests ++= Utils.randomize(remoteRequests) - // Send out initial requests for blocks, up to our maxBytesInFlight - while (!fetchRequests.isEmpty && - (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { - sendRequest(fetchRequests.dequeue()) + def getLocalBlocks(){ + // Get the local blocks while remote blocks are being fetched. Note that it's okay to do + // these all at once because they will just memory-map some files, so they won't consume + // any memory that might exceed our maxBytesInFlight + for (id <- localBlockIds) { + getLocal(id) match { + case Some(iter) => { + results.put(new FetchResult(id, 0, () => iter)) // Pass 0 as size since it's not in flight + logDebug("Got local block " + id) + } + case None => { + throw new BlockException(id, "Could not get block " + id + " from local machine") + } + } + } } - val numGets = remoteBlockIds.size - fetchRequests.size - logInfo("Started " + numGets + " remote gets in " + Utils.getUsedTimeMs(startTime)) - - // Get the local blocks while remote blocks are being fetched. Note that it's okay to do - // these all at once because they will just memory-map some files, so they won't consume - // any memory that might exceed our maxBytesInFlight - startTime = System.currentTimeMillis - for (id <- localBlockIds) { - getLocal(id) match { - case Some(iter) => { - results.put(new FetchResult(id, 0, () => iter)) // Pass 0 as size since it's not in flight - logDebug("Got local block " + id) - } - case None => { - throw new BlockException(id, "Could not get block " + id + " from local machine") - } + def initialize(){ + // Split local and remote blocks. 
+ val remoteRequests = splitLocalRemoteBlocks() + // Add the remote requests into our queue in a random order + fetchRequests ++= Utils.randomize(remoteRequests) + + // Send out initial requests for blocks, up to our maxBytesInFlight + while (!fetchRequests.isEmpty && + (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { + sendRequest(fetchRequests.dequeue()) } + + val numGets = remoteBlockIds.size - fetchRequests.size + logInfo("Started " + numGets + " remote gets in " + Utils.getUsedTimeMs(startTime)) + + // Get Local Blocks + startTime = System.currentTimeMillis + getLocalBlocks() + logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms") + } - logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms") + initialize() //an iterator that will read fetched blocks off the queue as they arrive. var resultsGotten = 0 @@ -1066,3 +1103,132 @@ class BlockFetcherIterator( def remoteBytesRead = _remoteBytesRead } + +class NettyBlockFetcherIterator( + blockManager: BlockManager, + blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] +) extends BlockFetcherIterator(blockManager,blocksByAddress) { + + import blockManager._ + + val fetchRequestsSync = new LinkedBlockingQueue[FetchRequest] + + def putResult(blockId:String, blockSize:Long, blockData:ByteBuffer, + results : LinkedBlockingQueue[FetchResult]){ + results.put(new FetchResult( + blockId, blockSize, () => dataDeserialize(blockId, blockData) )) + } + + def startCopiers (numCopiers: Int): List [ _ <: Thread]= { + (for ( i <- Range(0,numCopiers) ) yield { + val copier = new Thread { + override def run(){ + try { + while(!isInterrupted && !fetchRequestsSync.isEmpty) { + sendRequest(fetchRequestsSync.take()) + } + } catch { + case x: InterruptedException => logInfo("Copier Interrupted") + case _ => throw new SparkException("Exception Throw in Shuffle Copier") + } + } + } + copier.start + copier + }).toList + } + + //keep this to interrupt the threads when necessary + def stopCopiers(copiers : List[_ <: Thread]) { + for (copier <- copiers) { + copier.interrupt() + } + } + + override def sendRequest(req: FetchRequest) { + logDebug("Sending request for %d blocks (%s) from %s".format( + req.blocks.size, Utils.memoryBytesToString(req.size), req.address.ip)) + val cmId = new ConnectionManagerId(req.address.ip, System.getProperty("spark.shuffle.sender.port", "6653").toInt) + val cpier = new ShuffleCopier + cpier.getBlocks(cmId,req.blocks,(blockId:String,blockSize:Long,blockData:ByteBuf) => putResult(blockId,blockSize,blockData.nioBuffer,results)) + logDebug("Sent request for remote blocks " + req.blocks + " from " + req.address.ip ) + } + + override def splitLocalRemoteBlocks() : ArrayBuffer[FetchRequest] = { + // Split local and remote blocks. Remote blocks are further split into FetchRequests of size + // at most maxBytesInFlight in order to limit the amount of data in flight. + val originalTotalBlocks = totalBlocks; + val remoteRequests = new ArrayBuffer[FetchRequest] + for ((address, blockInfos) <- blocksByAddress) { + if (address == blockManagerId) { + localBlockIds ++= blockInfos.map(_._1) + } else { + remoteBlockIds ++= blockInfos.map(_._1) + // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them + // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5 + // nodes, rather than blocking on reading output from one node. 
+ val minRequestSize = math.max(maxBytesInFlight / 5, 1L) + logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize) + val iterator = blockInfos.iterator + var curRequestSize = 0L + var curBlocks = new ArrayBuffer[(String, Long)] + while (iterator.hasNext) { + val (blockId, size) = iterator.next() + if (size > 0) { + curBlocks += ((blockId, size)) + curRequestSize += size + } else if (size == 0){ + //here we changes the totalBlocks + totalBlocks -= 1 + } else { + throw new SparkException("Negative block size "+blockId) + } + if (curRequestSize >= minRequestSize) { + // Add this FetchRequest + remoteRequests += new FetchRequest(address, curBlocks) + curRequestSize = 0 + curBlocks = new ArrayBuffer[(String, Long)] + } + } + // Add in the final request + if (!curBlocks.isEmpty) { + remoteRequests += new FetchRequest(address, curBlocks) + } + } + } + logInfo("Getting " + totalBlocks + " non 0-byte blocks out of " + originalTotalBlocks + " blocks") + remoteRequests + } + + var copiers : List[_ <: Thread] = null + + override def initialize(){ + // Split Local Remote Blocks and adjust totalBlocks to include only the non 0-byte blocks + val remoteRequests = splitLocalRemoteBlocks() + // Add the remote requests into our queue in a random order + for (request <- Utils.randomize(remoteRequests)) { + fetchRequestsSync.put(request) + } + + copiers = startCopiers(System.getProperty("spark.shuffle.copier.threads", "6").toInt) + logInfo("Started " + fetchRequestsSync.size + " remote gets in " + Utils.getUsedTimeMs(startTime)) + + // Get Local Blocks + startTime = System.currentTimeMillis + getLocalBlocks() + logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms") + } + + override def next(): (String, Option[Iterator[Any]]) = { + resultsGotten += 1 + val result = results.take() + // if all the results has been retrieved + // shutdown the copiers + if (resultsGotten == totalBlocks) { + if( copiers != null ) + stopCopiers(copiers) + } + (result.blockId, if (result.failed) None else Some(result.deserialize())) + } + } + diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index ddbf8821ad..d702bb23e0 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -13,24 +13,35 @@ import scala.collection.mutable.ArrayBuffer import spark.executor.ExecutorExitCode import spark.Utils +import spark.Logging +import spark.network.netty.ShuffleSender +import spark.network.netty.PathResolver /** * Stores BlockManager blocks on disk. */ private class DiskStore(blockManager: BlockManager, rootDirs: String) - extends BlockStore(blockManager) { + extends BlockStore(blockManager) with Logging { val MAX_DIR_CREATION_ATTEMPTS: Int = 10 val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt + var shuffleSender : Thread = null + val thisInstance = this // Create one local directory for each path mentioned in spark.local.dir; then, inside this // directory, create multiple subdirectories that we will hash files into, in order to avoid // having really large inodes at the top level. 
val localDirs = createLocalDirs() val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) + val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean + addShutdownHook() + if(useNetty){ + startShuffleBlockSender() + } + override def getSize(blockId: String): Long = { getFile(blockId).length() } @@ -180,10 +191,48 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) logDebug("Shutdown hook called") try { localDirs.foreach(localDir => Utils.deleteRecursively(localDir)) + if (useNetty && shuffleSender != null) + shuffleSender.stop } catch { case t: Throwable => logError("Exception while deleting local spark dirs", t) } } }) } + + private def startShuffleBlockSender (){ + try { + val port = System.getProperty("spark.shuffle.sender.port", "6653").toInt + + val pResolver = new PathResolver { + def getAbsolutePath(blockId:String):String = { + if (!blockId.startsWith("shuffle_")) { + return null + } + thisInstance.getFile(blockId).getAbsolutePath() + } + } + shuffleSender = new Thread { + override def run() = { + val sender = new ShuffleSender(port,pResolver) + logInfo("created ShuffleSender binding to port : "+ port) + sender.start + } + } + shuffleSender.setDaemon(true) + shuffleSender.start + + } catch { + case interrupted: InterruptedException => + logInfo("Runner thread for ShuffleBlockSender interrupted") + + case e: Exception => { + logError("Error running ShuffleBlockSender ", e) + if (shuffleSender != null) { + shuffleSender.stop + shuffleSender = null + } + } + } + } } diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5f378b2398..e3645653ee 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -141,7 +141,8 @@ object SparkBuild extends Build { "cc.spray" % "spray-can" % "1.0-M2.1", "cc.spray" % "spray-server" % "1.0-M2.1", "cc.spray" %% "spray-json" % "1.1.1", - "org.apache.mesos" % "mesos" % "0.9.0-incubating" + "org.apache.mesos" % "mesos" % "0.9.0-incubating", + "io.netty" % "netty-all" % "4.0.0.Beta2" ) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq, unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") } ) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings diff --git a/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala index d8b987ec86..bd0b0e74c1 100644 --- a/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala +++ b/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala @@ -5,7 +5,7 @@ import spark.util.{RateLimitedOutputStream, IntParam} import java.net.ServerSocket import spark.{Logging, KryoSerializer} import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream -import io.Source +import scala.io.Source import java.io.IOException /** -- cgit v1.2.3 From 6798a09df84fb97e196c84d55cf3e21ad676871f Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Sun, 7 Apr 2013 17:47:38 +0530 Subject: Add support for building against hadoop2-yarn : adding new maven profile for it --- bagel/pom.xml | 37 +++++++++++ core/pom.xml | 62 +++++++++++++++++++ .../apache/hadoop/mapred/HadoopMapRedUtil.scala | 3 + .../hadoop/mapreduce/HadoopMapReduceUtil.scala | 3 + .../apache/hadoop/mapred/HadoopMapRedUtil.scala | 13 ++++ .../hadoop/mapreduce/HadoopMapReduceUtil.scala | 13 ++++ .../apache/hadoop/mapred/HadoopMapRedUtil.scala | 3 + 
.../hadoop/mapreduce/HadoopMapReduceUtil.scala | 3 + core/src/main/scala/spark/PairRDDFunctions.scala | 5 +- core/src/main/scala/spark/rdd/NewHadoopRDD.scala | 2 +- examples/pom.xml | 43 +++++++++++++ pom.xml | 54 ++++++++++++++++ project/SparkBuild.scala | 34 +++++++++-- repl-bin/pom.xml | 50 +++++++++++++++ repl/pom.xml | 71 ++++++++++++++++++++++ streaming/pom.xml | 37 +++++++++++ 16 files changed, 424 insertions(+), 9 deletions(-) create mode 100644 core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala create mode 100644 core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala (limited to 'streaming') diff --git a/bagel/pom.xml b/bagel/pom.xml index 510cff4669..89282161ea 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -102,5 +102,42 @@ + + hadoop2-yarn + + + org.spark-project + spark-core + ${project.version} + hadoop2-yarn + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.hadoop + hadoop-yarn-api + provided + + + org.apache.hadoop + hadoop-yarn-common + provided + + + + + + org.apache.maven.plugins + maven-jar-plugin + + hadoop2-yarn + + + + + diff --git a/core/pom.xml b/core/pom.xml index fe9c803728..9baa447662 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -279,5 +279,67 @@ + + hadoop2-yarn + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.hadoop + hadoop-yarn-api + provided + + + org.apache.hadoop + hadoop-yarn-common + provided + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + src/main/scala + src/hadoop2-yarn/scala + + + + + add-scala-test-sources + generate-test-sources + + add-test-source + + + + src/test/scala + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + hadoop2-yarn + + + + + diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala index ca9f7219de..f286f2cf9c 100644 --- a/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala +++ b/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala @@ -4,4 +4,7 @@ trait HadoopMapRedUtil { def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId) def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId) + + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, + jobId, isMap, taskId, attemptId) } diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala index de7b0f81e3..264d421d14 100644 --- a/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala +++ b/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala @@ -6,4 +6,7 @@ trait HadoopMapReduceUtil { def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId) def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId) + + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, + jobId, isMap, taskId, attemptId) } diff --git a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala 
b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala new file mode 100644 index 0000000000..875c0a220b --- /dev/null +++ b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala @@ -0,0 +1,13 @@ + +package org.apache.hadoop.mapred + +import org.apache.hadoop.mapreduce.TaskType + +trait HadoopMapRedUtil { + def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) + + def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = + new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId) +} diff --git a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala new file mode 100644 index 0000000000..8bc6fb6dea --- /dev/null +++ b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala @@ -0,0 +1,13 @@ +package org.apache.hadoop.mapreduce + +import org.apache.hadoop.conf.Configuration +import task.{TaskAttemptContextImpl, JobContextImpl} + +trait HadoopMapReduceUtil { + def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) + + def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = + new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId) +} diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala index 35300cea58..a0652d7fc7 100644 --- a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala +++ b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala @@ -4,4 +4,7 @@ trait HadoopMapRedUtil { def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, + jobId, isMap, taskId, attemptId) } diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala index 7afdbff320..7fdbe322fd 100644 --- a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala +++ b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala @@ -7,4 +7,7 @@ trait HadoopMapReduceUtil { def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, + jobId, isMap, taskId, attemptId) } diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 07efba9e8d..39469fa3c8 100644 --- 
a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -545,8 +545,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( // around by taking a mod. We expect that no task will be attempted 2 billion times. val attemptNumber = (context.attemptId % Int.MaxValue).toInt /* "reduce task" */ - val attemptId = new TaskAttemptID(jobtrackerID, - stageId, false, context.splitId, attemptNumber) + val attemptId = newTaskAttemptID(jobtrackerID, stageId, false, context.splitId, attemptNumber) val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId) val format = outputFormatClass.newInstance val committer = format.getOutputCommitter(hadoopContext) @@ -565,7 +564,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * however we're only going to use this local OutputCommitter for * setupJob/commitJob, so we just use a dummy "map" task. */ - val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, true, 0, 0) + val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, true, 0, 0) val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId) val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) jobCommitter.setupJob(jobTaskContext) diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala index bdd974590a..901d01ef30 100644 --- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala @@ -57,7 +57,7 @@ class NewHadoopRDD[K, V]( override def compute(theSplit: Partition, context: TaskContext) = new Iterator[(K, V)] { val split = theSplit.asInstanceOf[NewHadoopPartition] val conf = confBroadcast.value.value - val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0) + val attemptId = newTaskAttemptID(jobtrackerId, id, true, split.index, 0) val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) val format = inputFormatClass.newInstance if (format.isInstanceOf[Configurable]) { diff --git a/examples/pom.xml b/examples/pom.xml index 39cc47c709..9594257ad4 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -118,5 +118,48 @@ + + hadoop2-yarn + + + org.spark-project + spark-core + ${project.version} + hadoop2-yarn + + + org.spark-project + spark-streaming + ${project.version} + hadoop2-yarn + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.hadoop + hadoop-yarn-api + provided + + + org.apache.hadoop + hadoop-yarn-common + provided + + + + + + org.apache.maven.plugins + maven-jar-plugin + + hadoop2-yarn + + + + + diff --git a/pom.xml b/pom.xml index 08d1fc12e0..b3134a957d 100644 --- a/pom.xml +++ b/pom.xml @@ -558,5 +558,59 @@ + + + hadoop2-yarn + + 2 + 2.0.3-alpha + + + + + maven-root + Maven root repository + http://repo1.maven.org/maven2/ + + true + + + false + + + + + + + + + org.apache.hadoop + hadoop-client + ${yarn.version} + + + org.apache.hadoop + hadoop-yarn-api + ${yarn.version} + + + org.apache.hadoop + hadoop-yarn-common + ${yarn.version} + + + + org.apache.avro + avro + 1.7.1.cloudera.2 + + + org.apache.avro + avro-ipc + 1.7.1.cloudera.2 + + + + diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5f378b2398..f041930b4e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -1,3 +1,4 @@ + import sbt._ import sbt.Classpaths.publishTask import Keys._ @@ -10,12 +11,18 @@ import twirl.sbt.TwirlPlugin._ object SparkBuild extends Build { // Hadoop version to build against. 
For example, "0.20.2", "0.20.205.0", or // "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop. - val HADOOP_VERSION = "1.0.4" - val HADOOP_MAJOR_VERSION = "1" + //val HADOOP_VERSION = "1.0.4" + //val HADOOP_MAJOR_VERSION = "1" // For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", set the HADOOP_MAJOR_VERSION to "2" //val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1" //val HADOOP_MAJOR_VERSION = "2" + //val HADOOP_YARN = false + + // For Hadoop 2 YARN support + val HADOOP_VERSION = "2.0.3-alpha" + val HADOOP_MAJOR_VERSION = "2" + val HADOOP_YARN = true lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel, streaming) @@ -129,7 +136,6 @@ object SparkBuild extends Build { "org.slf4j" % "slf4j-api" % slf4jVersion, "org.slf4j" % "slf4j-log4j12" % slf4jVersion, "com.ning" % "compress-lzf" % "0.8.4", - "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION, "asm" % "asm-all" % "3.3.1", "com.google.protobuf" % "protobuf-java" % "2.4.1", "de.javakaffee" % "kryo-serializers" % "0.22", @@ -142,8 +148,26 @@ object SparkBuild extends Build { "cc.spray" % "spray-server" % "1.0-M2.1", "cc.spray" %% "spray-json" % "1.1.1", "org.apache.mesos" % "mesos" % "0.9.0-incubating" - ) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq, - unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") } + ) ++ ( + if (HADOOP_MAJOR_VERSION == "2") { + if (HADOOP_YARN) { + Seq( + "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION, + "org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION, + "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION + ) + } else { + Seq( + "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION, + "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION + ) + } + } else { + Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION) + }), + unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / + ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") "src/hadoop2-yarn/scala" else "src/hadoop" + HADOOP_MAJOR_VERSION + "/scala" ) + } ) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings def rootSettings = sharedSettings ++ Seq( diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index dd720e2291..f9d84fd3c4 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -153,6 +153,56 @@ + + hadoop2-yarn + + hadoop2-yarn + + + + org.spark-project + spark-core + ${project.version} + hadoop2-yarn + + + org.spark-project + spark-bagel + ${project.version} + hadoop2-yarn + runtime + + + org.spark-project + spark-examples + ${project.version} + hadoop2-yarn + runtime + + + org.spark-project + spark-repl + ${project.version} + hadoop2-yarn + runtime + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.hadoop + hadoop-yarn-api + provided + + + org.apache.hadoop + hadoop-yarn-common + provided + + + deb diff --git a/repl/pom.xml b/repl/pom.xml index a3e4606edc..1f885673f4 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -187,5 +187,76 @@ + + hadoop2-yarn + + hadoop2-yarn + + + + org.spark-project + spark-core + ${project.version} + hadoop2-yarn + + + org.spark-project + spark-bagel + ${project.version} + hadoop2-yarn + runtime + + + org.spark-project + spark-examples + ${project.version} + hadoop2-yarn + runtime + + + org.spark-project + spark-streaming + ${project.version} + hadoop2-yarn + runtime + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.hadoop + hadoop-yarn-api + provided + + + 
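// Note (not part of the patch): the SparkBuild.scala changes above gate both the Hadoop dependency
// set and the compiled compatibility sources on HADOOP_MAJOR_VERSION / HADOOP_YARN. A standalone,
// illustrative sketch of that selection logic (HadoopBuildFlavor is hypothetical; the constants and
// paths mirror the sbt settings):

    object HadoopBuildFlavor {
      def sourceDir(hadoopMajorVersion: String, yarn: Boolean): String =
        if (yarn && hadoopMajorVersion == "2") "src/hadoop2-yarn/scala"
        else "src/hadoop" + hadoopMajorVersion + "/scala"

      def hadoopArtifacts(hadoopMajorVersion: String, yarn: Boolean): Seq[String] =
        if (hadoopMajorVersion == "2" && yarn)
          Seq("hadoop-client", "hadoop-yarn-api", "hadoop-yarn-common")
        else if (hadoopMajorVersion == "2")
          Seq("hadoop-core", "hadoop-client")
        else
          Seq("hadoop-core")
    }

    // e.g. HadoopBuildFlavor.sourceDir("2", yarn = true) yields "src/hadoop2-yarn/scala", matching
    // the unmanagedSourceDirectories setting; the Maven hadoop2-yarn profiles encode the same choice.
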
org.apache.hadoop + hadoop-yarn-common + provided + + + org.apache.avro + avro + provided + + + org.apache.avro + avro-ipc + provided + + + + + + org.apache.maven.plugins + maven-jar-plugin + + hadoop2-yarn + + + + + diff --git a/streaming/pom.xml b/streaming/pom.xml index ec077e8089..fc2e211a42 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -149,5 +149,42 @@ + + hadoop2-yarn + + + org.spark-project + spark-core + ${project.version} + hadoop2-yarn + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.hadoop + hadoop-yarn-api + provided + + + org.apache.hadoop + hadoop-yarn-common + provided + + + + + + org.apache.maven.plugins + maven-jar-plugin + + hadoop2-yarn + + + + + -- cgit v1.2.3 From afee9024430ef79cc0840a5e5788b60c8c53f9d2 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Sun, 28 Apr 2013 22:26:45 +0530 Subject: Attempt to fix streaming test failures after yarn branch merge --- bagel/src/test/scala/bagel/BagelSuite.scala | 1 + core/src/test/scala/spark/LocalSparkContext.scala | 3 ++- repl/src/test/scala/spark/repl/ReplSuite.scala | 1 + .../main/scala/spark/streaming/Checkpoint.scala | 30 +++++++++++++++++----- .../spark/streaming/util/MasterFailureTest.scala | 8 +++++- .../spark/streaming/BasicOperationsSuite.scala | 1 + .../scala/spark/streaming/CheckpointSuite.scala | 4 ++- .../test/scala/spark/streaming/FailureSuite.scala | 2 ++ .../scala/spark/streaming/InputStreamsSuite.scala | 1 + .../spark/streaming/WindowOperationsSuite.scala | 1 + 10 files changed, 42 insertions(+), 10 deletions(-) (limited to 'streaming') diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index 25db395c22..a09c978068 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -23,6 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo } // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } test("halting by voting") { diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/spark/LocalSparkContext.scala index ff00dd05dd..76d5258b02 100644 --- a/core/src/test/scala/spark/LocalSparkContext.scala +++ b/core/src/test/scala/spark/LocalSparkContext.scala @@ -27,6 +27,7 @@ object LocalSparkContext { sc.stop() // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. 
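// Note (not part of the patch): after the YARN merge the code also records a spark.hostPort system
// property, so each suite's teardown now clears it together with spark.driver.port to keep Akka
// from rebinding to a stale address between tests. A hedged sketch of the shared pattern as a
// reusable mixin (CleansSparkProperties is hypothetical; the suites below inline the same two calls):

    import org.scalatest.{BeforeAndAfterEach, Suite}

    trait CleansSparkProperties extends BeforeAndAfterEach { this: Suite =>
      override def afterEach() {
        // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
        System.clearProperty("spark.driver.port")
        System.clearProperty("spark.hostPort")
        super.afterEach()
      }
    }
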
*/ @@ -38,4 +39,4 @@ object LocalSparkContext { } } -} \ No newline at end of file +} diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala index 43559b96d3..1c64f9b98d 100644 --- a/repl/src/test/scala/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/spark/repl/ReplSuite.scala @@ -32,6 +32,7 @@ class ReplSuite extends FunSuite { interp.sparkContext.stop() // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") return out.toString } diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index e303e33e5e..7bd104b8d5 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -38,28 +38,43 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) private[streaming] class CheckpointWriter(checkpointDir: String) extends Logging { val file = new Path(checkpointDir, "graph") + // The file to which we actually write - and then "move" to file. + private val writeFile = new Path(file.getParent, file.getName + ".next") + private val bakFile = new Path(file.getParent, file.getName + ".bk") + + @volatile private var stopped = false + val conf = new Configuration() var fs = file.getFileSystem(conf) val maxAttempts = 3 val executor = Executors.newFixedThreadPool(1) + // Removed code which validates whether there is only one CheckpointWriter per path 'file' since + // I did not notice any errors - reintroduce it ? + class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { def run() { var attempts = 0 val startTime = System.currentTimeMillis() while (attempts < maxAttempts) { + if (stopped) { + logInfo("Already stopped, ignore checkpoint attempt for " + file) + return + } attempts += 1 try { logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") - if (fs.exists(file)) { - val bkFile = new Path(file.getParent, file.getName + ".bk") - FileUtil.copy(fs, file, fs, bkFile, true, true, conf) - logDebug("Moved existing checkpoint file to " + bkFile) - } - val fos = fs.create(file) + // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast. 
+ val fos = fs.create(writeFile) fos.write(bytes) fos.close() - fos.close() + if (fs.exists(file) && fs.rename(file, bakFile)) { + logDebug("Moved existing checkpoint file to " + bakFile) + } + // paranoia + fs.delete(file, false) + fs.rename(writeFile, file) + val finishTime = System.currentTimeMillis(); logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds") @@ -84,6 +99,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging { } def stop() { + stopped = true executor.shutdown() } } diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala index f673e5be15..e7a3f92bc0 100644 --- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala @@ -74,6 +74,7 @@ object MasterFailureTest extends Logging { val operation = (st: DStream[String]) => { val updateFunc = (values: Seq[Long], state: Option[Long]) => { + logInfo("UpdateFunc .. state = " + state.getOrElse(0L) + ", values = " + values) Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L)) } st.flatMap(_.split(" ")) @@ -159,6 +160,7 @@ object MasterFailureTest extends Logging { // Setup the streaming computation with the given operation System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map()) ssc.checkpoint(checkpointDir.toString) val inputStream = ssc.textFileStream(testDir.toString) @@ -205,6 +207,7 @@ object MasterFailureTest extends Logging { // (iii) Its not timed out yet System.clearProperty("spark.streaming.clock") System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") ssc.start() val startTime = System.currentTimeMillis() while (!killed && !isLastOutputGenerated && !isTimedOut) { @@ -357,13 +360,16 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) // Write the data to a local file and then move it to the target test directory val localFile = new File(localTestDir, (i+1).toString) val hadoopFile = new Path(testDir, (i+1).toString) + val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString) FileUtils.writeStringToFile(localFile, input(i).toString + "\n") var tries = 0 var done = false while (!done && tries < maxTries) { tries += 1 try { - fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile) + fs.rename(tempHadoopFile, hadoopFile) done = true } catch { case ioe: IOException => { diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index cf2ed8b1d4..e7352deb81 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -15,6 +15,7 @@ class BasicOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } test("map") { diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala 
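// Note (not part of the patch): both changes above follow the same recipe -- never write the live
// file in place; write to a temporary name in the same directory, then rename it over the target so
// a concurrent reader only ever sees a complete file (CheckpointWriter additionally keeps the
// previous checkpoint as a ".bk" copy). A minimal, generic sketch of the idiom against the Hadoop
// FileSystem API (writeAtomically is a hypothetical helper, not code from the patch):

    import org.apache.hadoop.fs.{FileSystem, Path}

    object TempThenRename {
      def writeAtomically(fs: FileSystem, target: Path, bytes: Array[Byte]) {
        val tmp = new Path(target.getParent, target.getName + ".next")
        val out = fs.create(tmp)
        try { out.write(bytes) } finally { out.close() }
        // Drop any stale target, then move the finished temp file into place.
        if (fs.exists(target)) fs.delete(target, false)
        fs.rename(tmp, target)
      }
    }
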
b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index cac86deeaf..607dea77ec 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -31,6 +31,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } var ssc: StreamingContext = null @@ -325,6 +326,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { ) ssc = new StreamingContext(checkpointDir) System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") ssc.start() val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches) // the first element will be re-processed data of the last batch before restart @@ -350,4 +352,4 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] outputStream.output } -} \ No newline at end of file +} diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala index a5fa7ab92d..4529e774e9 100644 --- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -22,10 +22,12 @@ class FailureSuite extends FunSuite with BeforeAndAfter with Logging { val batchDuration = Milliseconds(1000) before { + logInfo("BEFORE ...") FileUtils.deleteDirectory(new File(directory)) } after { + logInfo("AFTER ...") FileUtils.deleteDirectory(new File(directory)) } diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 67dca2ac31..0acb6db6f2 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -41,6 +41,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index 1b66f3bda2..80d827706f 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -16,6 +16,7 @@ class WindowOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } val largerSlideInput = Seq( -- cgit v1.2.3 From 7fa6978a1e8822cf377fbb1e8a8d23adc4ebe12e Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Sun, 28 Apr 2013 23:08:10 +0530 Subject: Allow CheckpointWriter pending tasks to finish --- streaming/src/main/scala/spark/streaming/Checkpoint.scala | 13 +++++++------ streaming/src/main/scala/spark/streaming/DStreamGraph.scala | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index 7bd104b8d5..4bbad908d0 100644 --- 
a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -42,7 +42,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging { private val writeFile = new Path(file.getParent, file.getName + ".next") private val bakFile = new Path(file.getParent, file.getName + ".bk") - @volatile private var stopped = false + private var stopped = false val conf = new Configuration() var fs = file.getFileSystem(conf) @@ -57,10 +57,6 @@ class CheckpointWriter(checkpointDir: String) extends Logging { var attempts = 0 val startTime = System.currentTimeMillis() while (attempts < maxAttempts) { - if (stopped) { - logInfo("Already stopped, ignore checkpoint attempt for " + file) - return - } attempts += 1 try { logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") @@ -99,8 +95,13 @@ class CheckpointWriter(checkpointDir: String) extends Logging { } def stop() { - stopped = true + synchronized { + if (stopped) return ; + stopped = true + } executor.shutdown() + val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS) + logInfo("CheckpointWriter executor terminated ? " + terminated) } } diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index adb7f3a24d..3b331956f5 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -54,8 +54,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { throw new Exception("Batch duration already set as " + batchDuration + ". cannot set it again.") } + batchDuration = duration } - batchDuration = duration } def remember(duration: Duration) { -- cgit v1.2.3 From 3a89a76b874298853cf47510ab33e863abf117d7 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Mon, 29 Apr 2013 00:04:12 +0530 Subject: Make log message more descriptive to aid in debugging --- streaming/src/main/scala/spark/streaming/Checkpoint.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index 4bbad908d0..66e67cbfa1 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -100,8 +100,10 @@ class CheckpointWriter(checkpointDir: String) extends Logging { stopped = true } executor.shutdown() + val startTime = System.currentTimeMillis() val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS) - logInfo("CheckpointWriter executor terminated ? " + terminated) + val endTime = System.currentTimeMillis() + logInfo("CheckpointWriter executor terminated ? 
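// Note (not part of the patch): the stop() changes in this commit and the next make shutdown
// idempotent and let any in-flight checkpoint write drain (up to ten seconds) rather than being
// abandoned. The pieces pulled together as one self-contained sketch (GracefulWriter is
// hypothetical; only the shape mirrors the patch):

    import java.util.concurrent.{Executors, TimeUnit}

    class GracefulWriter extends spark.Logging {
      private val executor = Executors.newFixedThreadPool(1)
      private var stopped = false

      def stop() {
        synchronized {
          if (stopped) return
          stopped = true
        }
        executor.shutdown()          // reject new tasks, but let queued writes finish
        val start = System.currentTimeMillis()
        val terminated = executor.awaitTermination(10, TimeUnit.SECONDS)
        logInfo("Writer executor terminated ? " + terminated +
          ", waited for " + (System.currentTimeMillis() - start) + " ms.")
      }
    }
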
" + terminated + ", waited for " + (endTime - startTime) + " ms.") } } -- cgit v1.2.3 From 430c531464a5372237c97394f8f4b6ec344394c0 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Mon, 29 Apr 2013 00:24:30 +0530 Subject: Remove debug statements --- streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala | 1 - streaming/src/test/scala/spark/streaming/FailureSuite.scala | 2 -- 2 files changed, 3 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala index e7a3f92bc0..426a9b6f71 100644 --- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala @@ -74,7 +74,6 @@ object MasterFailureTest extends Logging { val operation = (st: DStream[String]) => { val updateFunc = (values: Seq[Long], state: Option[Long]) => { - logInfo("UpdateFunc .. state = " + state.getOrElse(0L) + ", values = " + values) Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L)) } st.flatMap(_.split(" ")) diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala index 4529e774e9..a5fa7ab92d 100644 --- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -22,12 +22,10 @@ class FailureSuite extends FunSuite with BeforeAndAfter with Logging { val batchDuration = Milliseconds(1000) before { - logInfo("BEFORE ...") FileUtils.deleteDirectory(new File(directory)) } after { - logInfo("AFTER ...") FileUtils.deleteDirectory(new File(directory)) } -- cgit v1.2.3 From e7982c798efccd523165d0e347c7912ba14fcdd7 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Sat, 18 May 2013 16:11:29 -0700 Subject: Exclude old versions of Netty from Maven-based build --- pom.xml | 6 ++++++ streaming/pom.xml | 6 ++++++ 2 files changed, 12 insertions(+) (limited to 'streaming') diff --git a/pom.xml b/pom.xml index eda18fdd12..6ee64d07c2 100644 --- a/pom.xml +++ b/pom.xml @@ -565,6 +565,12 @@ org.apache.avro avro-ipc 1.7.1.cloudera.2 + + + org.jboss.netty + netty + + diff --git a/streaming/pom.xml b/streaming/pom.xml index 08ff3e2ae1..4dc9a19d51 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -41,6 +41,12 @@ org.apache.flume flume-ng-sdk 1.2.0 + + + org.jboss.netty + netty + + com.github.sgroschupf -- cgit v1.2.3