From e5ca2413352510297092384eda73049ad601fd8a Mon Sep 17 00:00:00 2001
From: Stephen Haberman
Date: Mon, 21 Jan 2013 16:06:58 -0600
Subject: Move JavaAPISuite into spark.streaming.

---
 streaming/src/test/java/JavaAPISuite.java          | 1029 --------------------
 streaming/src/test/java/JavaTestUtils.scala        |   65 --
 .../test/java/spark/streaming/JavaAPISuite.java    | 1029 ++++++++++++++++++++
 .../test/java/spark/streaming/JavaTestUtils.scala  |   65 ++
 4 files changed, 1094 insertions(+), 1094 deletions(-)
 delete mode 100644 streaming/src/test/java/JavaAPISuite.java
 delete mode 100644 streaming/src/test/java/JavaTestUtils.scala
 create mode 100644 streaming/src/test/java/spark/streaming/JavaAPISuite.java
 create mode 100644 streaming/src/test/java/spark/streaming/JavaTestUtils.scala

diff --git a/streaming/src/test/java/JavaAPISuite.java b/streaming/src/test/java/JavaAPISuite.java
deleted file mode 100644
index c84e7331c7..0000000000
--- a/streaming/src/test/java/JavaAPISuite.java
+++ /dev/null
@@ -1,1029 +0,0 @@
-package spark.streaming;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import scala.Tuple2;
-import spark.HashPartitioner;
-import spark.api.java.JavaRDD;
-import spark.api.java.JavaSparkContext;
-import spark.api.java.function.*;
-import spark.storage.StorageLevel;
-import spark.streaming.api.java.JavaDStream;
-import spark.streaming.api.java.JavaPairDStream;
-import spark.streaming.api.java.JavaStreamingContext;
-import spark.streaming.JavaTestUtils;
-import spark.streaming.JavaCheckpointTestUtils;
-import spark.streaming.dstream.KafkaPartitionKey;
-
-import java.io.*;
-import java.util.*;
-
-// The test suite itself is Serializable so that anonymous Function implementations can be
-// serialized, as an alternative to converting these anonymous classes to static inner classes;
-// see http://stackoverflow.com/questions/758570/.
-public class JavaAPISuite implements Serializable { - private transient JavaStreamingContext ssc; - - @Before - public void setUp() { - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); - ssc.checkpoint("checkpoint", new Duration(1000)); - } - - @After - public void tearDown() { - ssc.stop(); - ssc = null; - - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port"); - } - - @Test - public void testCount() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3,4), - Arrays.asList(3,4,5), - Arrays.asList(3)); - - List> expected = Arrays.asList( - Arrays.asList(4L), - Arrays.asList(3L), - Arrays.asList(1L)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream count = stream.count(); - JavaTestUtils.attachTestOutputStream(count); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testMap() { - List> inputData = Arrays.asList( - Arrays.asList("hello", "world"), - Arrays.asList("goodnight", "moon")); - - List> expected = Arrays.asList( - Arrays.asList(5,5), - Arrays.asList(9,4)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function() { - @Override - public Integer call(String s) throws Exception { - return s.length(); - } - }); - JavaTestUtils.attachTestOutputStream(letterCount); - List> result = JavaTestUtils.runStreams(ssc, 2, 2); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testWindow() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9)); - - List> expected = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6,1,2,3), - Arrays.asList(7,8,9,4,5,6), - Arrays.asList(7,8,9)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.window(new Duration(2000)); - JavaTestUtils.attachTestOutputStream(windowed); - List> result = JavaTestUtils.runStreams(ssc, 4, 4); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testWindowWithSlideDuration() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9), - Arrays.asList(10,11,12), - Arrays.asList(13,14,15), - Arrays.asList(16,17,18)); - - List> expected = Arrays.asList( - Arrays.asList(1,2,3,4,5,6), - Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12), - Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18), - Arrays.asList(13,14,15,16,17,18)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000)); - JavaTestUtils.attachTestOutputStream(windowed); - List> result = JavaTestUtils.runStreams(ssc, 8, 4); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testTumble() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9), - Arrays.asList(10,11,12), - Arrays.asList(13,14,15), - Arrays.asList(16,17,18)); - - List> expected = Arrays.asList( - Arrays.asList(1,2,3,4,5,6), - Arrays.asList(7,8,9,10,11,12), - Arrays.asList(13,14,15,16,17,18)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.tumble(new Duration(2000)); - JavaTestUtils.attachTestOutputStream(windowed); - List> result = 
JavaTestUtils.runStreams(ssc, 6, 3); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testFilter() { - List> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); - - List> expected = Arrays.asList( - Arrays.asList("giants"), - Arrays.asList("yankees")); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream filtered = stream.filter(new Function() { - @Override - public Boolean call(String s) throws Exception { - return s.contains("a"); - } - }); - JavaTestUtils.attachTestOutputStream(filtered); - List> result = JavaTestUtils.runStreams(ssc, 2, 2); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testGlom() { - List> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); - - List>> expected = Arrays.asList( - Arrays.asList(Arrays.asList("giants", "dodgers")), - Arrays.asList(Arrays.asList("yankees", "red socks"))); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream glommed = stream.glom(); - JavaTestUtils.attachTestOutputStream(glommed); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testMapPartitions() { - List> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); - - List> expected = Arrays.asList( - Arrays.asList("GIANTSDODGERS"), - Arrays.asList("YANKEESRED SOCKS")); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream mapped = stream.mapPartitions(new FlatMapFunction, String>() { - @Override - public Iterable call(Iterator in) { - String out = ""; - while (in.hasNext()) { - out = out + in.next().toUpperCase(); - } - return Lists.newArrayList(out); - } - }); - JavaTestUtils.attachTestOutputStream(mapped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - private class IntegerSum extends Function2 { - @Override - public Integer call(Integer i1, Integer i2) throws Exception { - return i1 + i2; - } - } - - private class IntegerDifference extends Function2 { - @Override - public Integer call(Integer i1, Integer i2) throws Exception { - return i1 - i2; - } - } - - @Test - public void testReduce() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9)); - - List> expected = Arrays.asList( - Arrays.asList(6), - Arrays.asList(15), - Arrays.asList(24)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream reduced = stream.reduce(new IntegerSum()); - JavaTestUtils.attachTestOutputStream(reduced); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByWindow() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9)); - - List> expected = Arrays.asList( - Arrays.asList(6), - Arrays.asList(21), - Arrays.asList(39), - Arrays.asList(24)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(), - new IntegerDifference(), new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reducedWindowed); - List> result = JavaTestUtils.runStreams(ssc, 4, 4); - - Assert.assertEquals(expected, result); 
- } - - @Test - public void testQueueStream() { - List> expected = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9)); - - JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc()); - JavaRDD rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3)); - JavaRDD rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6)); - JavaRDD rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9)); - - LinkedList> rdds = Lists.newLinkedList(); - rdds.add(rdd1); - rdds.add(rdd2); - rdds.add(rdd3); - - JavaDStream stream = ssc.queueStream(rdds); - JavaTestUtils.attachTestOutputStream(stream); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - Assert.assertEquals(expected, result); - } - - @Test - public void testTransform() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9)); - - List> expected = Arrays.asList( - Arrays.asList(3,4,5), - Arrays.asList(6,7,8), - Arrays.asList(9,10,11)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream transformed = stream.transform(new Function, JavaRDD>() { - @Override - public JavaRDD call(JavaRDD in) throws Exception { - return in.map(new Function() { - @Override - public Integer call(Integer i) throws Exception { - return i + 2; - } - }); - }}); - JavaTestUtils.attachTestOutputStream(transformed); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testFlatMap() { - List> inputData = Arrays.asList( - Arrays.asList("go", "giants"), - Arrays.asList("boo", "dodgers"), - Arrays.asList("athletics")); - - List> expected = Arrays.asList( - Arrays.asList("g","o","g","i","a","n","t","s"), - Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"), - Arrays.asList("a","t","h","l","e","t","i","c","s")); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream flatMapped = stream.flatMap(new FlatMapFunction() { - @Override - public Iterable call(String x) { - return Lists.newArrayList(x.split("(?!^)")); - } - }); - JavaTestUtils.attachTestOutputStream(flatMapped); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testPairFlatMap() { - List> inputData = Arrays.asList( - Arrays.asList("giants"), - Arrays.asList("dodgers"), - Arrays.asList("athletics")); - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2(6, "g"), - new Tuple2(6, "i"), - new Tuple2(6, "a"), - new Tuple2(6, "n"), - new Tuple2(6, "t"), - new Tuple2(6, "s")), - Arrays.asList( - new Tuple2(7, "d"), - new Tuple2(7, "o"), - new Tuple2(7, "d"), - new Tuple2(7, "g"), - new Tuple2(7, "e"), - new Tuple2(7, "r"), - new Tuple2(7, "s")), - Arrays.asList( - new Tuple2(9, "a"), - new Tuple2(9, "t"), - new Tuple2(9, "h"), - new Tuple2(9, "l"), - new Tuple2(9, "e"), - new Tuple2(9, "t"), - new Tuple2(9, "i"), - new Tuple2(9, "c"), - new Tuple2(9, "s"))); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction() { - @Override - public Iterable> call(String in) throws Exception { - List> out = Lists.newArrayList(); - for (String letter: in.split("(?!^)")) { - out.add(new Tuple2(in.length(), letter)); - } - return out; - } - }); - JavaTestUtils.attachTestOutputStream(flatMapped); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void 
testUnion() { - List> inputData1 = Arrays.asList( - Arrays.asList(1,1), - Arrays.asList(2,2), - Arrays.asList(3,3)); - - List> inputData2 = Arrays.asList( - Arrays.asList(4,4), - Arrays.asList(5,5), - Arrays.asList(6,6)); - - List> expected = Arrays.asList( - Arrays.asList(1,1,4,4), - Arrays.asList(2,2,5,5), - Arrays.asList(3,3,6,6)); - - JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2); - JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2); - - JavaDStream unioned = stream1.union(stream2); - JavaTestUtils.attachTestOutputStream(unioned); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - /* - * Performs an order-invariant comparison of lists representing two RDD streams. This allows - * us to account for ordering variation within individual RDD's which occurs during windowing. - */ - public static void assertOrderInvariantEquals( - List> expected, List> actual) { - for (List list: expected) { - Collections.sort(list); - } - for (List list: actual) { - Collections.sort(list); - } - Assert.assertEquals(expected, actual); - } - - - // PairDStream Functions - @Test - public void testPairFilter() { - List> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red socks")); - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2("giants", 6)), - Arrays.asList(new Tuple2("yankees", 7))); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = stream.map( - new PairFunction() { - @Override - public Tuple2 call(String in) throws Exception { - return new Tuple2(in, in.length()); - } - }); - - JavaPairDStream filtered = pairStream.filter( - new Function, Boolean>() { - @Override - public Boolean call(Tuple2 in) throws Exception { - return in._1().contains("a"); - } - }); - JavaTestUtils.attachTestOutputStream(filtered); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - List>> stringStringKVStream = Arrays.asList( - Arrays.asList(new Tuple2("california", "dodgers"), - new Tuple2("california", "giants"), - new Tuple2("new york", "yankees"), - new Tuple2("new york", "mets")), - Arrays.asList(new Tuple2("california", "sharks"), - new Tuple2("california", "ducks"), - new Tuple2("new york", "rangers"), - new Tuple2("new york", "islanders"))); - - List>> stringIntKVStream = Arrays.asList( - Arrays.asList( - new Tuple2("california", 1), - new Tuple2("california", 3), - new Tuple2("new york", 4), - new Tuple2("new york", 1)), - Arrays.asList( - new Tuple2("california", 5), - new Tuple2("california", 5), - new Tuple2("new york", 3), - new Tuple2("new york", 1))); - - @Test - public void testPairGroupByKey() { - List>> inputData = stringStringKVStream; - - List>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2>("california", Arrays.asList("dodgers", "giants")), - new Tuple2>("new york", Arrays.asList("yankees", "mets"))), - Arrays.asList( - new Tuple2>("california", Arrays.asList("sharks", "ducks")), - new Tuple2>("new york", Arrays.asList("rangers", "islanders")))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream> grouped = pairStream.groupByKey(); - JavaTestUtils.attachTestOutputStream(grouped); - List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test 
- public void testPairReduceByKey() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2("california", 4), - new Tuple2("new york", 5)), - Arrays.asList( - new Tuple2("california", 10), - new Tuple2("new york", 4))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream reduced = pairStream.reduceByKey(new IntegerSum()); - - JavaTestUtils.attachTestOutputStream(reduced); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testCombineByKey() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2("california", 4), - new Tuple2("new york", 5)), - Arrays.asList( - new Tuple2("california", 10), - new Tuple2("new york", 4))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream combined = pairStream.combineByKey( - new Function() { - @Override - public Integer call(Integer i) throws Exception { - return i; - } - }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2)); - - JavaTestUtils.attachTestOutputStream(combined); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testCountByKey() { - List>> inputData = stringStringKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2("california", 2L), - new Tuple2("new york", 2L)), - Arrays.asList( - new Tuple2("california", 2L), - new Tuple2("new york", 2L))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream counted = pairStream.countByKey(); - JavaTestUtils.attachTestOutputStream(counted); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testGroupByKeyAndWindow() { - List>> inputData = stringStringKVStream; - - List>>> expected = Arrays.asList( - Arrays.asList(new Tuple2>("california", Arrays.asList("dodgers", "giants")), - new Tuple2>("new york", Arrays.asList("yankees", "mets"))), - Arrays.asList(new Tuple2>("california", - Arrays.asList("sharks", "ducks", "dodgers", "giants")), - new Tuple2>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))), - Arrays.asList(new Tuple2>("california", Arrays.asList("sharks", "ducks")), - new Tuple2>("new york", Arrays.asList("rangers", "islanders")))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream> groupWindowed = - pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(groupWindowed); - List>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByKeyAndWindow() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2("california", 4), - new Tuple2("new york", 5)), - Arrays.asList(new Tuple2("california", 14), - new Tuple2("new york", 9)), - Arrays.asList(new Tuple2("california", 10), - new Tuple2("new york", 4))); - - JavaDStream> stream = 
JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream reduceWindowed = - pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reduceWindowed); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testUpdateStateByKey() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2("california", 4), - new Tuple2("new york", 5)), - Arrays.asList(new Tuple2("california", 14), - new Tuple2("new york", 9)), - Arrays.asList(new Tuple2("california", 14), - new Tuple2("new york", 9))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream updated = pairStream.updateStateByKey( - new Function2, Optional, Optional>(){ - @Override - public Optional call(List values, Optional state) { - int out = 0; - if (state.isPresent()) { - out = out + state.get(); - } - for (Integer v: values) { - out = out + v; - } - return Optional.of(out); - } - }); - JavaTestUtils.attachTestOutputStream(updated); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByKeyAndWindowWithInverse() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2("california", 4), - new Tuple2("new york", 5)), - Arrays.asList(new Tuple2("california", 14), - new Tuple2("new york", 9)), - Arrays.asList(new Tuple2("california", 10), - new Tuple2("new york", 4))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream reduceWindowed = - pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reduceWindowed); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testCountByKeyAndWindow() { - List>> inputData = stringStringKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2("california", 2L), - new Tuple2("new york", 2L)), - Arrays.asList( - new Tuple2("california", 4L), - new Tuple2("new york", 4L)), - Arrays.asList( - new Tuple2("california", 2L), - new Tuple2("new york", 2L))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream counted = - pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(counted); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testMapValues() { - List>> inputData = stringStringKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2("california", "DODGERS"), - new Tuple2("california", "GIANTS"), - new Tuple2("new york", "YANKEES"), - new Tuple2("new york", "METS")), - Arrays.asList(new Tuple2("california", "SHARKS"), - new Tuple2("california", "DUCKS"), - new Tuple2("new york", "RANGERS"), - new Tuple2("new york", "ISLANDERS"))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 
1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream mapped = pairStream.mapValues(new Function() { - @Override - public String call(String s) throws Exception { - return s.toUpperCase(); - } - }); - - JavaTestUtils.attachTestOutputStream(mapped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testFlatMapValues() { - List>> inputData = stringStringKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2("california", "dodgers1"), - new Tuple2("california", "dodgers2"), - new Tuple2("california", "giants1"), - new Tuple2("california", "giants2"), - new Tuple2("new york", "yankees1"), - new Tuple2("new york", "yankees2"), - new Tuple2("new york", "mets1"), - new Tuple2("new york", "mets2")), - Arrays.asList(new Tuple2("california", "sharks1"), - new Tuple2("california", "sharks2"), - new Tuple2("california", "ducks1"), - new Tuple2("california", "ducks2"), - new Tuple2("new york", "rangers1"), - new Tuple2("new york", "rangers2"), - new Tuple2("new york", "islanders1"), - new Tuple2("new york", "islanders2"))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - - JavaPairDStream flatMapped = pairStream.flatMapValues( - new Function>() { - @Override - public Iterable call(String in) { - List out = new ArrayList(); - out.add(in + "1"); - out.add(in + "2"); - return out; - } - }); - - JavaTestUtils.attachTestOutputStream(flatMapped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testCoGroup() { - List>> stringStringKVStream1 = Arrays.asList( - Arrays.asList(new Tuple2("california", "dodgers"), - new Tuple2("new york", "yankees")), - Arrays.asList(new Tuple2("california", "sharks"), - new Tuple2("new york", "rangers"))); - - List>> stringStringKVStream2 = Arrays.asList( - Arrays.asList(new Tuple2("california", "giants"), - new Tuple2("new york", "mets")), - Arrays.asList(new Tuple2("california", "ducks"), - new Tuple2("new york", "islanders"))); - - - List, List>>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2, List>>("california", - new Tuple2, List>(Arrays.asList("dodgers"), Arrays.asList("giants"))), - new Tuple2, List>>("new york", - new Tuple2, List>(Arrays.asList("yankees"), Arrays.asList("mets")))), - Arrays.asList( - new Tuple2, List>>("california", - new Tuple2, List>(Arrays.asList("sharks"), Arrays.asList("ducks"))), - new Tuple2, List>>("new york", - new Tuple2, List>(Arrays.asList("rangers"), Arrays.asList("islanders"))))); - - - JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream1, 1); - JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); - - JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream2, 1); - JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - - JavaPairDStream, List>> grouped = pairStream1.cogroup(pairStream2); - JavaTestUtils.attachTestOutputStream(grouped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testJoin() { - List>> stringStringKVStream1 = Arrays.asList( - Arrays.asList(new Tuple2("california", "dodgers"), - new Tuple2("new york", "yankees")), - Arrays.asList(new Tuple2("california", "sharks"), - new Tuple2("new york", "rangers"))); - - 
List>> stringStringKVStream2 = Arrays.asList( - Arrays.asList(new Tuple2("california", "giants"), - new Tuple2("new york", "mets")), - Arrays.asList(new Tuple2("california", "ducks"), - new Tuple2("new york", "islanders"))); - - - List>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2>("california", - new Tuple2("dodgers", "giants")), - new Tuple2>("new york", - new Tuple2("yankees", "mets"))), - Arrays.asList( - new Tuple2>("california", - new Tuple2("sharks", "ducks")), - new Tuple2>("new york", - new Tuple2("rangers", "islanders")))); - - - JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream1, 1); - JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); - - JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream2, 1); - JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - - JavaPairDStream> joined = pairStream1.join(pairStream2); - JavaTestUtils.attachTestOutputStream(joined); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testCheckpointMasterRecovery() throws InterruptedException { - List> inputData = Arrays.asList( - Arrays.asList("this", "is"), - Arrays.asList("a", "test"), - Arrays.asList("counting", "letters")); - - List> expectedInitial = Arrays.asList( - Arrays.asList(4,2)); - List> expectedFinal = Arrays.asList( - Arrays.asList(1,4), - Arrays.asList(8,7)); - - - File tempDir = Files.createTempDir(); - ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); - - JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function() { - @Override - public Integer call(String s) throws Exception { - return s.length(); - } - }); - JavaCheckpointTestUtils.attachTestOutputStream(letterCount); - List> initialResult = JavaTestUtils.runStreams(ssc, 1, 1); - - assertOrderInvariantEquals(expectedInitial, initialResult); - Thread.sleep(1000); - - ssc.stop(); - ssc = new JavaStreamingContext(tempDir.getAbsolutePath()); - ssc.start(); - List> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2); - assertOrderInvariantEquals(expectedFinal, finalResult); - } - - /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD - @Test - public void testCheckpointofIndividualStream() throws InterruptedException { - List> inputData = Arrays.asList( - Arrays.asList("this", "is"), - Arrays.asList("a", "test"), - Arrays.asList("counting", "letters")); - - List> expected = Arrays.asList( - Arrays.asList(4,2), - Arrays.asList(1,4), - Arrays.asList(8,7)); - - JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function() { - @Override - public Integer call(String s) throws Exception { - return s.length(); - } - }); - JavaCheckpointTestUtils.attachTestOutputStream(letterCount); - - letterCount.checkpoint(new Duration(1000)); - - List> result1 = JavaCheckpointTestUtils.runStreams(ssc, 3, 3); - assertOrderInvariantEquals(expected, result1); - } - */ - - // Input stream tests. These mostly just test that we can instantiate a given InputStream with - // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the - // InputStream functionality is deferred to the existing Scala tests. 
-  @Test
-  public void testKafkaStream() {
-    HashMap<String, Integer> topics = Maps.newHashMap();
-    HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
-    JavaDStream<String> test1 = ssc.kafkaStream("localhost", 12345, "group", topics);
-    JavaDStream<String> test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets);
-    JavaDStream<String> test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets,
-      StorageLevel.MEMORY_AND_DISK());
-  }
-
-  @Test
-  public void testNetworkTextStream() {
-    JavaDStream<String> test = ssc.networkTextStream("localhost", 12345);
-  }
-
-  @Test
-  public void testNetworkString() {
-    class Converter extends Function<InputStream, Iterable<String>> {
-      public Iterable<String> call(InputStream in) {
-        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
-        List<String> out = new ArrayList<String>();
-        try {
-          while (true) {
-            String line = reader.readLine();
-            if (line == null) { break; }
-            out.add(line);
-          }
-        } catch (IOException e) { }
-        return out;
-      }
-    }
-
-    JavaDStream<String> test = ssc.networkStream(
-      "localhost",
-      12345,
-      new Converter(),
-      StorageLevel.MEMORY_ONLY());
-  }
-
-  @Test
-  public void testTextFileStream() {
-    JavaDStream<String> test = ssc.textFileStream("/tmp/foo");
-  }
-
-  @Test
-  public void testRawNetworkStream() {
-    JavaDStream test = ssc.rawNetworkStream("localhost", 12345);
-  }
-
-  @Test
-  public void testFlumeStream() {
-    JavaDStream test = ssc.flumeStream("localhost", 12345);
-  }
-
-  @Test
-  public void testFileStream() {
-    JavaPairDStream<String, String> foo =
-      ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
-  }
-}
diff --git a/streaming/src/test/java/JavaTestUtils.scala b/streaming/src/test/java/JavaTestUtils.scala
deleted file mode 100644
index 56349837e5..0000000000
--- a/streaming/src/test/java/JavaTestUtils.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-package spark.streaming
-
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import java.util.{List => JList}
-import spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext}
-import spark.streaming._
-import java.util.ArrayList
-import collection.JavaConversions._
-
-/** Exposes streaming test functionality in a Java-friendly way. */
-trait JavaTestBase extends TestSuiteBase {
-
-  /**
-   * Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context.
-   * The stream will be derived from the supplied lists of Java objects.
-   **/
-  def attachTestInputStream[T](
-    ssc: JavaStreamingContext,
-    data: JList[JList[T]],
-    numPartitions: Int) = {
-    val seqData = data.map(Seq(_:_*))
-
-    implicit val cm: ClassManifest[T] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
-    ssc.ssc.registerInputStream(dstream)
-    new JavaDStream[T](dstream)
-  }
-
-  /**
-   * Attach a provided stream to it's associated StreamingContext as a
-   * [[spark.streaming.TestOutputStream]].
-   **/
-  def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]](
-    dstream: JavaDStreamLike[T, This]) = {
-    implicit val cm: ClassManifest[T] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    val ostream = new TestOutputStream(dstream.dstream,
-      new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
-    dstream.dstream.ssc.registerOutputStream(ostream)
-  }
-
-  /**
-   * Process all registered streams for a numBatches batches, failing if
-   * numExpectedOutput RDD's are not generated. Generated RDD's are collected
-   * and returned, represented as a list for each batch interval.
-   */
-  def runStreams[V](
-    ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
-    implicit val cm: ClassManifest[V] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
-    val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
-    val out = new ArrayList[JList[V]]()
-    res.map(entry => out.append(new ArrayList[V](entry)))
-    out
-  }
-}
-
-object JavaTestUtils extends JavaTestBase {
-
-}
-
-object JavaCheckpointTestUtils extends JavaTestBase {
-  override def actuallyWait = true
-}
\ No newline at end of file
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
new file mode 100644
index 0000000000..c84e7331c7
--- /dev/null
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -0,0 +1,1029 @@
+package spark.streaming;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import scala.Tuple2;
+import spark.HashPartitioner;
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.*;
+import spark.storage.StorageLevel;
+import spark.streaming.api.java.JavaDStream;
+import spark.streaming.api.java.JavaPairDStream;
+import spark.streaming.api.java.JavaStreamingContext;
+import spark.streaming.JavaTestUtils;
+import spark.streaming.JavaCheckpointTestUtils;
+import spark.streaming.dstream.KafkaPartitionKey;
+
+import java.io.*;
+import java.util.*;
+
+// The test suite itself is Serializable so that anonymous Function implementations can be
+// serialized, as an alternative to converting these anonymous classes to static inner classes;
+// see http://stackoverflow.com/questions/758570/.
+public class JavaAPISuite implements Serializable { + private transient JavaStreamingContext ssc; + + @Before + public void setUp() { + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + ssc.checkpoint("checkpoint", new Duration(1000)); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port"); + } + + @Test + public void testCount() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3,4), + Arrays.asList(3,4,5), + Arrays.asList(3)); + + List> expected = Arrays.asList( + Arrays.asList(4L), + Arrays.asList(3L), + Arrays.asList(1L)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream count = stream.count(); + JavaTestUtils.attachTestOutputStream(count); + List> result = JavaTestUtils.runStreams(ssc, 3, 3); + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testMap() { + List> inputData = Arrays.asList( + Arrays.asList("hello", "world"), + Arrays.asList("goodnight", "moon")); + + List> expected = Arrays.asList( + Arrays.asList(5,5), + Arrays.asList(9,4)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream letterCount = stream.map(new Function() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaTestUtils.attachTestOutputStream(letterCount); + List> result = JavaTestUtils.runStreams(ssc, 2, 2); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testWindow() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List> expected = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6,1,2,3), + Arrays.asList(7,8,9,4,5,6), + Arrays.asList(7,8,9)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream windowed = stream.window(new Duration(2000)); + JavaTestUtils.attachTestOutputStream(windowed); + List> result = JavaTestUtils.runStreams(ssc, 4, 4); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testWindowWithSlideDuration() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9), + Arrays.asList(10,11,12), + Arrays.asList(13,14,15), + Arrays.asList(16,17,18)); + + List> expected = Arrays.asList( + Arrays.asList(1,2,3,4,5,6), + Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12), + Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18), + Arrays.asList(13,14,15,16,17,18)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000)); + JavaTestUtils.attachTestOutputStream(windowed); + List> result = JavaTestUtils.runStreams(ssc, 8, 4); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testTumble() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9), + Arrays.asList(10,11,12), + Arrays.asList(13,14,15), + Arrays.asList(16,17,18)); + + List> expected = Arrays.asList( + Arrays.asList(1,2,3,4,5,6), + Arrays.asList(7,8,9,10,11,12), + Arrays.asList(13,14,15,16,17,18)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream windowed = stream.tumble(new Duration(2000)); + JavaTestUtils.attachTestOutputStream(windowed); + List> result = 
JavaTestUtils.runStreams(ssc, 6, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testFilter() { + List> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List> expected = Arrays.asList( + Arrays.asList("giants"), + Arrays.asList("yankees")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream filtered = stream.filter(new Function() { + @Override + public Boolean call(String s) throws Exception { + return s.contains("a"); + } + }); + JavaTestUtils.attachTestOutputStream(filtered); + List> result = JavaTestUtils.runStreams(ssc, 2, 2); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testGlom() { + List> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List>> expected = Arrays.asList( + Arrays.asList(Arrays.asList("giants", "dodgers")), + Arrays.asList(Arrays.asList("yankees", "red socks"))); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream glommed = stream.glom(); + JavaTestUtils.attachTestOutputStream(glommed); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testMapPartitions() { + List> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List> expected = Arrays.asList( + Arrays.asList("GIANTSDODGERS"), + Arrays.asList("YANKEESRED SOCKS")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream mapped = stream.mapPartitions(new FlatMapFunction, String>() { + @Override + public Iterable call(Iterator in) { + String out = ""; + while (in.hasNext()) { + out = out + in.next().toUpperCase(); + } + return Lists.newArrayList(out); + } + }); + JavaTestUtils.attachTestOutputStream(mapped); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + private class IntegerSum extends Function2 { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 + i2; + } + } + + private class IntegerDifference extends Function2 { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 - i2; + } + } + + @Test + public void testReduce() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List> expected = Arrays.asList( + Arrays.asList(6), + Arrays.asList(15), + Arrays.asList(24)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream reduced = stream.reduce(new IntegerSum()); + JavaTestUtils.attachTestOutputStream(reduced); + List> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByWindow() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List> expected = Arrays.asList( + Arrays.asList(6), + Arrays.asList(21), + Arrays.asList(39), + Arrays.asList(24)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(), + new IntegerDifference(), new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reducedWindowed); + List> result = JavaTestUtils.runStreams(ssc, 4, 4); + + Assert.assertEquals(expected, result); 
+ } + + @Test + public void testQueueStream() { + List> expected = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc()); + JavaRDD rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3)); + JavaRDD rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6)); + JavaRDD rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9)); + + LinkedList> rdds = Lists.newLinkedList(); + rdds.add(rdd1); + rdds.add(rdd2); + rdds.add(rdd3); + + JavaDStream stream = ssc.queueStream(rdds); + JavaTestUtils.attachTestOutputStream(stream); + List> result = JavaTestUtils.runStreams(ssc, 3, 3); + Assert.assertEquals(expected, result); + } + + @Test + public void testTransform() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List> expected = Arrays.asList( + Arrays.asList(3,4,5), + Arrays.asList(6,7,8), + Arrays.asList(9,10,11)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream transformed = stream.transform(new Function, JavaRDD>() { + @Override + public JavaRDD call(JavaRDD in) throws Exception { + return in.map(new Function() { + @Override + public Integer call(Integer i) throws Exception { + return i + 2; + } + }); + }}); + JavaTestUtils.attachTestOutputStream(transformed); + List> result = JavaTestUtils.runStreams(ssc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testFlatMap() { + List> inputData = Arrays.asList( + Arrays.asList("go", "giants"), + Arrays.asList("boo", "dodgers"), + Arrays.asList("athletics")); + + List> expected = Arrays.asList( + Arrays.asList("g","o","g","i","a","n","t","s"), + Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"), + Arrays.asList("a","t","h","l","e","t","i","c","s")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream flatMapped = stream.flatMap(new FlatMapFunction() { + @Override + public Iterable call(String x) { + return Lists.newArrayList(x.split("(?!^)")); + } + }); + JavaTestUtils.attachTestOutputStream(flatMapped); + List> result = JavaTestUtils.runStreams(ssc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testPairFlatMap() { + List> inputData = Arrays.asList( + Arrays.asList("giants"), + Arrays.asList("dodgers"), + Arrays.asList("athletics")); + + List>> expected = Arrays.asList( + Arrays.asList( + new Tuple2(6, "g"), + new Tuple2(6, "i"), + new Tuple2(6, "a"), + new Tuple2(6, "n"), + new Tuple2(6, "t"), + new Tuple2(6, "s")), + Arrays.asList( + new Tuple2(7, "d"), + new Tuple2(7, "o"), + new Tuple2(7, "d"), + new Tuple2(7, "g"), + new Tuple2(7, "e"), + new Tuple2(7, "r"), + new Tuple2(7, "s")), + Arrays.asList( + new Tuple2(9, "a"), + new Tuple2(9, "t"), + new Tuple2(9, "h"), + new Tuple2(9, "l"), + new Tuple2(9, "e"), + new Tuple2(9, "t"), + new Tuple2(9, "i"), + new Tuple2(9, "c"), + new Tuple2(9, "s"))); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction() { + @Override + public Iterable> call(String in) throws Exception { + List> out = Lists.newArrayList(); + for (String letter: in.split("(?!^)")) { + out.add(new Tuple2(in.length(), letter)); + } + return out; + } + }); + JavaTestUtils.attachTestOutputStream(flatMapped); + List>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void 
testUnion() { + List> inputData1 = Arrays.asList( + Arrays.asList(1,1), + Arrays.asList(2,2), + Arrays.asList(3,3)); + + List> inputData2 = Arrays.asList( + Arrays.asList(4,4), + Arrays.asList(5,5), + Arrays.asList(6,6)); + + List> expected = Arrays.asList( + Arrays.asList(1,1,4,4), + Arrays.asList(2,2,5,5), + Arrays.asList(3,3,6,6)); + + JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2); + JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2); + + JavaDStream unioned = stream1.union(stream2); + JavaTestUtils.attachTestOutputStream(unioned); + List> result = JavaTestUtils.runStreams(ssc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + /* + * Performs an order-invariant comparison of lists representing two RDD streams. This allows + * us to account for ordering variation within individual RDD's which occurs during windowing. + */ + public static void assertOrderInvariantEquals( + List> expected, List> actual) { + for (List list: expected) { + Collections.sort(list); + } + for (List list: actual) { + Collections.sort(list); + } + Assert.assertEquals(expected, actual); + } + + + // PairDStream Functions + @Test + public void testPairFilter() { + List> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List>> expected = Arrays.asList( + Arrays.asList(new Tuple2("giants", 6)), + Arrays.asList(new Tuple2("yankees", 7))); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream pairStream = stream.map( + new PairFunction() { + @Override + public Tuple2 call(String in) throws Exception { + return new Tuple2(in, in.length()); + } + }); + + JavaPairDStream filtered = pairStream.filter( + new Function, Boolean>() { + @Override + public Boolean call(Tuple2 in) throws Exception { + return in._1().contains("a"); + } + }); + JavaTestUtils.attachTestOutputStream(filtered); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + List>> stringStringKVStream = Arrays.asList( + Arrays.asList(new Tuple2("california", "dodgers"), + new Tuple2("california", "giants"), + new Tuple2("new york", "yankees"), + new Tuple2("new york", "mets")), + Arrays.asList(new Tuple2("california", "sharks"), + new Tuple2("california", "ducks"), + new Tuple2("new york", "rangers"), + new Tuple2("new york", "islanders"))); + + List>> stringIntKVStream = Arrays.asList( + Arrays.asList( + new Tuple2("california", 1), + new Tuple2("california", 3), + new Tuple2("new york", 4), + new Tuple2("new york", 1)), + Arrays.asList( + new Tuple2("california", 5), + new Tuple2("california", 5), + new Tuple2("new york", 3), + new Tuple2("new york", 1))); + + @Test + public void testPairGroupByKey() { + List>> inputData = stringStringKVStream; + + List>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2>("california", Arrays.asList("dodgers", "giants")), + new Tuple2>("new york", Arrays.asList("yankees", "mets"))), + Arrays.asList( + new Tuple2>("california", Arrays.asList("sharks", "ducks")), + new Tuple2>("new york", Arrays.asList("rangers", "islanders")))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream> grouped = pairStream.groupByKey(); + JavaTestUtils.attachTestOutputStream(grouped); + List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test 
+ public void testPairReduceByKey() { + List>> inputData = stringIntKVStream; + + List>> expected = Arrays.asList( + Arrays.asList( + new Tuple2("california", 4), + new Tuple2("new york", 5)), + Arrays.asList( + new Tuple2("california", 10), + new Tuple2("new york", 4))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream reduced = pairStream.reduceByKey(new IntegerSum()); + + JavaTestUtils.attachTestOutputStream(reduced); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCombineByKey() { + List>> inputData = stringIntKVStream; + + List>> expected = Arrays.asList( + Arrays.asList( + new Tuple2("california", 4), + new Tuple2("new york", 5)), + Arrays.asList( + new Tuple2("california", 10), + new Tuple2("new york", 4))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream combined = pairStream.combineByKey( + new Function() { + @Override + public Integer call(Integer i) throws Exception { + return i; + } + }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2)); + + JavaTestUtils.attachTestOutputStream(combined); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCountByKey() { + List>> inputData = stringStringKVStream; + + List>> expected = Arrays.asList( + Arrays.asList( + new Tuple2("california", 2L), + new Tuple2("new york", 2L)), + Arrays.asList( + new Tuple2("california", 2L), + new Tuple2("new york", 2L))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream counted = pairStream.countByKey(); + JavaTestUtils.attachTestOutputStream(counted); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testGroupByKeyAndWindow() { + List>> inputData = stringStringKVStream; + + List>>> expected = Arrays.asList( + Arrays.asList(new Tuple2>("california", Arrays.asList("dodgers", "giants")), + new Tuple2>("new york", Arrays.asList("yankees", "mets"))), + Arrays.asList(new Tuple2>("california", + Arrays.asList("sharks", "ducks", "dodgers", "giants")), + new Tuple2>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))), + Arrays.asList(new Tuple2>("california", Arrays.asList("sharks", "ducks")), + new Tuple2>("new york", Arrays.asList("rangers", "islanders")))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream> groupWindowed = + pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(groupWindowed); + List>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByKeyAndWindow() { + List>> inputData = stringIntKVStream; + + List>> expected = Arrays.asList( + Arrays.asList(new Tuple2("california", 4), + new Tuple2("new york", 5)), + Arrays.asList(new Tuple2("california", 14), + new Tuple2("new york", 9)), + Arrays.asList(new Tuple2("california", 10), + new Tuple2("new york", 4))); + + JavaDStream> stream = 
JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream reduceWindowed = + pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reduceWindowed); + List>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testUpdateStateByKey() { + List>> inputData = stringIntKVStream; + + List>> expected = Arrays.asList( + Arrays.asList(new Tuple2("california", 4), + new Tuple2("new york", 5)), + Arrays.asList(new Tuple2("california", 14), + new Tuple2("new york", 9)), + Arrays.asList(new Tuple2("california", 14), + new Tuple2("new york", 9))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream updated = pairStream.updateStateByKey( + new Function2, Optional, Optional>(){ + @Override + public Optional call(List values, Optional state) { + int out = 0; + if (state.isPresent()) { + out = out + state.get(); + } + for (Integer v: values) { + out = out + v; + } + return Optional.of(out); + } + }); + JavaTestUtils.attachTestOutputStream(updated); + List>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByKeyAndWindowWithInverse() { + List>> inputData = stringIntKVStream; + + List>> expected = Arrays.asList( + Arrays.asList(new Tuple2("california", 4), + new Tuple2("new york", 5)), + Arrays.asList(new Tuple2("california", 14), + new Tuple2("new york", 9)), + Arrays.asList(new Tuple2("california", 10), + new Tuple2("new york", 4))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream reduceWindowed = + pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reduceWindowed); + List>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCountByKeyAndWindow() { + List>> inputData = stringStringKVStream; + + List>> expected = Arrays.asList( + Arrays.asList( + new Tuple2("california", 2L), + new Tuple2("new york", 2L)), + Arrays.asList( + new Tuple2("california", 4L), + new Tuple2("new york", 4L)), + Arrays.asList( + new Tuple2("california", 2L), + new Tuple2("new york", 2L))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream counted = + pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(counted); + List>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testMapValues() { + List>> inputData = stringStringKVStream; + + List>> expected = Arrays.asList( + Arrays.asList(new Tuple2("california", "DODGERS"), + new Tuple2("california", "GIANTS"), + new Tuple2("new york", "YANKEES"), + new Tuple2("new york", "METS")), + Arrays.asList(new Tuple2("california", "SHARKS"), + new Tuple2("california", "DUCKS"), + new Tuple2("new york", "RANGERS"), + new Tuple2("new york", "ISLANDERS"))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 
1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream mapped = pairStream.mapValues(new Function() { + @Override + public String call(String s) throws Exception { + return s.toUpperCase(); + } + }); + + JavaTestUtils.attachTestOutputStream(mapped); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testFlatMapValues() { + List>> inputData = stringStringKVStream; + + List>> expected = Arrays.asList( + Arrays.asList(new Tuple2("california", "dodgers1"), + new Tuple2("california", "dodgers2"), + new Tuple2("california", "giants1"), + new Tuple2("california", "giants2"), + new Tuple2("new york", "yankees1"), + new Tuple2("new york", "yankees2"), + new Tuple2("new york", "mets1"), + new Tuple2("new york", "mets2")), + Arrays.asList(new Tuple2("california", "sharks1"), + new Tuple2("california", "sharks2"), + new Tuple2("california", "ducks1"), + new Tuple2("california", "ducks2"), + new Tuple2("new york", "rangers1"), + new Tuple2("new york", "rangers2"), + new Tuple2("new york", "islanders1"), + new Tuple2("new york", "islanders2"))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + + JavaPairDStream flatMapped = pairStream.flatMapValues( + new Function>() { + @Override + public Iterable call(String in) { + List out = new ArrayList(); + out.add(in + "1"); + out.add(in + "2"); + return out; + } + }); + + JavaTestUtils.attachTestOutputStream(flatMapped); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCoGroup() { + List>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2("california", "dodgers"), + new Tuple2("new york", "yankees")), + Arrays.asList(new Tuple2("california", "sharks"), + new Tuple2("new york", "rangers"))); + + List>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2("california", "giants"), + new Tuple2("new york", "mets")), + Arrays.asList(new Tuple2("california", "ducks"), + new Tuple2("new york", "islanders"))); + + + List, List>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2, List>>("california", + new Tuple2, List>(Arrays.asList("dodgers"), Arrays.asList("giants"))), + new Tuple2, List>>("new york", + new Tuple2, List>(Arrays.asList("yankees"), Arrays.asList("mets")))), + Arrays.asList( + new Tuple2, List>>("california", + new Tuple2, List>(Arrays.asList("sharks"), Arrays.asList("ducks"))), + new Tuple2, List>>("new york", + new Tuple2, List>(Arrays.asList("rangers"), Arrays.asList("islanders"))))); + + + JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream, List>> grouped = pairStream1.cogroup(pairStream2); + JavaTestUtils.attachTestOutputStream(grouped); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testJoin() { + List>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2("california", "dodgers"), + new Tuple2("new york", "yankees")), + Arrays.asList(new Tuple2("california", "sharks"), + new Tuple2("new york", "rangers"))); + + 
List>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2("california", "giants"), + new Tuple2("new york", "mets")), + Arrays.asList(new Tuple2("california", "ducks"), + new Tuple2("new york", "islanders"))); + + + List>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2>("california", + new Tuple2("dodgers", "giants")), + new Tuple2>("new york", + new Tuple2("yankees", "mets"))), + Arrays.asList( + new Tuple2>("california", + new Tuple2("sharks", "ducks")), + new Tuple2>("new york", + new Tuple2("rangers", "islanders")))); + + + JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream> joined = pairStream1.join(pairStream2); + JavaTestUtils.attachTestOutputStream(joined); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCheckpointMasterRecovery() throws InterruptedException { + List> inputData = Arrays.asList( + Arrays.asList("this", "is"), + Arrays.asList("a", "test"), + Arrays.asList("counting", "letters")); + + List> expectedInitial = Arrays.asList( + Arrays.asList(4,2)); + List> expectedFinal = Arrays.asList( + Arrays.asList(1,4), + Arrays.asList(8,7)); + + + File tempDir = Files.createTempDir(); + ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); + + JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream letterCount = stream.map(new Function() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaCheckpointTestUtils.attachTestOutputStream(letterCount); + List> initialResult = JavaTestUtils.runStreams(ssc, 1, 1); + + assertOrderInvariantEquals(expectedInitial, initialResult); + Thread.sleep(1000); + + ssc.stop(); + ssc = new JavaStreamingContext(tempDir.getAbsolutePath()); + ssc.start(); + List> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2); + assertOrderInvariantEquals(expectedFinal, finalResult); + } + + /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD + @Test + public void testCheckpointofIndividualStream() throws InterruptedException { + List> inputData = Arrays.asList( + Arrays.asList("this", "is"), + Arrays.asList("a", "test"), + Arrays.asList("counting", "letters")); + + List> expected = Arrays.asList( + Arrays.asList(4,2), + Arrays.asList(1,4), + Arrays.asList(8,7)); + + JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream letterCount = stream.map(new Function() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaCheckpointTestUtils.attachTestOutputStream(letterCount); + + letterCount.checkpoint(new Duration(1000)); + + List> result1 = JavaCheckpointTestUtils.runStreams(ssc, 3, 3); + assertOrderInvariantEquals(expected, result1); + } + */ + + // Input stream tests. These mostly just test that we can instantiate a given InputStream with + // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the + // InputStream functionality is deferred to the existing Scala tests. 
+ @Test + public void testKafkaStream() { + HashMap topics = Maps.newHashMap(); + HashMap offsets = Maps.newHashMap(); + JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics); + JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets); + JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets, + StorageLevel.MEMORY_AND_DISK()); + } + + @Test + public void testNetworkTextStream() { + JavaDStream test = ssc.networkTextStream("localhost", 12345); + } + + @Test + public void testNetworkString() { + class Converter extends Function> { + public Iterable call(InputStream in) { + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + List out = new ArrayList(); + try { + while (true) { + String line = reader.readLine(); + if (line == null) { break; } + out.add(line); + } + } catch (IOException e) { } + return out; + } + } + + JavaDStream test = ssc.networkStream( + "localhost", + 12345, + new Converter(), + StorageLevel.MEMORY_ONLY()); + } + + @Test + public void testTextFileStream() { + JavaDStream test = ssc.textFileStream("/tmp/foo"); + } + + @Test + public void testRawNetworkStream() { + JavaDStream test = ssc.rawNetworkStream("localhost", 12345); + } + + @Test + public void testFlumeStream() { + JavaDStream test = ssc.flumeStream("localhost", 12345); + } + + @Test + public void testFileStream() { + JavaPairDStream foo = + ssc.fileStream("/tmp/foo"); + } +} diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala new file mode 100644 index 0000000000..56349837e5 --- /dev/null +++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala @@ -0,0 +1,65 @@ +package spark.streaming + +import collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import java.util.{List => JList} +import spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext} +import spark.streaming._ +import java.util.ArrayList +import collection.JavaConversions._ + +/** Exposes streaming test functionality in a Java-friendly way. */ +trait JavaTestBase extends TestSuiteBase { + + /** + * Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context. + * The stream will be derived from the supplied lists of Java objects. + **/ + def attachTestInputStream[T]( + ssc: JavaStreamingContext, + data: JList[JList[T]], + numPartitions: Int) = { + val seqData = data.map(Seq(_:_*)) + + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions) + ssc.ssc.registerInputStream(dstream) + new JavaDStream[T](dstream) + } + + /** + * Attach a provided stream to it's associated StreamingContext as a + * [[spark.streaming.TestOutputStream]]. + **/ + def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]]( + dstream: JavaDStreamLike[T, This]) = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val ostream = new TestOutputStream(dstream.dstream, + new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]) + dstream.dstream.ssc.registerOutputStream(ostream) + } + + /** + * Process all registered streams for a numBatches batches, failing if + * numExpectedOutput RDD's are not generated. Generated RDD's are collected + * and returned, represented as a list for each batch interval. 
+ */ + def runStreams[V]( + ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { + implicit val cm: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) + val out = new ArrayList[JList[V]]() + res.map(entry => out.append(new ArrayList[V](entry))) + out + } +} + +object JavaTestUtils extends JavaTestBase { + +} + +object JavaCheckpointTestUtils extends JavaTestBase { + override def actuallyWait = true +} \ No newline at end of file -- cgit v1.2.3 From 551a47a620c7dc207e3530e54d794a3c3aa8e45e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 21 Jan 2013 23:31:00 -0800 Subject: Refactor daemon thread pool creation. --- .../src/main/scala/spark/DaemonThreadFactory.scala | 18 ------------ core/src/main/scala/spark/Utils.scala | 33 +++++----------------- .../scala/spark/network/ConnectionManager.scala | 5 ++-- .../spark/scheduler/local/LocalScheduler.scala | 2 +- .../spark/streaming/dstream/RawInputDStream.scala | 5 ++-- 5 files changed, 13 insertions(+), 50 deletions(-) delete mode 100644 core/src/main/scala/spark/DaemonThreadFactory.scala (limited to 'streaming/src') diff --git a/core/src/main/scala/spark/DaemonThreadFactory.scala b/core/src/main/scala/spark/DaemonThreadFactory.scala deleted file mode 100644 index 56e59adeb7..0000000000 --- a/core/src/main/scala/spark/DaemonThreadFactory.scala +++ /dev/null @@ -1,18 +0,0 @@ -package spark - -import java.util.concurrent.ThreadFactory - -/** - * A ThreadFactory that creates daemon threads - */ -private object DaemonThreadFactory extends ThreadFactory { - override def newThread(r: Runnable): Thread = new DaemonThread(r) -} - -private class DaemonThread(r: Runnable = null) extends Thread { - override def run() { - if (r != null) { - r.run() - } - } -} \ No newline at end of file diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 692a3f4050..9b8636f6c8 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -10,6 +10,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import scala.io.Source import com.google.common.io.Files +import com.google.common.util.concurrent.ThreadFactoryBuilder /** * Various utility methods used by Spark. @@ -287,29 +288,14 @@ private object Utils extends Logging { customHostname.getOrElse(InetAddress.getLocalHost.getHostName) } - /** - * Returns a standard ThreadFactory except all threads are daemons. - */ - private def newDaemonThreadFactory: ThreadFactory = { - new ThreadFactory { - def newThread(r: Runnable): Thread = { - var t = Executors.defaultThreadFactory.newThread (r) - t.setDaemon (true) - return t - } - } - } + private[spark] val daemonThreadFactory: ThreadFactory = + new ThreadFactoryBuilder().setDaemon(true).build() /** * Wrapper over newCachedThreadPool. */ - def newDaemonCachedThreadPool(): ThreadPoolExecutor = { - var threadPool = Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor] - - threadPool.setThreadFactory (newDaemonThreadFactory) - - return threadPool - } + def newDaemonCachedThreadPool(): ThreadPoolExecutor = + Executors.newCachedThreadPool(daemonThreadFactory).asInstanceOf[ThreadPoolExecutor] /** * Return the string to tell how long has passed in seconds. The passing parameter should be in @@ -322,13 +308,8 @@ private object Utils extends Logging { /** * Wrapper over newFixedThreadPool. 
*/ - def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor = { - var threadPool = Executors.newFixedThreadPool(nThreads).asInstanceOf[ThreadPoolExecutor] - - threadPool.setThreadFactory(newDaemonThreadFactory) - - return threadPool - } + def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor = + Executors.newFixedThreadPool(nThreads, daemonThreadFactory).asInstanceOf[ThreadPoolExecutor] /** * Delete a file or directory and its contents recursively. diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index 36c01ad629..2ecd14f536 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -52,9 +52,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging { val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)] val sendMessageRequests = new Queue[(Message, SendingConnection)] - implicit val futureExecContext = ExecutionContext.fromExecutor( - Executors.newCachedThreadPool(DaemonThreadFactory)) - + implicit val futureExecContext = ExecutionContext.fromExecutor(Utils.newDaemonCachedThreadPool()) + var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null serverChannel.configureBlocking(false) diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index dff550036d..87f8474ea0 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -20,7 +20,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon with Logging { var attemptId = new AtomicInteger(0) - var threadPool = Executors.newFixedThreadPool(threads, DaemonThreadFactory) + var threadPool = Utils.newDaemonFixedThreadPool(threads) val env = SparkEnv.get var listener: TaskSchedulerListener = null diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala index 290fab1ce0..04e6b69b7b 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.{DaemonThread, Logging} +import spark.Logging import spark.storage.StorageLevel import spark.streaming.StreamingContext @@ -48,7 +48,8 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel) val queue = new ArrayBlockingQueue[ByteBuffer](2) - blockPushingThread = new DaemonThread { + blockPushingThread = new Thread { + setDaemon(true) override def run() { var nextBlockNumber = 0 while (true) { -- cgit v1.2.3 From 364cdb679cf2b0d5e6ed7ab89628f15594d7947f Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 22 Jan 2013 00:43:31 -0800 Subject: Refactored DStreamCheckpointData. 
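This patch pulls the per-DStream checkpoint bookkeeping out of DStream itself and into a
dedicated DStreamCheckpointData class exposing three hooks -- update(), cleanup() and
restore() -- which subclasses (for example the FileInputDStream change later in this series)
can override. The snippet below is a minimal, self-contained Scala sketch of that hook
pattern only; the class name and the simplified types (a plain Long stands in for Time, and
plain strings stand in for checkpoint files) are illustrative and are not the actual Spark
code, which follows in the diff.

    import scala.collection.mutable.HashMap

    /** Simplified illustration of the update/cleanup/restore hooks (not the real class). */
    class CheckpointDataSketch {
      // batch time (as a Long) -> checkpoint file recorded for that batch
      protected val currentFiles = new HashMap[Long, String]()
      private var previousFiles: HashMap[Long, String] = null

      /** Record the checkpoint files of the most recently generated batches. */
      def update(generated: Map[Long, String]) {
        previousFiles = currentFiles.clone()
        if (generated.nonEmpty) {
          currentFiles.clear()
          currentFiles ++= generated
        }
      }

      /** Forget (and, in the real code, delete) files that are no longer referenced. */
      def cleanup() {
        if (currentFiles.nonEmpty && previousFiles != null) {
          previousFiles.foreach { case (time, file) =>
            if (!currentFiles.contains(time)) {
              println("would delete checkpoint file " + file + " for time " + time)
            }
          }
        }
      }

      /** Rebuild state from the recorded files after recovery. */
      def restore(recreate: (Long, String) => Unit) {
        currentFiles.foreach { case (time, file) => recreate(time, file) }
      }
    }

Keeping these hooks on a separate object is what lets the FileInputDStream patch further down
in this series record input file names instead of RDD checkpoint files while reusing the same
update/cleanup/restore lifecycle driven from DStream.updateCheckpointData() and
restoreCheckpointData().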
--- .../src/main/scala/spark/streaming/DStream.scala | 58 +++------------ .../spark/streaming/DStreamCheckpointData.scala | 84 ++++++++++++++++++++++ .../streaming/dstream/KafkaInputDStream.scala | 9 --- .../scala/spark/streaming/CheckpointSuite.scala | 12 ++-- 4 files changed, 99 insertions(+), 64 deletions(-) create mode 100644 streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index b11ef443dc..3c1861a840 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -12,7 +12,7 @@ import scala.collection.mutable.HashMap import java.io.{ObjectInputStream, IOException, ObjectOutputStream} -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.conf.Configuration /** @@ -75,7 +75,7 @@ abstract class DStream[T: ClassManifest] ( // Checkpoint details protected[streaming] val mustCheckpoint = false protected[streaming] var checkpointDuration: Duration = null - protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]()) + protected[streaming] val checkpointData = new DStreamCheckpointData(this) // Reference to whole DStream graph protected[streaming] var graph: DStreamGraph = null @@ -85,10 +85,10 @@ abstract class DStream[T: ClassManifest] ( // Duration for which the DStream requires its parent DStream to remember each RDD created protected[streaming] def parentRememberDuration = rememberDuration - /** Returns the StreamingContext associated with this DStream */ + /** Return the StreamingContext associated with this DStream */ def context() = ssc - /** Persists the RDDs of this DStream with the given storage level */ + /** Persist the RDDs of this DStream with the given storage level */ def persist(level: StorageLevel): DStream[T] = { if (this.isInitialized) { throw new UnsupportedOperationException( @@ -342,40 +342,10 @@ abstract class DStream[T: ClassManifest] ( */ protected[streaming] def updateCheckpointData(currentTime: Time) { logInfo("Updating checkpoint data for time " + currentTime) - - // Get the checkpointed RDDs from the generated RDDs - val newRdds = generatedRDDs.filter(_._2.getCheckpointFile.isDefined) - .map(x => (x._1, x._2.getCheckpointFile.get)) - - // Make a copy of the existing checkpoint data (checkpointed RDDs) - val oldRdds = checkpointData.rdds.clone() - - // If the new checkpoint data has checkpoints then replace existing with the new one - if (newRdds.size > 0) { - checkpointData.rdds.clear() - checkpointData.rdds ++= newRdds - } - - // Make parent DStreams update their checkpoint data + checkpointData.update() dependencies.foreach(_.updateCheckpointData(currentTime)) - - // TODO: remove this, this is just for debugging - newRdds.foreach { - case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") } - } - - if (newRdds.size > 0) { - (oldRdds -- newRdds.keySet).foreach { - case (time, data) => { - val path = new Path(data.toString) - val fs = path.getFileSystem(new Configuration()) - fs.delete(path, true) - logInfo("Deleted checkpoint file '" + path + "' for time " + time) - } - } - } - logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.rdds.size + " checkpoints, " - + "[" + checkpointData.rdds.mkString(",") + "]") + checkpointData.cleanup() + logDebug("Updated checkpoint data 
for time " + currentTime + ": " + checkpointData) } /** @@ -386,14 +356,8 @@ abstract class DStream[T: ClassManifest] ( */ protected[streaming] def restoreCheckpointData() { // Create RDDs from the checkpoint data - logInfo("Restoring checkpoint data from " + checkpointData.rdds.size + " checkpointed RDDs") - checkpointData.rdds.foreach { - case(time, data) => { - logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'") - val rdd = ssc.sc.checkpointFile[T](data.toString) - generatedRDDs += ((time, rdd)) - } - } + logInfo("Restoring checkpoint data from " + checkpointData.checkpointFiles.size + " checkpointed RDDs") + checkpointData.restore() dependencies.foreach(_.restoreCheckpointData()) logInfo("Restored checkpoint data") } @@ -651,7 +615,3 @@ abstract class DStream[T: ClassManifest] ( ssc.registerOutputStream(this) } } - -private[streaming] -case class DStreamCheckpointData(rdds: HashMap[Time, Any]) - diff --git a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala new file mode 100644 index 0000000000..abf903293f --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala @@ -0,0 +1,84 @@ +package spark.streaming + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.conf.Configuration +import collection.mutable.HashMap +import spark.Logging + + + +private[streaming] +class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) + extends Serializable with Logging { + private[streaming] val checkpointFiles = new HashMap[Time, String]() + @transient private lazy val fileSystem = + new Path(dstream.context.checkpointDir).getFileSystem(new Configuration()) + @transient private var lastCheckpointFiles: HashMap[Time, String] = null + + /** + * Update the checkpoint data of the DStream. Default implementation records the checkpoint files to + * which the generate RDDs of the DStream has been saved. + */ + def update() { + + // Get the checkpointed RDDs from the generated RDDs + val newCheckpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) + .map(x => (x._1, x._2.getCheckpointFile.get)) + + // Make a copy of the existing checkpoint data (checkpointed RDDs) + lastCheckpointFiles = checkpointFiles.clone() + + // If the new checkpoint data has checkpoints then replace existing with the new one + if (newCheckpointFiles.size > 0) { + checkpointFiles.clear() + checkpointFiles ++= newCheckpointFiles + } + + // TODO: remove this, this is just for debugging + newCheckpointFiles.foreach { + case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") } + } + } + + /** + * Cleanup old checkpoint data. Default implementation, cleans up old checkpoint files. + */ + def cleanup() { + // If there is at least on checkpoint file in the current checkpoint files, + // then delete the old checkpoint files. + if (checkpointFiles.size > 0 && lastCheckpointFiles != null) { + (lastCheckpointFiles -- checkpointFiles.keySet).foreach { + case (time, file) => { + try { + val path = new Path(file) + fileSystem.delete(path, true) + logInfo("Deleted checkpoint file '" + file + "' for time " + time) + } catch { + case e: Exception => + logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) + } + } + } + } + } + + /** + * Restore the checkpoint data. Default implementation restores the RDDs from their + * checkpoint files. 
+ */ + def restore() { + // Create RDDs from the checkpoint data + checkpointFiles.foreach { + case(time, file) => { + logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'") + dstream.generatedRDDs += ((time, dstream.context.sc.checkpointFile[T](file))) + } + } + } + + override def toString() = { + "[\n" + checkpointFiles.size + "\n" + checkpointFiles.mkString("\n") + "\n]" + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index 2b4740bdf7..760d9b5cf3 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -19,15 +19,6 @@ import scala.collection.JavaConversions._ // Key for a specific Kafka Partition: (broker, topic, group, part) case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int) -// NOT USED - Originally intended for fault-tolerance -// Metadata for a Kafka Stream that it sent to the Master -private[streaming] -case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long]) -// NOT USED - Originally intended for fault-tolerance -// Checkpoint data specific to a KafkaInputDstream -private[streaming] -case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any], - savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds) /** * Input stream that pulls messages from a Kafka Broker. diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index d2f32c189b..58da4ee539 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -63,9 +63,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // then check whether some RDD has been checkpointed or not ssc.start() runStreamsWithRealDelay(ssc, firstNumBatches) - logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.rdds.mkString(",\n") + "]") - assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before first failure") - stateStream.checkpointData.rdds.foreach { + logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData) + assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure") + stateStream.checkpointData.checkpointFiles.foreach { case (time, data) => { val file = new File(data.toString) assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist") @@ -74,7 +74,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Run till a further time such that previous checkpoint files in the stream would be deleted // and check whether the earlier checkpoint files are deleted - val checkpointFiles = stateStream.checkpointData.rdds.map(x => new File(x._2.toString)) + val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2)) runStreamsWithRealDelay(ssc, secondNumBatches) checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted")) ssc.stop() @@ -91,8 +91,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // is present in the checkpoint data or not ssc.start() runStreamsWithRealDelay(ssc, 1) - 
assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before second failure") - stateStream.checkpointData.rdds.foreach { + assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure") + stateStream.checkpointData.checkpointFiles.foreach { case (time, data) => { val file = new File(data.toString) assert(file.exists(), -- cgit v1.2.3 From fad2b82fc8fb49f2171af10cf7e408d8b8dd7349 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 22 Jan 2013 18:10:00 -0800 Subject: Added support for saving input files of FileInputDStream to graph checkpoints. Modified 'file input stream with checkpoint' testcase to test recovery of pre-master-failure input files. --- .../src/main/scala/spark/streaming/DStream.scala | 29 ++++--- .../spark/streaming/DStreamCheckpointData.scala | 27 ++++-- .../main/scala/spark/streaming/DStreamGraph.scala | 2 +- .../scala/spark/streaming/StreamingContext.scala | 7 +- .../spark/streaming/dstream/FileInputDStream.scala | 96 +++++++++++++++++----- .../scala/spark/streaming/InputStreamsSuite.scala | 64 ++++++++++----- 6 files changed, 159 insertions(+), 66 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 3c1861a840..07ecb018ee 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -86,7 +86,7 @@ abstract class DStream[T: ClassManifest] ( protected[streaming] def parentRememberDuration = rememberDuration /** Return the StreamingContext associated with this DStream */ - def context() = ssc + def context = ssc /** Persist the RDDs of this DStream with the given storage level */ def persist(level: StorageLevel): DStream[T] = { @@ -159,7 +159,7 @@ abstract class DStream[T: ClassManifest] ( ) assert( - checkpointDuration == null || ssc.sc.checkpointDir.isDefined, + checkpointDuration == null || context.sparkContext.checkpointDir.isDefined, "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" + " or SparkContext.checkpoint() to set the checkpoint directory." ) @@ -298,8 +298,8 @@ abstract class DStream[T: ClassManifest] ( getOrCompute(time) match { case Some(rdd) => { val jobFunc = () => { - val emptyFunc = { (iterator: Iterator[T]) => {} } - ssc.sc.runJob(rdd, emptyFunc) + val emptyFunc = { (iterator: Iterator[T]) => {} } + context.sparkContext.runJob(rdd, emptyFunc) } Some(new Job(time, jobFunc)) } @@ -310,10 +310,9 @@ abstract class DStream[T: ClassManifest] ( /** * Dereference RDDs that are older than rememberDuration. */ - protected[streaming] def forgetOldRDDs(time: Time) { - val keys = generatedRDDs.keys + protected[streaming] def forgetOldMetadata(time: Time) { var numForgotten = 0 - keys.foreach(t => { + generatedRDDs.keys.foreach(t => { if (t <= (time - rememberDuration)) { generatedRDDs.remove(t) numForgotten += 1 @@ -321,7 +320,7 @@ abstract class DStream[T: ClassManifest] ( } }) logInfo("Forgot " + numForgotten + " RDDs from " + this) - dependencies.foreach(_.forgetOldRDDs(time)) + dependencies.foreach(_.forgetOldMetadata(time)) } /* Adds metadata to the Stream while it is running. 
@@ -356,7 +355,7 @@ abstract class DStream[T: ClassManifest] ( */ protected[streaming] def restoreCheckpointData() { // Create RDDs from the checkpoint data - logInfo("Restoring checkpoint data from " + checkpointData.checkpointFiles.size + " checkpointed RDDs") + logInfo("Restoring checkpoint data") checkpointData.restore() dependencies.foreach(_.restoreCheckpointData()) logInfo("Restored checkpoint data") @@ -397,7 +396,7 @@ abstract class DStream[T: ClassManifest] ( /** Return a new DStream by applying a function to all elements of this DStream. */ def map[U: ClassManifest](mapFunc: T => U): DStream[U] = { - new MappedDStream(this, ssc.sc.clean(mapFunc)) + new MappedDStream(this, context.sparkContext.clean(mapFunc)) } /** @@ -405,7 +404,7 @@ abstract class DStream[T: ClassManifest] ( * and then flattening the results */ def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = { - new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc)) + new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) } /** Return a new DStream containing only the elements that satisfy a predicate. */ @@ -427,7 +426,7 @@ abstract class DStream[T: ClassManifest] ( mapPartFunc: Iterator[T] => Iterator[U], preservePartitioning: Boolean = false ): DStream[U] = { - new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc), preservePartitioning) + new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning) } /** @@ -456,7 +455,7 @@ abstract class DStream[T: ClassManifest] ( * this DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: (RDD[T], Time) => Unit) { - val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc)) + val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) ssc.registerOutputStream(newStream) newStream } @@ -474,7 +473,7 @@ abstract class DStream[T: ClassManifest] ( * on each RDD of this DStream. */ def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { - new TransformedDStream(this, ssc.sc.clean(transformFunc)) + new TransformedDStream(this, context.sparkContext.clean(transformFunc)) } /** @@ -491,7 +490,7 @@ abstract class DStream[T: ClassManifest] ( if (first11.size > 10) println("...") println() } - val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc)) + val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) ssc.registerOutputStream(newStream) } diff --git a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala index abf903293f..a375980b84 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala @@ -11,14 +11,17 @@ import spark.Logging private[streaming] class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) extends Serializable with Logging { - private[streaming] val checkpointFiles = new HashMap[Time, String]() - @transient private lazy val fileSystem = - new Path(dstream.context.checkpointDir).getFileSystem(new Configuration()) + protected val data = new HashMap[Time, AnyRef]() + + @transient private var fileSystem : FileSystem = null @transient private var lastCheckpointFiles: HashMap[Time, String] = null + protected[streaming] def checkpointFiles = data.asInstanceOf[HashMap[Time, String]] + /** - * Update the checkpoint data of the DStream. 
Default implementation records the checkpoint files to - * which the generate RDDs of the DStream has been saved. + * Updates the checkpoint data of the DStream. This gets called every time + * the graph checkpoint is initiated. Default implementation records the + * checkpoint files to which the generate RDDs of the DStream has been saved. */ def update() { @@ -42,7 +45,9 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) } /** - * Cleanup old checkpoint data. Default implementation, cleans up old checkpoint files. + * Cleanup old checkpoint data. This gets called every time the graph + * checkpoint is initiated, but after `update` is called. Default + * implementation, cleans up old checkpoint files. */ def cleanup() { // If there is at least on checkpoint file in the current checkpoint files, @@ -52,6 +57,9 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) case (time, file) => { try { val path = new Path(file) + if (fileSystem == null) { + fileSystem = path.getFileSystem(new Configuration()) + } fileSystem.delete(path, true) logInfo("Deleted checkpoint file '" + file + "' for time " + time) } catch { @@ -64,15 +72,16 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) } /** - * Restore the checkpoint data. Default implementation restores the RDDs from their - * checkpoint files. + * Restore the checkpoint data. This gets called once when the DStream graph + * (along with its DStreams) are being restored from a graph checkpoint file. + * Default implementation restores the RDDs from their checkpoint files. */ def restore() { // Create RDDs from the checkpoint data checkpointFiles.foreach { case(time, file) => { logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'") - dstream.generatedRDDs += ((time, dstream.context.sc.checkpointFile[T](file))) + dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file))) } } } diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index bc4a40d7bc..d5a5496839 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -87,7 +87,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { private[streaming] def forgetOldRDDs(time: Time) { this.synchronized { - outputStreams.foreach(_.forgetOldRDDs(time)) + outputStreams.foreach(_.forgetOldMetadata(time)) } } diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 14500bdcb1..2cf00e3baa 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -61,7 +61,7 @@ class StreamingContext private ( protected[streaming] val isCheckpointPresent = (cp_ != null) - val sc: SparkContext = { + protected[streaming] val sc: SparkContext = { if (isCheckpointPresent) { new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars) } else { @@ -100,6 +100,11 @@ class StreamingContext private ( protected[streaming] var receiverJobThread: Thread = null protected[streaming] var scheduler: Scheduler = null + /** + * Returns the associated Spark context + */ + def sparkContext = sc + /** * Sets each DStreams in this context to remember RDDs it generated in the last given duration. 
* DStreams remember RDDs only for a limited duration of time and releases them for garbage diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala index 1e6ad84b44..c6ffb252ce 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala @@ -2,13 +2,14 @@ package spark.streaming.dstream import spark.RDD import spark.rdd.UnionRDD -import spark.streaming.{StreamingContext, Time} +import spark.streaming.{DStreamCheckpointData, StreamingContext, Time} import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import scala.collection.mutable.HashSet +import scala.collection.mutable.{HashSet, HashMap} +import java.io.{ObjectInputStream, IOException} private[streaming] class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( @@ -18,21 +19,14 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K newFilesOnly: Boolean = true) extends InputDStream[(K, V)](ssc_) { - @transient private var path_ : Path = null - @transient private var fs_ : FileSystem = null - - var lastModTime = 0L - val lastModTimeFiles = new HashSet[String]() + protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData - def path(): Path = { - if (path_ == null) path_ = new Path(directory) - path_ - } + private val lastModTimeFiles = new HashSet[String]() + private var lastModTime = 0L - def fs(): FileSystem = { - if (fs_ == null) fs_ = path.getFileSystem(new Configuration()) - fs_ - } + @transient private var path_ : Path = null + @transient private var fs_ : FileSystem = null + @transient private var files = new HashMap[Time, Array[String]] override def start() { if (newFilesOnly) { @@ -79,8 +73,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K } } - val newFiles = fs.listStatus(path, newFilter) - logInfo("New files: " + newFiles.map(_.getPath).mkString(", ")) + val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString) + logInfo("New files: " + newFiles.mkString(", ")) if (newFiles.length > 0) { // Update the modification time and the files processed for that modification time if (lastModTime != newFilter.latestModTime) { @@ -89,9 +83,70 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K } lastModTimeFiles ++= newFilter.latestModTimeFiles } - val newRDD = new UnionRDD(ssc.sc, newFiles.map( - file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString))) - Some(newRDD) + files += ((validTime, newFiles)) + Some(filesToRDD(newFiles)) + } + + /** Forget the old time-to-files mappings along with old RDDs */ + protected[streaming] override def forgetOldMetadata(time: Time) { + super.forgetOldMetadata(time) + val filesToBeRemoved = files.filter(_._1 <= (time - rememberDuration)) + files --= filesToBeRemoved.keys + logInfo("Forgot " + filesToBeRemoved.size + " files from " + this) + } + + /** Generate one RDD from an array of files */ + protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = { + new UnionRDD( + context.sparkContext, + files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file)) + ) + } + + private def path: Path = { + if (path_ == null) path_ = new Path(directory) + path_ + } + + private def fs: 
FileSystem = { + if (fs_ == null) fs_ = path.getFileSystem(new Configuration()) + fs_ + } + + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + logDebug(this.getClass().getSimpleName + ".readObject used") + ois.defaultReadObject() + generatedRDDs = new HashMap[Time, RDD[(K,V)]] () + files = new HashMap[Time, Array[String]] + } + + /** + * A custom version of the DStreamCheckpointData that stores names of + * Hadoop files as checkpoint data. + */ + private[streaming] + class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) { + + def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]] + + override def update() { + hadoopFiles.clear() + hadoopFiles ++= files + } + + override def cleanup() { } + + override def restore() { + hadoopFiles.foreach { + case (time, files) => { + logInfo("Restoring Hadoop RDD for time " + time + " from files " + + files.mkString("[", ",", "]") ) + files + generatedRDDs += ((time, filesToRDD(files))) + } + } + } } } @@ -100,3 +155,4 @@ object FileInputDStream { def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") } + diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index d7ba7a5d17..4f6204f205 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -214,10 +214,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { //Thread.sleep(100) } val startTime = System.currentTimeMillis() - /*while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - logInfo("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size) - Thread.sleep(100) - }*/ Thread.sleep(1000) val timeTaken = System.currentTimeMillis() - startTime assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") @@ -226,11 +222,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Verify whether data received by Spark Streaming was as expected logInfo("--------------------------------") - logInfo("output.size = " + outputBuffer.size) - logInfo("output") + logInfo("output, size = " + outputBuffer.size) outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("expected output.size = " + expectedOutput.size) - logInfo("expected output") + logInfo("expected output, size = " + expectedOutput.size) expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("--------------------------------") @@ -256,8 +250,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Set up the streaming context and input streams var ssc = new StreamingContext(master, framework, batchDuration) ssc.checkpoint(checkpointDir, checkpointInterval) - val filestream = ssc.textFileStream(testDir.toString) - var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]]) + val fileStream = ssc.textFileStream(testDir.toString) + val outputBuffer = new ArrayBuffer[Seq[Int]] + // Reduced over a large window to ensure that recovery from master failure + // requires reprocessing of all the files seen before the failure + val reducedStream = fileStream.map(_.toInt) + .reduceByWindow(_ + _, batchDuration * 30, batchDuration) + var outputStream = new TestOutputStream(reducedStream, outputBuffer) ssc.registerOutputStream(outputStream) ssc.start() @@ -266,31 +265,56 @@ class 
InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { Thread.sleep(1000) for (i <- Seq(1, 2, 3)) { FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") - Thread.sleep(100) + // wait to make sure that the file is written such that it gets shown in the file listings + Thread.sleep(500) clock.addToTime(batchDuration.milliseconds) + // wait to make sure that FileInputDStream picks up this file only and not any other file + Thread.sleep(500) } - Thread.sleep(500) logInfo("Output = " + outputStream.output.mkString(",")) - assert(outputStream.output.size > 0) + assert(outputStream.output.size > 0, "No files processed before restart") ssc.stop() + for (i <- Seq(4, 5, 6)) { + FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") + Thread.sleep(1000) + } + // Restart stream computation from checkpoint and create more files to see whether // they are being processed logInfo("*********** RESTARTING ************") ssc = new StreamingContext(checkpointDir) ssc.start() clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - Thread.sleep(500) - for (i <- Seq(4, 5, 6)) { + for (i <- Seq(7, 8, 9)) { FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") - Thread.sleep(100) + Thread.sleep(500) clock.addToTime(batchDuration.milliseconds) + Thread.sleep(500) } - Thread.sleep(500) - outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]] - logInfo("Output = " + outputStream.output.mkString(",")) - assert(outputStream.output.size > 0) + Thread.sleep(1000) + assert(outputStream.output.size > 0, "No files processed after restart") ssc.stop() + + // Append the new output to the old buffer + outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] + outputBuffer ++= outputStream.output + + // Verify whether data received by Spark Streaming was as expected + val expectedOutput = Seq(1, 3, 6, 28, 36, 45) + logInfo("--------------------------------") + logInfo("output, size = " + outputBuffer.size) + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output, size = " + expectedOutput.size) + expectedOutput.foreach(x => logInfo("[" + x + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + assert(outputBuffer.size === expectedOutput.size) + for (i <- 0 until outputBuffer.size) { + assert(outputBuffer(i).size === 1) + assert(outputBuffer(i).head === expectedOutput(i)) + } } } -- cgit v1.2.3 From 666ce431aa03239d580a8c78b3a2f34a851eb413 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 23 Jan 2013 03:15:36 -0800 Subject: Added support for rescheduling unprocessed batches on master failure. 
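The approach here is simple bookkeeping: the JobManager remembers, per batch time, which
generated jobs have not finished yet; those times are written into the checkpoint (the new
Checkpoint.pendingTimes field), and on recovery the Scheduler re-generates the jobs for each
pending time before restarting the timer. Because the input sources used in the tests are
replayable, re-running those batches is safe. Below is a rough, self-contained Scala sketch of
that bookkeeping only -- the class and method names are hypothetical stand-ins, not the actual
JobManager/Scheduler code, which appears in the diff that follows.

    import scala.collection.mutable.{ArrayBuffer, HashMap}

    /** Toy model of tracking batches whose jobs are still outstanding. */
    class PendingBatchTracker {
      // batch time (ms) -> ids of jobs still running for that batch
      private val jobs = new HashMap[Long, ArrayBuffer[String]]

      def jobStarted(time: Long, jobId: String) { jobs.synchronized {
        jobs.getOrElseUpdate(time, new ArrayBuffer[String]) += jobId
      }}

      def jobFinished(time: Long, jobId: String) { jobs.synchronized {
        jobs.get(time).foreach { running =>
          running -= jobId
          if (running.isEmpty) jobs -= time   // every job for this batch completed
        }
      }}

      /** Batch times that a checkpoint would record as pending. */
      def pendingTimes: Seq[Long] = jobs.synchronized { jobs.keys.toSeq.sorted }
    }

    object PendingBatchTracker {
      def main(args: Array[String]) {
        val tracker = new PendingBatchTracker
        tracker.jobStarted(1000L, "job-1")
        tracker.jobStarted(2000L, "job-2")
        tracker.jobFinished(1000L, "job-1")
        // On recovery, a scheduler would re-run the batch for each of these times:
        println("pending batch times to re-run: " + tracker.pendingTimes)   // List(2000)
      }
    }

The reworked file input stream test in the diff below exercises this path: it makes the batch
containing the value 3 artificially slow, stops the context while that batch is still being
processed, and then checks the windowed output after restart to confirm that the interrupted
batch was scheduled and processed again.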
--- .../main/scala/spark/streaming/Checkpoint.scala | 3 ++- .../main/scala/spark/streaming/JobManager.scala | 30 +++++++++++++++++++++- .../src/main/scala/spark/streaming/Scheduler.scala | 5 +++- .../scala/spark/streaming/StreamingContext.scala | 4 +-- .../scala/spark/streaming/InputStreamsSuite.scala | 23 ++++++++++++----- 5 files changed, 53 insertions(+), 12 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index 2f3adb39c2..b9eb7f8ec4 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -17,7 +17,8 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val jars = ssc.sc.jars val graph = ssc.graph val checkpointDir = ssc.checkpointDir - val checkpointDuration: Duration = ssc.checkpointDuration + val checkpointDuration = ssc.checkpointDuration + val pendingTimes = ssc.scheduler.jobManager.getPendingTimes() def validate() { assert(master != null, "Checkpoint.master is null") diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index 3b910538e0..5acdd01e58 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -3,6 +3,8 @@ package spark.streaming import spark.Logging import spark.SparkEnv import java.util.concurrent.Executors +import collection.mutable.HashMap +import collection.mutable.ArrayBuffer private[streaming] @@ -19,15 +21,41 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { case e: Exception => logError("Running " + job + " failed", e) } + clearJob(job) } } initLogging() val jobExecutor = Executors.newFixedThreadPool(numThreads) - + val jobs = new HashMap[Time, ArrayBuffer[Job]] + def runJob(job: Job) { + jobs.synchronized { + jobs.getOrElseUpdate(job.time, new ArrayBuffer[Job]) += job + } jobExecutor.execute(new JobHandler(ssc, job)) logInfo("Added " + job + " to queue") } + + private def clearJob(job: Job) { + jobs.synchronized { + val jobsOfTime = jobs.get(job.time) + if (jobsOfTime.isDefined) { + jobsOfTime.get -= job + if (jobsOfTime.get.isEmpty) { + jobs -= job.time + } + } else { + throw new Exception("Job finished for time " + job.time + + " but time does not exist in jobs") + } + } + } + + def getPendingTimes(): Array[Time] = { + jobs.synchronized { + jobs.keySet.toArray + } + } } diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index c04ed37de8..b77986a3ba 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -35,10 +35,13 @@ class Scheduler(ssc: StreamingContext) extends Logging { // either set the manual clock to the last checkpointed time, // or if the property is defined set it to that time if (clock.isInstanceOf[ManualClock]) { - val lastTime = ssc.getInitialCheckpoint.checkpointTime.milliseconds + val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) } + // Reschedule the batches that were received but not processed before failure + ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time)) + // Restart the timer 
timer.restart(graph.zeroTime.milliseconds) logInfo("Scheduler's timer restarted") } else { diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 2cf00e3baa..5781b1cc72 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -133,7 +133,7 @@ class StreamingContext private ( } } - protected[streaming] def getInitialCheckpoint(): Checkpoint = { + protected[streaming] def initialCheckpoint: Checkpoint = { if (isCheckpointPresent) cp_ else null } @@ -367,7 +367,7 @@ class StreamingContext private ( } /** - * Sstops the execution of the streams. + * Stops the execution of the streams. */ def stop() { try { diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 4f6204f205..34e51e9562 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -44,7 +44,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.master.port") } - + /* test("network input stream") { // Start the server testServer = new TestServer(testPort) @@ -236,8 +236,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(output(i).head.toString === expectedOutput(i)) } } - - test("file input stream with checkpoint") { + */ + test("file input stream with master failure") { // Create a temporary directory testDir = { var temp = File.createTempFile(".temp.", Random.nextInt().toString) @@ -251,11 +251,17 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { var ssc = new StreamingContext(master, framework, batchDuration) ssc.checkpoint(checkpointDir, checkpointInterval) val fileStream = ssc.textFileStream(testDir.toString) - val outputBuffer = new ArrayBuffer[Seq[Int]] - // Reduced over a large window to ensure that recovery from master failure + // Making value 3 take large time to process, to ensure that the master + // shuts down in the middle of processing the 3rd batch + val mappedStream = fileStream.map(s => { + val i = s.toInt + if (i == 3) Thread.sleep(1000) + i + }) + // Reducing over a large window to ensure that recovery from master failure // requires reprocessing of all the files seen before the failure - val reducedStream = fileStream.map(_.toInt) - .reduceByWindow(_ + _, batchDuration * 30, batchDuration) + val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration) + val outputBuffer = new ArrayBuffer[Seq[Int]] var outputStream = new TestOutputStream(reducedStream, outputBuffer) ssc.registerOutputStream(outputStream) ssc.start() @@ -275,6 +281,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(outputStream.output.size > 0, "No files processed before restart") ssc.stop() + // Create files while the master is down for (i <- Seq(4, 5, 6)) { FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") Thread.sleep(1000) @@ -293,6 +300,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { Thread.sleep(500) } Thread.sleep(1000) + logInfo("Output = " + outputStream.output.mkString(",")) assert(outputStream.output.size > 0, "No files processed after restart") ssc.stop() @@ -316,6 +324,7 
@@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(outputBuffer(i).head === expectedOutput(i)) } } + } -- cgit v1.2.3 From 9c8ff1e55fb97980e7f0bb7f305c1ed0e59b749e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 23 Jan 2013 07:31:49 -0800 Subject: Fixed checkpoint testcases --- streaming/src/test/java/JavaAPISuite.java | 23 +-- .../scala/spark/streaming/CheckpointSuite.scala | 115 ++++++++++++++- .../scala/spark/streaming/InputStreamsSuite.scala | 163 +-------------------- 3 files changed, 129 insertions(+), 172 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/test/java/JavaAPISuite.java b/streaming/src/test/java/JavaAPISuite.java index c84e7331c7..7a189d85b4 100644 --- a/streaming/src/test/java/JavaAPISuite.java +++ b/streaming/src/test/java/JavaAPISuite.java @@ -45,7 +45,7 @@ public class JavaAPISuite implements Serializable { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.master.port"); } - + /* @Test public void testCount() { List> inputData = Arrays.asList( @@ -434,7 +434,7 @@ public class JavaAPISuite implements Serializable { assertOrderInvariantEquals(expected, result); } - + */ /* * Performs an order-invariant comparison of lists representing two RDD streams. This allows * us to account for ordering variation within individual RDD's which occurs during windowing. @@ -450,7 +450,7 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, actual); } - + /* // PairDStream Functions @Test public void testPairFilter() { @@ -897,7 +897,7 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, result); } - + */ @Test public void testCheckpointMasterRecovery() throws InterruptedException { List> inputData = Arrays.asList( @@ -911,7 +911,6 @@ public class JavaAPISuite implements Serializable { Arrays.asList(1,4), Arrays.asList(8,7)); - File tempDir = Files.createTempDir(); ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); @@ -927,14 +926,16 @@ public class JavaAPISuite implements Serializable { assertOrderInvariantEquals(expectedInitial, initialResult); Thread.sleep(1000); - ssc.stop(); + ssc = new JavaStreamingContext(tempDir.getAbsolutePath()); - ssc.start(); - List> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2); - assertOrderInvariantEquals(expectedFinal, finalResult); + // Tweak to take into consideration that the last batch before failure + // will be re-processed after recovery + List> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3); + assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3)); } + /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD @Test public void testCheckpointofIndividualStream() throws InterruptedException { @@ -963,7 +964,7 @@ public class JavaAPISuite implements Serializable { assertOrderInvariantEquals(expected, result1); } */ - + /* // Input stream tests. These mostly just test that we can instantiate a given InputStream with // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the // InputStream functionality is deferred to the existing Scala tests. 
@@ -1025,5 +1026,5 @@ public class JavaAPISuite implements Serializable { public void testFileStream() { JavaPairDStream foo = ssc.fileStream("/tmp/foo"); - } + }*/ } diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index 58da4ee539..04ccca4c01 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -7,6 +7,8 @@ import org.scalatest.BeforeAndAfter import org.apache.commons.io.FileUtils import collection.mutable.{SynchronizedBuffer, ArrayBuffer} import util.{Clock, ManualClock} +import scala.util.Random +import com.google.common.io.Files class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { @@ -32,7 +34,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { override def actuallyWait = true - test("basic stream+rdd recovery") { + test("basic rdd checkpoints + dstream graph checkpoint recovery") { assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second") assert(checkpointInterval === batchDuration, "checkpointInterval for this test much be same as batchDuration") @@ -117,7 +119,10 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { ssc = null } - test("map and reduceByKey") { + // This tests whether the systm can recover from a master failure with simple + // non-stateful operations. This assumes as reliable, replayable input + // source - TestInputDStream. + test("recovery with map and reduceByKey operations") { testCheckpointedOperation( Seq( Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq() ), (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _), @@ -126,7 +131,11 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { ) } - test("reduceByKeyAndWindowInv") { + + // This tests whether the ReduceWindowedDStream's RDD checkpoints works correctly such + // that the system can recover from a master failure. This assumes as reliable, + // replayable input source - TestInputDStream. + test("recovery with invertible reduceByKeyAndWindow operation") { val n = 10 val w = 4 val input = (1 to n).map(_ => Seq("a")).toSeq @@ -139,7 +148,11 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { testCheckpointedOperation(input, operation, output, 7) } - test("updateStateByKey") { + + // This tests whether the StateDStream's RDD checkpoints works correctly such + // that the system can recover from a master failure. This assumes as reliable, + // replayable input source - TestInputDStream. + test("recovery with updateStateByKey operation") { val input = (1 to 10).map(_ => Seq("a")).toSeq val output = (1 to 10).map(x => Seq(("a", x))).toSeq val operation = (st: DStream[String]) => { @@ -154,11 +167,99 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { testCheckpointedOperation(input, operation, output, 7) } + // This tests whether file input stream remembers what files were seen before + // the master failure and uses them again to process a large window operatoin. + // It also tests whether batches, whose processing was incomplete due to the + // failure, are re-processed or not. 
+ test("recovery with file input stream") { + // Set up the streaming context and input streams + val testDir = Files.createTempDir() + var ssc = new StreamingContext(master, framework, batchDuration) + ssc.checkpoint(checkpointDir, checkpointInterval) + val fileStream = ssc.textFileStream(testDir.toString) + // Making value 3 take large time to process, to ensure that the master + // shuts down in the middle of processing the 3rd batch + val mappedStream = fileStream.map(s => { + val i = s.toInt + if (i == 3) Thread.sleep(1000) + i + }) + // Reducing over a large window to ensure that recovery from master failure + // requires reprocessing of all the files seen before the failure + val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration) + val outputBuffer = new ArrayBuffer[Seq[Int]] + var outputStream = new TestOutputStream(reducedStream, outputBuffer) + ssc.registerOutputStream(outputStream) + ssc.start() + + // Create files and advance manual clock to process them + var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + Thread.sleep(1000) + for (i <- Seq(1, 2, 3)) { + FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") + // wait to make sure that the file is written such that it gets shown in the file listings + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + // wait to make sure that FileInputDStream picks up this file only and not any other file + Thread.sleep(500) + } + logInfo("Output = " + outputStream.output.mkString(",")) + assert(outputStream.output.size > 0, "No files processed before restart") + ssc.stop() + + // Create files while the master is down + for (i <- Seq(4, 5, 6)) { + FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") + Thread.sleep(1000) + } + + // Restart stream computation from checkpoint and create more files to see whether + // they are being processed + logInfo("*********** RESTARTING ************") + ssc = new StreamingContext(checkpointDir) + ssc.start() + clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + for (i <- Seq(7, 8, 9)) { + FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + Thread.sleep(500) + } + Thread.sleep(1000) + logInfo("Output = " + outputStream.output.mkString(",")) + assert(outputStream.output.size > 0, "No files processed after restart") + ssc.stop() + + // Append the new output to the old buffer + outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] + outputBuffer ++= outputStream.output + + // Verify whether data received by Spark Streaming was as expected + val expectedOutput = Seq(1, 3, 6, 28, 36, 45) + logInfo("--------------------------------") + logInfo("output, size = " + outputBuffer.size) + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output, size = " + expectedOutput.size) + expectedOutput.foreach(x => logInfo("[" + x + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + assert(outputBuffer.size === expectedOutput.size) + for (i <- 0 until outputBuffer.size) { + assert(outputBuffer(i).size === 1) + assert(outputBuffer(i).head === expectedOutput(i)) + } + } + + /** - * Tests a streaming operation under checkpointing, by restart the operation + * Tests a streaming operation under checkpointing, by restarting the operation * from checkpoint file and verifying whether the final output is 
correct. * The output is assumed to have come from a reliable queue which an replay * data as required. + * + * NOTE: This takes into consideration that the last batch processed before + * master failure will be re-processed after restart/recovery. */ def testCheckpointedOperation[U: ClassManifest, V: ClassManifest]( input: Seq[Seq[U]], @@ -172,7 +273,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val totalNumBatches = input.size val nextNumBatches = totalNumBatches - initialNumBatches val initialNumExpectedOutputs = initialNumBatches - val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs + val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs + 1 + // because the last batch will be processed again // Do the computation for initial number of batches, create checkpoint file and quit ssc = setupStreams[U, V](input, operation) @@ -188,6 +290,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { ) ssc = new StreamingContext(checkpointDir) val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs) + // the first element will be re-processed data of the last batch before restart verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true) ssc = null } diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 34e51e9562..aa08ea1141 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -19,35 +19,24 @@ import org.apache.avro.ipc.specific.SpecificRequestor import java.nio.ByteBuffer import collection.JavaConversions._ import java.nio.charset.Charset +import com.google.common.io.Files class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") - val testPort = 9999 - var testServer: TestServer = null - var testDir: File = null - override def checkpointDir = "checkpoint" after { - FileUtils.deleteDirectory(new File(checkpointDir)) - if (testServer != null) { - testServer.stop() - testServer = null - } - if (testDir != null && testDir.exists()) { - FileUtils.deleteDirectory(testDir) - testDir = null - } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.master.port") } - /* + + test("network input stream") { // Start the server - testServer = new TestServer(testPort) + val testPort = 9999 + val testServer = new TestServer(testPort) testServer.start() // Set up the streaming context and input streams @@ -93,46 +82,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } - test("network input stream with checkpoint") { - // Start the server - testServer = new TestServer(testPort) - testServer.start() - - // Set up the streaming context and input streams - var ssc = new StreamingContext(master, framework, batchDuration) - ssc.checkpoint(checkpointDir, checkpointInterval) - val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) - var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]]) - ssc.registerOutputStream(outputStream) - ssc.start() - - // Feed data to the server to send to the network receiver - var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - for (i <- Seq(1, 2, 3)) { - testServer.send(i.toString + "\n") - Thread.sleep(100) - 
clock.addToTime(batchDuration.milliseconds) - } - Thread.sleep(500) - assert(outputStream.output.size > 0) - ssc.stop() - - // Restart stream computation from checkpoint and feed more data to see whether - // they are being received and processed - logInfo("*********** RESTARTING ************") - ssc = new StreamingContext(checkpointDir) - ssc.start() - clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - for (i <- Seq(4, 5, 6)) { - testServer.send(i.toString + "\n") - Thread.sleep(100) - clock.addToTime(batchDuration.milliseconds) - } - Thread.sleep(500) - outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]] - assert(outputStream.output.size > 0) - ssc.stop() - } test("flume input stream") { // Set up the streaming context and input streams @@ -182,18 +131,10 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } - test("file input stream") { - - // Create a temporary directory - testDir = { - var temp = File.createTempFile(".temp.", Random.nextInt().toString) - temp.delete() - temp.mkdirs() - logInfo("Created temp dir " + temp) - temp - } + test("file input stream") { // Set up the streaming context and input streams + val testDir = Files.createTempDir() val ssc = new StreamingContext(master, framework, batchDuration) val filestream = ssc.textFileStream(testDir.toString) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] @@ -235,96 +176,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(output(i).size === 1) assert(output(i).head.toString === expectedOutput(i)) } + FileUtils.deleteDirectory(testDir) } - */ - test("file input stream with master failure") { - // Create a temporary directory - testDir = { - var temp = File.createTempFile(".temp.", Random.nextInt().toString) - temp.delete() - temp.mkdirs() - logInfo("Created temp dir " + temp) - temp - } - - // Set up the streaming context and input streams - var ssc = new StreamingContext(master, framework, batchDuration) - ssc.checkpoint(checkpointDir, checkpointInterval) - val fileStream = ssc.textFileStream(testDir.toString) - // Making value 3 take large time to process, to ensure that the master - // shuts down in the middle of processing the 3rd batch - val mappedStream = fileStream.map(s => { - val i = s.toInt - if (i == 3) Thread.sleep(1000) - i - }) - // Reducing over a large window to ensure that recovery from master failure - // requires reprocessing of all the files seen before the failure - val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration) - val outputBuffer = new ArrayBuffer[Seq[Int]] - var outputStream = new TestOutputStream(reducedStream, outputBuffer) - ssc.registerOutputStream(outputStream) - ssc.start() - - // Create files and advance manual clock to process them - var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - Thread.sleep(1000) - for (i <- Seq(1, 2, 3)) { - FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") - // wait to make sure that the file is written such that it gets shown in the file listings - Thread.sleep(500) - clock.addToTime(batchDuration.milliseconds) - // wait to make sure that FileInputDStream picks up this file only and not any other file - Thread.sleep(500) - } - logInfo("Output = " + outputStream.output.mkString(",")) - assert(outputStream.output.size > 0, "No files processed before restart") - ssc.stop() - - // Create files while the master is down - for (i <- Seq(4, 5, 6)) { - 
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") - Thread.sleep(1000) - } - - // Restart stream computation from checkpoint and create more files to see whether - // they are being processed - logInfo("*********** RESTARTING ************") - ssc = new StreamingContext(checkpointDir) - ssc.start() - clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - for (i <- Seq(7, 8, 9)) { - FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") - Thread.sleep(500) - clock.addToTime(batchDuration.milliseconds) - Thread.sleep(500) - } - Thread.sleep(1000) - logInfo("Output = " + outputStream.output.mkString(",")) - assert(outputStream.output.size > 0, "No files processed after restart") - ssc.stop() - - // Append the new output to the old buffer - outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] - outputBuffer ++= outputStream.output - - // Verify whether data received by Spark Streaming was as expected - val expectedOutput = Seq(1, 3, 6, 28, 36, 45) - logInfo("--------------------------------") - logInfo("output, size = " + outputBuffer.size) - outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("expected output, size = " + expectedOutput.size) - expectedOutput.foreach(x => logInfo("[" + x + "]")) - logInfo("--------------------------------") - - // Verify whether all the elements received are as expected - assert(outputBuffer.size === expectedOutput.size) - for (i <- 0 until outputBuffer.size) { - assert(outputBuffer(i).size === 1) - assert(outputBuffer(i).head === expectedOutput(i)) - } - } - } -- cgit v1.2.3 From 7dfb82a992d47491174d7929e31351d26cadfcda Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Tue, 22 Jan 2013 15:25:41 -0600 Subject: Replace old 'master' term with 'driver'. 
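Before the diffstat, a condensed illustration (not part of the patch) of how the renamed settings are consumed: it restates what the MapOutputTracker and SparkContext hunks below do after the spark.master.* to spark.driver.* rename, and DriverLookupSketch / driverActorFor are hypothetical names used only for this sketch.

import akka.actor.{ActorRef, ActorSystem}

object DriverLookupSketch {
  // Non-driver processes now locate driver-side actors through spark.driver.host and
  // spark.driver.port (formerly spark.master.host / spark.master.port).
  def driverActorFor(actorSystem: ActorSystem, actorName: String): ActorRef = {
    val ip = System.getProperty("spark.driver.host", "localhost")
    val port = System.getProperty("spark.driver.port", "7077").toInt
    val url = "akka://spark@%s:%s/user/%s".format(ip, port, actorName)
    actorSystem.actorFor(url)   // the driver itself creates the actor with actorOf instead
  }

  // The driver sets these properties up front (see the SparkContext hunk below);
  // port 0 means "bind to any free port", and SparkEnv later records the bound port:
  //   System.setProperty("spark.driver.host", Utils.localIpAddress)
  //   System.setProperty("spark.driver.port", "0")
}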
--- bagel/src/test/scala/bagel/BagelSuite.scala | 2 +- core/src/main/scala/spark/MapOutputTracker.scala | 10 +-- core/src/main/scala/spark/SparkContext.scala | 20 +++--- core/src/main/scala/spark/SparkEnv.scala | 22 +++---- .../spark/broadcast/BitTorrentBroadcast.scala | 24 +++---- .../src/main/scala/spark/broadcast/Broadcast.scala | 6 +- .../scala/spark/broadcast/BroadcastFactory.scala | 4 +- .../main/scala/spark/broadcast/HttpBroadcast.scala | 6 +- .../main/scala/spark/broadcast/MultiTracker.scala | 35 +++++----- .../main/scala/spark/broadcast/TreeBroadcast.scala | 52 +++++++-------- .../scala/spark/deploy/LocalSparkCluster.scala | 34 +++++----- .../scala/spark/deploy/client/ClientListener.scala | 4 +- .../main/scala/spark/deploy/master/JobInfo.scala | 2 +- .../main/scala/spark/deploy/master/Master.scala | 18 +++--- .../spark/executor/StandaloneExecutorBackend.scala | 26 ++++---- .../cluster/SparkDeploySchedulerBackend.scala | 33 +++++----- .../cluster/StandaloneClusterMessage.scala | 8 +-- .../cluster/StandaloneSchedulerBackend.scala | 74 +++++++++++----------- .../mesos/CoarseMesosSchedulerBackend.scala | 6 +- .../scala/spark/storage/BlockManagerMaster.scala | 69 ++++++++++---------- .../main/scala/spark/storage/ThreadingTest.scala | 6 +- core/src/test/scala/spark/JavaAPISuite.java | 2 +- core/src/test/scala/spark/LocalSparkContext.scala | 2 +- .../test/scala/spark/MapOutputTrackerSuite.scala | 2 +- docs/configuration.md | 12 ++-- python/pyspark/tests.py | 2 +- repl/src/test/scala/spark/repl/ReplSuite.scala | 2 +- .../streaming/dstream/NetworkInputDStream.scala | 4 +- .../test/java/spark/streaming/JavaAPISuite.java | 2 +- .../spark/streaming/BasicOperationsSuite.scala | 2 +- .../scala/spark/streaming/CheckpointSuite.scala | 2 +- .../test/scala/spark/streaming/FailureSuite.scala | 2 +- .../scala/spark/streaming/InputStreamsSuite.scala | 2 +- .../spark/streaming/WindowOperationsSuite.scala | 2 +- 34 files changed, 248 insertions(+), 251 deletions(-) (limited to 'streaming/src') diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index ca59f46843..3c2f9c4616 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -23,7 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter { sc = null } // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } test("halting by voting") { diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index ac02f3363a..d4f5164f7d 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -38,10 +38,7 @@ private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Ac } } -private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging { - val ip: String = System.getProperty("spark.master.host", "localhost") - val port: Int = System.getProperty("spark.master.port", "7077").toInt - val actorName: String = "MapOutputTracker" +private[spark] class MapOutputTracker(actorSystem: ActorSystem, isDriver: Boolean) extends Logging { val timeout = 10.seconds @@ -56,11 +53,14 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea var cacheGeneration = generation val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]] 
- var trackerActor: ActorRef = if (isMaster) { + val actorName: String = "MapOutputTracker" + var trackerActor: ActorRef = if (isDriver) { val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(this)), name = actorName) logInfo("Registered MapOutputTrackerActor actor") actor } else { + val ip = System.getProperty("spark.driver.host", "localhost") + val port = System.getProperty("spark.driver.port", "7077").toInt val url = "akka://spark@%s:%s/user/%s".format(ip, port, actorName) actorSystem.actorFor(url) } diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index bc9fdee8b6..d4991cb1e0 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -66,20 +66,20 @@ class SparkContext( // Ensure logging is initialized before we spawn any threads initLogging() - // Set Spark master host and port system properties - if (System.getProperty("spark.master.host") == null) { - System.setProperty("spark.master.host", Utils.localIpAddress) + // Set Spark driver host and port system properties + if (System.getProperty("spark.driver.host") == null) { + System.setProperty("spark.driver.host", Utils.localIpAddress) } - if (System.getProperty("spark.master.port") == null) { - System.setProperty("spark.master.port", "0") + if (System.getProperty("spark.driver.port") == null) { + System.setProperty("spark.driver.port", "0") } private val isLocal = (master == "local" || master.startsWith("local[")) // Create the Spark execution environment (cache, map output tracker, etc) private[spark] val env = SparkEnv.createFromSystemProperties( - System.getProperty("spark.master.host"), - System.getProperty("spark.master.port").toInt, + System.getProperty("spark.driver.host"), + System.getProperty("spark.driver.port").toInt, true, isLocal) SparkEnv.set(env) @@ -396,14 +396,14 @@ class SparkContext( /** * Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values - * to using the `+=` method. Only the master can access the accumulator's `value`. + * to using the `+=` method. Only the driver can access the accumulator's `value`. */ def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) = new Accumulator(initialValue, param) /** * Create an [[spark.Accumulable]] shared variable, to which tasks can add values with `+=`. - * Only the master can access the accumuable's `value`. + * Only the driver can access the accumuable's `value`. * @tparam T accumulator type * @tparam R type that can be added to the accumulator */ @@ -530,7 +530,7 @@ class SparkContext( /** * Run a function on a given set of partitions in an RDD and return the results. This is the main * entry point to the scheduler, by which all actions get launched. The allowLocal flag specifies - * whether the scheduler can run the computation on the master rather than shipping it out to the + * whether the scheduler can run the computation on the driver rather than shipping it out to the * cluster, for short actions like first(). 
*/ def runJob[T, U: ClassManifest]( diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 2a7a8af83d..4034af610c 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -60,15 +60,15 @@ object SparkEnv extends Logging { def createFromSystemProperties( hostname: String, port: Int, - isMaster: Boolean, + isDriver: Boolean, isLocal: Boolean ) : SparkEnv = { val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port) - // Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port), - // figure out which port number Akka actually bound to and set spark.master.port to it. - if (isMaster && port == 0) { - System.setProperty("spark.master.port", boundPort.toString) + // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port), + // figure out which port number Akka actually bound to and set spark.driver.port to it. + if (isDriver && port == 0) { + System.setProperty("spark.driver.port", boundPort.toString) } val classLoader = Thread.currentThread.getContextClassLoader @@ -82,22 +82,22 @@ object SparkEnv extends Logging { val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer") - val masterIp: String = System.getProperty("spark.master.host", "localhost") - val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt + val driverIp: String = System.getProperty("spark.driver.host", "localhost") + val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt val blockManagerMaster = new BlockManagerMaster( - actorSystem, isMaster, isLocal, masterIp, masterPort) + actorSystem, isDriver, isLocal, driverIp, driverPort) val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer) val connectionManager = blockManager.connectionManager - val broadcastManager = new BroadcastManager(isMaster) + val broadcastManager = new BroadcastManager(isDriver) val closureSerializer = instantiateClass[Serializer]( "spark.closure.serializer", "spark.JavaSerializer") val cacheManager = new CacheManager(blockManager) - val mapOutputTracker = new MapOutputTracker(actorSystem, isMaster) + val mapOutputTracker = new MapOutputTracker(actorSystem, isDriver) val shuffleFetcher = instantiateClass[ShuffleFetcher]( "spark.shuffle.fetcher", "spark.BlockStoreShuffleFetcher") @@ -109,7 +109,7 @@ object SparkEnv extends Logging { // Set the sparkFiles directory, used when downloading dependencies. In local mode, // this is a temporary directory; in distributed mode, this is the executor's current working // directory. - val sparkFilesDir: String = if (isMaster) { + val sparkFilesDir: String = if (isDriver) { Utils.createTempDir().getAbsolutePath } else { "." 
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala index 386f505f2a..adcb2d2415 100644 --- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala @@ -31,7 +31,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: @transient var totalBlocks = -1 @transient var hasBlocks = new AtomicInteger(0) - // Used ONLY by Master to track how many unique blocks have been sent out + // Used ONLY by driver to track how many unique blocks have been sent out @transient var sentBlocks = new AtomicInteger(0) @transient var listenPortLock = new Object @@ -42,7 +42,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: @transient var serveMR: ServeMultipleRequests = null - // Used only in Master + // Used only in driver @transient var guideMR: GuideMultipleRequests = null // Used only in Workers @@ -99,14 +99,14 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: } // Must always come AFTER listenPort is created - val masterSource = + val driverSource = SourceInfo(hostAddress, listenPort, totalBlocks, totalBytes) hasBlocksBitVector.synchronized { - masterSource.hasBlocksBitVector = hasBlocksBitVector + driverSource.hasBlocksBitVector = hasBlocksBitVector } // In the beginning, this is the only known source to Guide - listOfSources += masterSource + listOfSources += driverSource // Register with the Tracker MultiTracker.registerBroadcast(id, @@ -122,7 +122,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: case None => logInfo("Started reading broadcast variable " + id) - // Initializing everything because Master will only send null/0 values + // Initializing everything because driver will only send null/0 values // Only the 1st worker in a node can be here. Others will get from cache initializeWorkerVariables() @@ -151,7 +151,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: } } - // Initialize variables in the worker node. Master sends everything as 0/null + // Initialize variables in the worker node. Driver sends everything as 0/null private def initializeWorkerVariables() { arrayOfBlocks = null hasBlocksBitVector = null @@ -248,7 +248,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: // Receive source information from Guide var suitableSources = oisGuide.readObject.asInstanceOf[ListBuffer[SourceInfo]] - logDebug("Received suitableSources from Master " + suitableSources) + logDebug("Received suitableSources from Driver " + suitableSources) addToListOfSources(suitableSources) @@ -532,7 +532,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: oosSource.writeObject(blockToAskFor) oosSource.flush() - // CHANGED: Master might send some other block than the one + // CHANGED: Driver might send some other block than the one // requested to ensure fast spreading of all blocks. 
val recvStartTime = System.currentTimeMillis val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock] @@ -982,9 +982,9 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: // Receive which block to send var blockToSend = ois.readObject.asInstanceOf[Int] - // If it is master AND at least one copy of each block has not been + // If it is driver AND at least one copy of each block has not been // sent out already, MODIFY blockToSend - if (MultiTracker.isMaster && sentBlocks.get < totalBlocks) { + if (MultiTracker.isDriver && sentBlocks.get < totalBlocks) { blockToSend = sentBlocks.getAndIncrement } @@ -1031,7 +1031,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: private[spark] class BitTorrentBroadcastFactory extends BroadcastFactory { - def initialize(isMaster: Boolean) { MultiTracker.initialize(isMaster) } + def initialize(isDriver: Boolean) { MultiTracker.initialize(isDriver) } def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) = new BitTorrentBroadcast[T](value_, isLocal, id) diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala index 2ffe7f741d..415bde5d67 100644 --- a/core/src/main/scala/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/spark/broadcast/Broadcast.scala @@ -15,7 +15,7 @@ abstract class Broadcast[T](private[spark] val id: Long) extends Serializable { } private[spark] -class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable { +class BroadcastManager(val _isDriver: Boolean) extends Logging with Serializable { private var initialized = false private var broadcastFactory: BroadcastFactory = null @@ -33,7 +33,7 @@ class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializabl Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory] // Initialize appropriate BroadcastFactory and BroadcastObject - broadcastFactory.initialize(isMaster) + broadcastFactory.initialize(isDriver) initialized = true } @@ -49,5 +49,5 @@ class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializabl def newBroadcast[T](value_ : T, isLocal: Boolean) = broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement()) - def isMaster = isMaster_ + def isDriver = _isDriver } diff --git a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala index ab6d302827..5c6184c3c7 100644 --- a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala +++ b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala @@ -7,7 +7,7 @@ package spark.broadcast * entire Spark job. 
*/ private[spark] trait BroadcastFactory { - def initialize(isMaster: Boolean): Unit - def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] + def initialize(isDriver: Boolean): Unit + def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T] def stop(): Unit } diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala index 8e490e6bad..7e30b8f7d2 100644 --- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala @@ -48,7 +48,7 @@ extends Broadcast[T](id) with Logging with Serializable { } private[spark] class HttpBroadcastFactory extends BroadcastFactory { - def initialize(isMaster: Boolean) { HttpBroadcast.initialize(isMaster) } + def initialize(isDriver: Boolean) { HttpBroadcast.initialize(isDriver) } def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) = new HttpBroadcast[T](value_, isLocal, id) @@ -69,12 +69,12 @@ private object HttpBroadcast extends Logging { private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup) - def initialize(isMaster: Boolean) { + def initialize(isDriver: Boolean) { synchronized { if (!initialized) { bufferSize = System.getProperty("spark.buffer.size", "65536").toInt compress = System.getProperty("spark.broadcast.compress", "true").toBoolean - if (isMaster) { + if (isDriver) { createServer() } serverUri = System.getProperty("spark.httpBroadcast.uri") diff --git a/core/src/main/scala/spark/broadcast/MultiTracker.scala b/core/src/main/scala/spark/broadcast/MultiTracker.scala index 5e76dedb94..3fd77af73f 100644 --- a/core/src/main/scala/spark/broadcast/MultiTracker.scala +++ b/core/src/main/scala/spark/broadcast/MultiTracker.scala @@ -23,25 +23,24 @@ extends Logging { var ranGen = new Random private var initialized = false - private var isMaster_ = false + private var _isDriver = false private var stopBroadcast = false private var trackMV: TrackMultipleValues = null - def initialize(isMaster__ : Boolean) { + def initialize(__isDriver: Boolean) { synchronized { if (!initialized) { + _isDriver = __isDriver - isMaster_ = isMaster__ - - if (isMaster) { + if (isDriver) { trackMV = new TrackMultipleValues trackMV.setDaemon(true) trackMV.start() - // Set masterHostAddress to the master's IP address for the slaves to read - System.setProperty("spark.MultiTracker.MasterHostAddress", Utils.localIpAddress) + // Set DriverHostAddress to the driver's IP address for the slaves to read + System.setProperty("spark.MultiTracker.DriverHostAddress", Utils.localIpAddress) } initialized = true @@ -54,10 +53,10 @@ extends Logging { } // Load common parameters - private var MasterHostAddress_ = System.getProperty( - "spark.MultiTracker.MasterHostAddress", "") - private var MasterTrackerPort_ = System.getProperty( - "spark.broadcast.masterTrackerPort", "11111").toInt + private var DriverHostAddress_ = System.getProperty( + "spark.MultiTracker.DriverHostAddress", "") + private var DriverTrackerPort_ = System.getProperty( + "spark.broadcast.driverTrackerPort", "11111").toInt private var BlockSize_ = System.getProperty( "spark.broadcast.blockSize", "4096").toInt * 1024 private var MaxRetryCount_ = System.getProperty( @@ -91,11 +90,11 @@ extends Logging { private var EndGameFraction_ = System.getProperty( "spark.broadcast.endGameFraction", "0.95").toDouble - def isMaster = isMaster_ + def isDriver = _isDriver // Common config params - def MasterHostAddress = MasterHostAddress_ - def MasterTrackerPort = 
MasterTrackerPort_ + def DriverHostAddress = DriverHostAddress_ + def DriverTrackerPort = DriverTrackerPort_ def BlockSize = BlockSize_ def MaxRetryCount = MaxRetryCount_ @@ -123,7 +122,7 @@ extends Logging { var threadPool = Utils.newDaemonCachedThreadPool() var serverSocket: ServerSocket = null - serverSocket = new ServerSocket(MasterTrackerPort) + serverSocket = new ServerSocket(DriverTrackerPort) logInfo("TrackMultipleValues started at " + serverSocket) try { @@ -235,7 +234,7 @@ extends Logging { try { // Connect to the tracker to find out GuideInfo clientSocketToTracker = - new Socket(MultiTracker.MasterHostAddress, MultiTracker.MasterTrackerPort) + new Socket(MultiTracker.DriverHostAddress, MultiTracker.DriverTrackerPort) oosTracker = new ObjectOutputStream(clientSocketToTracker.getOutputStream) oosTracker.flush() @@ -276,7 +275,7 @@ extends Logging { } def registerBroadcast(id: Long, gInfo: SourceInfo) { - val socket = new Socket(MultiTracker.MasterHostAddress, MasterTrackerPort) + val socket = new Socket(MultiTracker.DriverHostAddress, DriverTrackerPort) val oosST = new ObjectOutputStream(socket.getOutputStream) oosST.flush() val oisST = new ObjectInputStream(socket.getInputStream) @@ -303,7 +302,7 @@ extends Logging { } def unregisterBroadcast(id: Long) { - val socket = new Socket(MultiTracker.MasterHostAddress, MasterTrackerPort) + val socket = new Socket(MultiTracker.DriverHostAddress, DriverTrackerPort) val oosST = new ObjectOutputStream(socket.getOutputStream) oosST.flush() val oisST = new ObjectInputStream(socket.getInputStream) diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala index f573512835..c55c476117 100644 --- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala @@ -98,7 +98,7 @@ extends Broadcast[T](id) with Logging with Serializable { case None => logInfo("Started reading broadcast variable " + id) - // Initializing everything because Master will only send null/0 values + // Initializing everything because Driver will only send null/0 values // Only the 1st worker in a node can be here. 
Others will get from cache initializeWorkerVariables() @@ -157,55 +157,55 @@ extends Broadcast[T](id) with Logging with Serializable { listenPortLock.synchronized { listenPortLock.wait() } } - var clientSocketToMaster: Socket = null - var oosMaster: ObjectOutputStream = null - var oisMaster: ObjectInputStream = null + var clientSocketToDriver: Socket = null + var oosDriver: ObjectOutputStream = null + var oisDriver: ObjectInputStream = null // Connect and receive broadcast from the specified source, retrying the // specified number of times in case of failures var retriesLeft = MultiTracker.MaxRetryCount do { - // Connect to Master and send this worker's Information - clientSocketToMaster = new Socket(MultiTracker.MasterHostAddress, gInfo.listenPort) - oosMaster = new ObjectOutputStream(clientSocketToMaster.getOutputStream) - oosMaster.flush() - oisMaster = new ObjectInputStream(clientSocketToMaster.getInputStream) + // Connect to Driver and send this worker's Information + clientSocketToDriver = new Socket(MultiTracker.DriverHostAddress, gInfo.listenPort) + oosDriver = new ObjectOutputStream(clientSocketToDriver.getOutputStream) + oosDriver.flush() + oisDriver = new ObjectInputStream(clientSocketToDriver.getInputStream) - logDebug("Connected to Master's guiding object") + logDebug("Connected to Driver's guiding object") // Send local source information - oosMaster.writeObject(SourceInfo(hostAddress, listenPort)) - oosMaster.flush() + oosDriver.writeObject(SourceInfo(hostAddress, listenPort)) + oosDriver.flush() - // Receive source information from Master - var sourceInfo = oisMaster.readObject.asInstanceOf[SourceInfo] + // Receive source information from Driver + var sourceInfo = oisDriver.readObject.asInstanceOf[SourceInfo] totalBlocks = sourceInfo.totalBlocks arrayOfBlocks = new Array[BroadcastBlock](totalBlocks) totalBlocksLock.synchronized { totalBlocksLock.notifyAll() } totalBytes = sourceInfo.totalBytes - logDebug("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort) + logDebug("Received SourceInfo from Driver:" + sourceInfo + " My Port: " + listenPort) val start = System.nanoTime val receptionSucceeded = receiveSingleTransmission(sourceInfo) val time = (System.nanoTime - start) / 1e9 - // Updating some statistics in sourceInfo. Master will be using them later + // Updating some statistics in sourceInfo. 
Driver will be using them later if (!receptionSucceeded) { sourceInfo.receptionFailed = true } - // Send back statistics to the Master - oosMaster.writeObject(sourceInfo) + // Send back statistics to the Driver + oosDriver.writeObject(sourceInfo) - if (oisMaster != null) { - oisMaster.close() + if (oisDriver != null) { + oisDriver.close() } - if (oosMaster != null) { - oosMaster.close() + if (oosDriver != null) { + oosDriver.close() } - if (clientSocketToMaster != null) { - clientSocketToMaster.close() + if (clientSocketToDriver != null) { + clientSocketToDriver.close() } retriesLeft -= 1 @@ -552,7 +552,7 @@ extends Broadcast[T](id) with Logging with Serializable { } private def sendObject() { - // Wait till receiving the SourceInfo from Master + // Wait till receiving the SourceInfo from Driver while (totalBlocks == -1) { totalBlocksLock.synchronized { totalBlocksLock.wait() } } @@ -576,7 +576,7 @@ extends Broadcast[T](id) with Logging with Serializable { private[spark] class TreeBroadcastFactory extends BroadcastFactory { - def initialize(isMaster: Boolean) { MultiTracker.initialize(isMaster) } + def initialize(isDriver: Boolean) { MultiTracker.initialize(isDriver) } def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) = new TreeBroadcast[T](value_, isLocal, id) diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala index 4211d80596..ae083efc8d 100644 --- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala @@ -10,7 +10,7 @@ import spark.{Logging, Utils} import scala.collection.mutable.ArrayBuffer private[spark] -class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging { +class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging { val localIpAddress = Utils.localIpAddress @@ -19,33 +19,31 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) var masterPort : Int = _ var masterUrl : String = _ - val slaveActorSystems = ArrayBuffer[ActorSystem]() - val slaveActors = ArrayBuffer[ActorRef]() + val workerActorSystems = ArrayBuffer[ActorSystem]() + val workerActors = ArrayBuffer[ActorRef]() def start() : String = { - logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.") + logInfo("Starting a local Spark cluster with " + numWorkers + " workers.") /* Start the Master */ val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0) masterActorSystem = actorSystem masterUrl = "spark://" + localIpAddress + ":" + masterPort - val actor = masterActorSystem.actorOf( + masterActor = masterActorSystem.actorOf( Props(new Master(localIpAddress, masterPort, 0)), name = "Master") - masterActor = actor - /* Start the Slaves */ - for (slaveNum <- 1 to numSlaves) { - /* We can pretend to test distributed stuff by giving the slaves distinct hostnames. + /* Start the Workers */ + for (workerNum <- 1 to numWorkers) { + /* We can pretend to test distributed stuff by giving the workers distinct hostnames. All of 127/8 should be a loopback, we use 127.100.*.* in hopes that it is sufficiently distinctive. */ - val slaveIpAddress = "127.100.0." + (slaveNum % 256) + val workerIpAddress = "127.100.0." 
+ (workerNum % 256) val (actorSystem, boundPort) = - AkkaUtils.createActorSystem("sparkWorker" + slaveNum, slaveIpAddress, 0) - slaveActorSystems += actorSystem - val actor = actorSystem.actorOf( - Props(new Worker(slaveIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)), + AkkaUtils.createActorSystem("sparkWorker" + workerNum, workerIpAddress, 0) + workerActorSystems += actorSystem + workerActors += actorSystem.actorOf( + Props(new Worker(workerIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)), name = "Worker") - slaveActors += actor } return masterUrl @@ -53,9 +51,9 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) def stop() { logInfo("Shutting down local Spark cluster.") - // Stop the slaves before the master so they don't get upset that it disconnected - slaveActorSystems.foreach(_.shutdown()) - slaveActorSystems.foreach(_.awaitTermination()) + // Stop the workers before the master so they don't get upset that it disconnected + workerActorSystems.foreach(_.shutdown()) + workerActorSystems.foreach(_.awaitTermination()) masterActorSystem.shutdown() masterActorSystem.awaitTermination() } diff --git a/core/src/main/scala/spark/deploy/client/ClientListener.scala b/core/src/main/scala/spark/deploy/client/ClientListener.scala index da6abcc9c2..7035f4b394 100644 --- a/core/src/main/scala/spark/deploy/client/ClientListener.scala +++ b/core/src/main/scala/spark/deploy/client/ClientListener.scala @@ -12,7 +12,7 @@ private[spark] trait ClientListener { def disconnected(): Unit - def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int): Unit + def executorAdded(fullId: String, workerId: String, host: String, cores: Int, memory: Int): Unit - def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit + def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit } diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala index 130b031a2a..a274b21c34 100644 --- a/core/src/main/scala/spark/deploy/master/JobInfo.scala +++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala @@ -10,7 +10,7 @@ private[spark] class JobInfo( val id: String, val desc: JobDescription, val submitDate: Date, - val actor: ActorRef) + val driver: ActorRef) { var state = JobState.WAITING var executors = new mutable.HashMap[Int, ExecutorInfo] diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 2c2cd0231b..3347207c6d 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -88,7 +88,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor execOption match { case Some(exec) => { exec.state = state - exec.job.actor ! ExecutorUpdated(execId, state, message, exitStatus) + exec.job.driver ! ExecutorUpdated(execId, state, message, exitStatus) if (ExecutorState.isFinished(state)) { val jobInfo = idToJob(jobId) // Remove this executor from the worker and job @@ -199,7 +199,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc, exec.cores, exec.memory, sparkHome) - exec.job.actor ! 
ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory) + exec.job.driver ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory) } def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int, @@ -221,19 +221,19 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor actorToWorker -= worker.actor addressToWorker -= worker.actor.path.address for (exec <- worker.executors.values) { - exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None) + exec.job.driver ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None) exec.job.executors -= exec.id } } - def addJob(desc: JobDescription, actor: ActorRef): JobInfo = { + def addJob(desc: JobDescription, driver: ActorRef): JobInfo = { val now = System.currentTimeMillis() val date = new Date(now) - val job = new JobInfo(now, newJobId(date), desc, date, actor) + val job = new JobInfo(now, newJobId(date), desc, date, driver) jobs += job idToJob(job.id) = job - actorToJob(sender) = job - addressToJob(sender.path.address) = job + actorToJob(driver) = job + addressToJob(driver.path.address) = job return job } @@ -242,8 +242,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor logInfo("Removing job " + job.id) jobs -= job idToJob -= job.id - actorToJob -= job.actor - addressToWorker -= job.actor.path.address + actorToJob -= job.driver + addressToWorker -= job.driver.path.address completedJobs += job // Remember it in our history waitingJobs -= job for (exec <- job.executors.values) { diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index a29bf974d2..f80f1b5274 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -16,33 +16,33 @@ import spark.scheduler.cluster.RegisterSlave private[spark] class StandaloneExecutorBackend( executor: Executor, - masterUrl: String, - slaveId: String, + driverUrl: String, + workerId: String, hostname: String, cores: Int) extends Actor with ExecutorBackend with Logging { - var master: ActorRef = null + var driver: ActorRef = null override def preStart() { try { - logInfo("Connecting to master: " + masterUrl) - master = context.actorFor(masterUrl) - master ! RegisterSlave(slaveId, hostname, cores) + logInfo("Connecting to driver: " + driverUrl) + driver = context.actorFor(driverUrl) + driver ! RegisterSlave(workerId, hostname, cores) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(master) // Doesn't work with remote actors, but useful for testing + context.watch(driver) // Doesn't work with remote actors, but useful for testing } catch { case e: Exception => - logError("Failed to connect to master", e) + logError("Failed to connect to driver", e) System.exit(1) } } override def receive = { case RegisteredSlave(sparkProperties) => - logInfo("Successfully registered with master") + logInfo("Successfully registered with driver") executor.initialize(hostname, sparkProperties) case RegisterSlaveFailed(message) => @@ -55,24 +55,24 @@ private[spark] class StandaloneExecutorBackend( } override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { - master ! StatusUpdate(slaveId, taskId, state, data) + driver ! 
StatusUpdate(workerId, taskId, state, data) } } private[spark] object StandaloneExecutorBackend { - def run(masterUrl: String, slaveId: String, hostname: String, cores: Int) { + def run(driverUrl: String, workerId: String, hostname: String, cores: Int) { // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor // before getting started with all our system properties, etc val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0) val actor = actorSystem.actorOf( - Props(new StandaloneExecutorBackend(new Executor, masterUrl, slaveId, hostname, cores)), + Props(new StandaloneExecutorBackend(new Executor, driverUrl, workerId, hostname, cores)), name = "Executor") actorSystem.awaitTermination() } def main(args: Array[String]) { if (args.length != 4) { - System.err.println("Usage: StandaloneExecutorBackend ") + System.err.println("Usage: StandaloneExecutorBackend ") System.exit(1) } run(args(0), args(1), args(2), args(3).toInt) diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 4f82cd96dd..866beb6d01 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -19,7 +19,7 @@ private[spark] class SparkDeploySchedulerBackend( var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt - val executorIdToSlaveId = new HashMap[String, String] + val executorIdToWorkerId = new HashMap[String, String] // Memory used by each executor (in megabytes) val executorMemory = { @@ -34,10 +34,11 @@ private[spark] class SparkDeploySchedulerBackend( override def start() { super.start() - val masterUrl = "akka://spark@%s:%s/user/%s".format( - System.getProperty("spark.master.host"), System.getProperty("spark.master.port"), + // The endpoint for executors to talk to us + val driverUrl = "akka://spark@%s:%s/user/%s".format( + System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), StandaloneSchedulerBackend.ACTOR_NAME) - val args = Seq(masterUrl, "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}") + val args = Seq(driverUrl, "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}") val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs) val sparkHome = sc.getSparkHome().getOrElse(throw new IllegalArgumentException("must supply spark home for spark standalone")) val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command, sparkHome) @@ -55,35 +56,35 @@ private[spark] class SparkDeploySchedulerBackend( } } - def connected(jobId: String) { + override def connected(jobId: String) { logInfo("Connected to Spark cluster with job ID " + jobId) } - def disconnected() { + override def disconnected() { if (!stopping) { logError("Disconnected from Spark cluster!") scheduler.error("Disconnected from Spark cluster") } } - def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) { - executorIdToSlaveId += id -> workerId + override def executorAdded(fullId: String, workerId: String, host: String, cores: Int, memory: Int) { + executorIdToWorkerId += fullId -> workerId logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format( - id, host, cores, Utils.memoryMegabytesToString(memory))) + fullId, host, cores, Utils.memoryMegabytesToString(memory))) } 
- def executorRemoved(id: String, message: String, exitStatus: Option[Int]) { + override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) { val reason: ExecutorLossReason = exitStatus match { case Some(code) => ExecutorExited(code) case None => SlaveLost(message) } - logInfo("Executor %s removed: %s".format(id, message)) - executorIdToSlaveId.get(id) match { - case Some(slaveId) => - executorIdToSlaveId.remove(id) - scheduler.slaveLost(slaveId, reason) + logInfo("Executor %s removed: %s".format(fullId, message)) + executorIdToWorkerId.get(fullId) match { + case Some(workerId) => + executorIdToWorkerId.remove(fullId) + scheduler.slaveLost(workerId, reason) case None => - logInfo("No slave ID known for executor %s".format(id)) + logInfo("No worker ID known for executor %s".format(fullId)) } } } diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala index 1386cd9d44..bea9dc4f23 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala @@ -6,7 +6,7 @@ import spark.util.SerializableBuffer private[spark] sealed trait StandaloneClusterMessage extends Serializable -// Master to slaves +// Driver to executors private[spark] case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage @@ -16,7 +16,7 @@ case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends Stand private[spark] case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage -// Slaves to master +// Executors to driver private[spark] case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage @@ -32,6 +32,6 @@ object StatusUpdate { } } -// Internal messages in master +// Internal messages in driver private[spark] case object ReviveOffers extends StandaloneClusterMessage -private[spark] case object StopMaster extends StandaloneClusterMessage +private[spark] case object StopDriver extends StandaloneClusterMessage diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index eeaae23dc8..d742a7b2bf 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -23,7 +23,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) - class MasterActor(sparkProperties: Seq[(String, String)]) extends Actor { + class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { val slaveActor = new HashMap[String, ActorRef] val slaveAddress = new HashMap[String, Address] val slaveHost = new HashMap[String, String] @@ -37,34 +37,34 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } def receive = { - case RegisterSlave(slaveId, host, cores) => - if (slaveActor.contains(slaveId)) { - sender ! RegisterSlaveFailed("Duplicate slave ID: " + slaveId) + case RegisterSlave(workerId, host, cores) => + if (slaveActor.contains(workerId)) { + sender ! 
RegisterSlaveFailed("Duplicate slave ID: " + workerId) } else { - logInfo("Registered slave: " + sender + " with ID " + slaveId) + logInfo("Registered slave: " + sender + " with ID " + workerId) sender ! RegisteredSlave(sparkProperties) context.watch(sender) - slaveActor(slaveId) = sender - slaveHost(slaveId) = host - freeCores(slaveId) = cores - slaveAddress(slaveId) = sender.path.address - actorToSlaveId(sender) = slaveId - addressToSlaveId(sender.path.address) = slaveId + slaveActor(workerId) = sender + slaveHost(workerId) = host + freeCores(workerId) = cores + slaveAddress(workerId) = sender.path.address + actorToSlaveId(sender) = workerId + addressToSlaveId(sender.path.address) = workerId totalCoreCount.addAndGet(cores) makeOffers() } - case StatusUpdate(slaveId, taskId, state, data) => + case StatusUpdate(workerId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { - freeCores(slaveId) += 1 - makeOffers(slaveId) + freeCores(workerId) += 1 + makeOffers(workerId) } case ReviveOffers => makeOffers() - case StopMaster => + case StopDriver => sender ! true context.stop(self) @@ -85,9 +85,9 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } // Make fake resource offers on just one slave - def makeOffers(slaveId: String) { + def makeOffers(workerId: String) { launchTasks(scheduler.resourceOffers( - Seq(new WorkerOffer(slaveId, slaveHost(slaveId), freeCores(slaveId))))) + Seq(new WorkerOffer(workerId, slaveHost(workerId), freeCores(workerId))))) } // Launch tasks returned by a set of resource offers @@ -99,24 +99,24 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } // Remove a disconnected slave from the cluster - def removeSlave(slaveId: String, reason: String) { - logInfo("Slave " + slaveId + " disconnected, so removing it") - val numCores = freeCores(slaveId) - actorToSlaveId -= slaveActor(slaveId) - addressToSlaveId -= slaveAddress(slaveId) - slaveActor -= slaveId - slaveHost -= slaveId - freeCores -= slaveId - slaveHost -= slaveId + def removeSlave(workerId: String, reason: String) { + logInfo("Slave " + workerId + " disconnected, so removing it") + val numCores = freeCores(workerId) + actorToSlaveId -= slaveActor(workerId) + addressToSlaveId -= slaveAddress(workerId) + slaveActor -= workerId + slaveHost -= workerId + freeCores -= workerId + slaveHost -= workerId totalCoreCount.addAndGet(-numCores) - scheduler.slaveLost(slaveId, SlaveLost(reason)) + scheduler.slaveLost(workerId, SlaveLost(reason)) } } - var masterActor: ActorRef = null + var driverActor: ActorRef = null val taskIdsOnSlave = new HashMap[String, HashSet[String]] - def start() { + override def start() { val properties = new ArrayBuffer[(String, String)] val iterator = System.getProperties.entrySet.iterator while (iterator.hasNext) { @@ -126,15 +126,15 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor properties += ((key, value)) } } - masterActor = actorSystem.actorOf( - Props(new MasterActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME) + driverActor = actorSystem.actorOf( + Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME) } - def stop() { + override def stop() { try { - if (masterActor != null) { + if (driverActor != null) { val timeout = 5.seconds - val future = masterActor.ask(StopMaster)(timeout) + val future = driverActor.ask(StopDriver)(timeout) Await.result(future, timeout) } } catch { @@ -143,11 +143,11 @@ 
class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } } - def reviveOffers() { - masterActor ! ReviveOffers + override def reviveOffers() { + driverActor ! ReviveOffers } - def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2) + override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2) } private[spark] object StandaloneSchedulerBackend { diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala index 014906b028..7bf56a05d6 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala @@ -104,11 +104,11 @@ private[spark] class CoarseMesosSchedulerBackend( def createCommand(offer: Offer, numCores: Int): CommandInfo = { val runScript = new File(sparkHome, "run").getCanonicalPath - val masterUrl = "akka://spark@%s:%s/user/%s".format( - System.getProperty("spark.master.host"), System.getProperty("spark.master.port"), + val driverUrl = "akka://spark@%s:%s/user/%s".format( + System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), StandaloneSchedulerBackend.ACTOR_NAME) val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format( - runScript, masterUrl, offer.getSlaveId.getValue, offer.getHostname, numCores) + runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores) val environment = Environment.newBuilder() sc.executorEnvs.foreach { case (key, value) => environment.addVariables(Environment.Variable.newBuilder() diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index a3d8671834..9fd2b454a4 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -11,52 +11,51 @@ import akka.util.duration._ import spark.{Logging, SparkException, Utils} - private[spark] class BlockManagerMaster( val actorSystem: ActorSystem, - isMaster: Boolean, + isDriver: Boolean, isLocal: Boolean, - masterIp: String, - masterPort: Int) + driverIp: String, + driverPort: Int) extends Logging { val AKKA_RETRY_ATTEMPS: Int = System.getProperty("spark.akka.num.retries", "3").toInt val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt - val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager" + val DRIVER_AKKA_ACTOR_NAME = "BlockMasterManager" val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager" val DEFAULT_MANAGER_IP: String = Utils.localHostName() val timeout = 10.seconds - var masterActor: ActorRef = { - if (isMaster) { - val masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)), - name = MASTER_AKKA_ACTOR_NAME) + var driverActor: ActorRef = { + if (isDriver) { + val driverActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)), + name = DRIVER_AKKA_ACTOR_NAME) logInfo("Registered BlockManagerMaster Actor") - masterActor + driverActor } else { - val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME) + val url = "akka://spark@%s:%s/user/%s".format(driverIp, driverPort, DRIVER_AKKA_ACTOR_NAME) logInfo("Connecting to BlockManagerMaster: " + url) actorSystem.actorFor(url) } } - /** Remove a dead host from the master actor. This is only called on the master side. */ + /** Remove a dead host from the driver actor. 
This is only called on the driver side. */ def notifyADeadHost(host: String) { tell(RemoveHost(host)) logInfo("Removed " + host + " successfully in notifyADeadHost") } /** - * Send the master actor a heart beat from the slave. Returns true if everything works out, - * false if the master does not know about the given block manager, which means the block + * Send the driver actor a heart beat from the slave. Returns true if everything works out, + * false if the driver does not know about the given block manager, which means the block * manager should re-register. */ def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = { - askMasterWithRetry[Boolean](HeartBeat(blockManagerId)) + askDriverWithReply[Boolean](HeartBeat(blockManagerId)) } - /** Register the BlockManager's id with the master. */ + /** Register the BlockManager's id with the driver. */ def registerBlockManager( blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { logInfo("Trying to register BlockManager") @@ -70,25 +69,25 @@ private[spark] class BlockManagerMaster( storageLevel: StorageLevel, memSize: Long, diskSize: Long): Boolean = { - val res = askMasterWithRetry[Boolean]( + val res = askDriverWithReply[Boolean]( UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)) logInfo("Updated info of block " + blockId) res } - /** Get locations of the blockId from the master */ + /** Get locations of the blockId from the driver */ def getLocations(blockId: String): Seq[BlockManagerId] = { - askMasterWithRetry[Seq[BlockManagerId]](GetLocations(blockId)) + askDriverWithReply[Seq[BlockManagerId]](GetLocations(blockId)) } - /** Get locations of multiple blockIds from the master */ + /** Get locations of multiple blockIds from the driver */ def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = { - askMasterWithRetry[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds)) + askDriverWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds)) } - /** Get ids of other nodes in the cluster from the master */ + /** Get ids of other nodes in the cluster from the driver */ def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = { - val result = askMasterWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers)) + val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers)) if (result.length != numPeers) { throw new SparkException( "Error getting peers, only got " + result.size + " instead of " + numPeers) @@ -98,10 +97,10 @@ private[spark] class BlockManagerMaster( /** * Remove a block from the slaves that have it. This can only be used to remove - * blocks that the master knows about. + * blocks that the driver knows about. */ def removeBlock(blockId: String) { - askMasterWithRetry(RemoveBlock(blockId)) + askDriverWithReply(RemoveBlock(blockId)) } /** @@ -111,33 +110,33 @@ private[spark] class BlockManagerMaster( * amount of remaining memory. 
*/ def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = { - askMasterWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus) + askDriverWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus) } - /** Stop the master actor, called only on the Spark master node */ + /** Stop the driver actor, called only on the Spark driver node */ def stop() { - if (masterActor != null) { + if (driverActor != null) { tell(StopBlockManagerMaster) - masterActor = null + driverActor = null logInfo("BlockManagerMaster stopped") } } /** Send a one-way message to the master actor, to which we expect it to reply with true. */ private def tell(message: Any) { - if (!askMasterWithRetry[Boolean](message)) { + if (!askDriverWithReply[Boolean](message)) { throw new SparkException("BlockManagerMasterActor returned false, expected true.") } } /** - * Send a message to the master actor and get its result within a default timeout, or + * Send a message to the driver actor and get its result within a default timeout, or * throw a SparkException if this fails. */ - private def askMasterWithRetry[T](message: Any): T = { + private def askDriverWithReply[T](message: Any): T = { // TODO: Consider removing multiple attempts - if (masterActor == null) { - throw new SparkException("Error sending message to BlockManager as masterActor is null " + + if (driverActor == null) { + throw new SparkException("Error sending message to BlockManager as driverActor is null " + "[message = " + message + "]") } var attempts = 0 @@ -145,7 +144,7 @@ private[spark] class BlockManagerMaster( while (attempts < AKKA_RETRY_ATTEMPS) { attempts += 1 try { - val future = masterActor.ask(message)(timeout) + val future = driverActor.ask(message)(timeout) val result = Await.result(future, timeout) if (result == null) { throw new Exception("BlockManagerMaster returned null") diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala index 689f07b969..0b8f6d4303 100644 --- a/core/src/main/scala/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/spark/storage/ThreadingTest.scala @@ -75,9 +75,9 @@ private[spark] object ThreadingTest { System.setProperty("spark.kryoserializer.buffer.mb", "1") val actorSystem = ActorSystem("test") val serializer = new KryoSerializer - val masterIp: String = System.getProperty("spark.master.host", "localhost") - val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt - val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, masterIp, masterPort) + val driverIp: String = System.getProperty("spark.driver.host", "localhost") + val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt + val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, driverIp, driverPort) val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024) val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i)) val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue)) diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 01351de4ae..42ce6f3c74 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -46,7 +46,7 @@ public class JavaAPISuite implements Serializable { sc.stop(); sc = null; // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port"); + 
System.clearProperty("spark.driver.port"); } static class ReverseIntComparator implements Comparator, Serializable { diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/spark/LocalSparkContext.scala index b5e31ddae3..ff00dd05dd 100644 --- a/core/src/test/scala/spark/LocalSparkContext.scala +++ b/core/src/test/scala/spark/LocalSparkContext.scala @@ -26,7 +26,7 @@ object LocalSparkContext { def stop(sc: SparkContext) { sc.stop() // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */ diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala index 7d5305f1e0..718107d2b5 100644 --- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala @@ -79,7 +79,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { test("remote fetch") { val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0) - System.setProperty("spark.master.port", boundPort.toString) + System.setProperty("spark.driver.port", boundPort.toString) val masterTracker = new MapOutputTracker(actorSystem, true) val slaveTracker = new MapOutputTracker(actorSystem, false) masterTracker.registerShuffle(10, 1) diff --git a/docs/configuration.md b/docs/configuration.md index 036a0df480..a7054b4321 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -202,7 +202,7 @@ Apart from these, the following properties are also available, and may be useful 10 Maximum message size to allow in "control plane" communication (for serialized tasks and task - results), in MB. Increase this if your tasks need to send back large results to the master + results), in MB. Increase this if your tasks need to send back large results to the driver (e.g. using collect() on a large dataset). @@ -211,7 +211,7 @@ Apart from these, the following properties are also available, and may be useful 4 Number of actor threads to use for communication. Can be useful to increase on large clusters - when the master has a lot of CPU cores. + when the driver has a lot of CPU cores. @@ -222,17 +222,17 @@ Apart from these, the following properties are also available, and may be useful - spark.master.host + spark.driver.host (local hostname) - Hostname or IP address for the master to listen on. + Hostname or IP address for the driver to listen on. - spark.master.port + spark.driver.port (random) - Port for the master to listen on. + Port for the driver to listen on. 
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 46ab34f063..df7235756d 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -26,7 +26,7 @@ class PySparkTestCase(unittest.TestCase): sys.path = self._old_sys_path # To avoid Akka rebinding to the same port, since it doesn't unbind # immediately on shutdown - self.sc.jvm.System.clearProperty("spark.master.port") + self.sc.jvm.System.clearProperty("spark.driver.port") class TestCheckpoint(PySparkTestCase): diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala index db78d06d4f..43559b96d3 100644 --- a/repl/src/test/scala/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/spark/repl/ReplSuite.scala @@ -31,7 +31,7 @@ class ReplSuite extends FunSuite { if (interp.sparkContext != null) interp.sparkContext.stop() // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") return out.toString } diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala index aa6be95f30..8c322dd698 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala @@ -153,8 +153,8 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log /** A helper actor that communicates with the NetworkInputTracker */ private class NetworkReceiverActor extends Actor { logInfo("Attempting to register with tracker") - val ip = System.getProperty("spark.master.host", "localhost") - val port = System.getProperty("spark.master.port", "7077").toInt + val ip = System.getProperty("spark.driver.host", "localhost") + val port = System.getProperty("spark.driver.port", "7077").toInt val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port) val tracker = env.actorSystem.actorFor(url) val timeout = 5.seconds diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index c84e7331c7..79d6093429 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -43,7 +43,7 @@ public class JavaAPISuite implements Serializable { ssc = null; // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port"); + System.clearProperty("spark.driver.port"); } @Test diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index bfdf32c73e..4a036f0710 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -10,7 +10,7 @@ class BasicOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } test("map") { diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index d2f32c189b..563a7d1458 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ 
b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -19,7 +19,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { FileUtils.deleteDirectory(new File(checkpointDir)) // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } var ssc: StreamingContext = null diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala index 7493ac1207..c4cfffbfc1 100644 --- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -24,7 +24,7 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter { FileUtils.deleteDirectory(new File(checkpointDir)) // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } override def framework = "CheckpointSuite" diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index d7ba7a5d17..70ae6e3934 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -42,7 +42,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } test("network input stream") { diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index 0c6e928835..cd9608df53 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -13,7 +13,7 @@ class WindowOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } val largerSlideInput = Seq( -- cgit v1.2.3 From b29599e5cf0272f0d0e3ceceebb473a8163eab8c Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 28 Jan 2013 22:24:47 -0800 Subject: Fix code that depended on metadata cleaner interval being in minutes --- streaming/src/main/scala/spark/streaming/DStream.scala | 8 ++++---- streaming/src/main/scala/spark/streaming/StreamingContext.scala | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index b11ef443dc..352f83fe0c 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -198,10 +198,10 @@ abstract class DStream[T: ClassManifest] ( metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000, "It seems you are doing some DStream window operation or setting a checkpoint interval " + "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " + - "than " + rememberDuration.milliseconds + " milliseconds. 
But the Spark's metadata cleanup" + - "delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " + - "the Java property 'spark.cleaner.delay' to more than " + - math.ceil(rememberDuration.milliseconds.toDouble / 60000.0).toInt + " minutes." + "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" + + "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " + + "set the Java property 'spark.cleaner.delay' to more than " + + math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds." ) dependencies.foreach(_.validate()) diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 14500bdcb1..37ba524b48 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -389,7 +389,7 @@ object StreamingContext { // Set the default cleaner delay to an hour if not already set. // This should be sufficient for even 1 second interval. if (MetadataCleaner.getDelaySeconds < 0) { - MetadataCleaner.setDelaySeconds(60) + MetadataCleaner.setDelaySeconds(3600) } new SparkContext(master, frameworkName) } -- cgit v1.2.3 From 7eea64aa4c0d6a51406e0d1b039906ee9559cd58 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Tue, 5 Feb 2013 11:41:31 -0800 Subject: Streaming constructor which takes JavaSparkContext It's sometimes helpful to directly pass a JavaSparkContext, and take advantage of the various constructors available for that. --- .../scala/spark/streaming/api/java/JavaStreamingContext.scala | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index f82e6a37cc..e7f446a49b 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -33,6 +33,14 @@ class JavaStreamingContext(val ssc: StreamingContext) { def this(master: String, frameworkName: String, batchDuration: Duration) = this(new StreamingContext(master, frameworkName, batchDuration)) + /** + * Creates a StreamingContext. + * @param sparkContext The underlying JavaSparkContext to use + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(sparkContext: JavaSparkContext, batchDuration: Duration) = + this(new StreamingContext(sparkContext.sc, batchDuration)) + /** * Re-creates a StreamingContext from a checkpoint file. * @param path Path either to the directory that was specified as the checkpoint directory, or -- cgit v1.2.3 From d55e3aa467ab7d406739255bd8dc3dfc60f3cb16 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 7 Feb 2013 13:59:18 -0800 Subject: Updated JavaStreamingContext with updated kafkaStream API. 
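The Java overloads now take a ZooKeeper quorum string ("hostname:port,hostname:port,..") in place of the old (hostname, port) pair, matching the updated Scala API shown in the diff below. A minimal call-site sketch, added by the editor and not part of the patch; the quorum, group id, and topic name are made-up values:

    // Editor's sketch: a call site for the reworked Java kafkaStream API.
    import java.util.HashMap;
    import java.util.Map;
    import spark.streaming.Duration;
    import spark.streaming.api.java.JavaDStream;
    import spark.streaming.api.java.JavaStreamingContext;

    public class KafkaStreamExample {
      public static void main(String[] args) {
        JavaStreamingContext ssc =
            new JavaStreamingContext("local[2]", "kafka-example", new Duration(1000));

        // topic name -> number of consumer threads reading that topic
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put("events", 1);

        // A comma-separated ZooKeeper quorum replaces the old (hostname, port) pair.
        JavaDStream<String> stream =
            ssc.kafkaStream("zk1:2181,zk2:2181,zk3:2181", "example-group", topics);

        // ... register output operations on the stream, then start the context ...
      }
    }
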
--- .../streaming/api/java/JavaStreamingContext.scala | 26 ++++++++-------------- 1 file changed, 9 insertions(+), 17 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index f82e6a37cc..70d6bd2b1b 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -45,27 +45,24 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Create an input stream that pulls messages form a Kafka Broker. - * @param hostname Zookeper hostname. - * @param port Zookeper port. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). * @param groupId The group id for this consumer. * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. */ def kafkaStream[T]( - hostname: String, - port: Int, + zkQuorum: String, groupId: String, topics: JMap[String, JInt]) : JavaDStream[T] = { implicit val cmt: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - ssc.kafkaStream[T](hostname, port, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) + ssc.kafkaStream[T](zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) } /** * Create an input stream that pulls messages form a Kafka Broker. - * @param hostname Zookeper hostname. - * @param port Zookeper port. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). * @param groupId The group id for this consumer. * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. @@ -73,8 +70,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * By default the value is pulled from zookeper. */ def kafkaStream[T]( - hostname: String, - port: Int, + zkQuorum: String, groupId: String, topics: JMap[String, JInt], initialOffsets: JMap[KafkaPartitionKey, JLong]) @@ -82,8 +78,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { implicit val cmt: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] ssc.kafkaStream[T]( - hostname, - port, + zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), Map(initialOffsets.mapValues(_.longValue()).toSeq: _*)) @@ -91,8 +86,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Create an input stream that pulls messages form a Kafka Broker. - * @param hostname Zookeper hostname. - * @param port Zookeper port. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). * @param groupId The group id for this consumer. * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. @@ -101,8 +95,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param storageLevel RDD storage level. Defaults to memory-only */ def kafkaStream[T]( - hostname: String, - port: Int, + zkQuorum: String, groupId: String, topics: JMap[String, JInt], initialOffsets: JMap[KafkaPartitionKey, JLong], @@ -111,8 +104,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { implicit val cmt: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] ssc.kafkaStream[T]( - hostname, - port, + zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), Map(initialOffsets.mapValues(_.longValue()).toSeq: _*), -- cgit v1.2.3
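
For reference, the JavaSparkContext-based constructor added earlier in this series (def this(sparkContext: JavaSparkContext, batchDuration: Duration)) would be used roughly as sketched below. This is an editor's illustration rather than code from any patch; the master URL and application name are hypothetical:

    // Editor's sketch: building a JavaStreamingContext on top of an existing
    // JavaSparkContext, per the constructor added earlier in this series.
    import spark.api.java.JavaSparkContext;
    import spark.streaming.Duration;
    import spark.streaming.api.java.JavaStreamingContext;

    public class StreamingFromExistingContext {
      public static void main(String[] args) {
        // Reuse an already-configured JavaSparkContext instead of letting the
        // streaming context create its own SparkContext from a master string.
        JavaSparkContext sc = new JavaSparkContext("local[2]", "streaming-example");
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));

        // ... define input DStreams and output operations on ssc, then ssc.start() ...

        // Shut down the shared SparkContext when done.
        sc.stop();
      }
    }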