diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-07 15:05:20 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-07 15:05:20 -0800 |
commit | bcee3cb2db2f213efdc142d83419dfd9e35bc4fe (patch) | |
tree | 0aa12b24f0074f3c6522a8ab3c16bf8ac9097061 /streaming/src/test | |
parent | 12300758ccbe7d45c6149d56772133ae1bc5cb25 (diff) | |
parent | 4cc223b4785c9da39c4a35d2adb7339dfa8e47e6 (diff) | |
download | spark-bcee3cb2db2f213efdc142d83419dfd9e35bc4fe.tar.gz spark-bcee3cb2db2f213efdc142d83419dfd9e35bc4fe.tar.bz2 spark-bcee3cb2db2f213efdc142d83419dfd9e35bc4fe.zip |
Merge pull request #455 from tdas/streaming
Merging latest master branch changes to the streaming branch
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaAPISuite.java (renamed from streaming/src/test/java/JavaAPISuite.java) | 25 | ||||
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaTestUtils.scala (renamed from streaming/src/test/java/JavaTestUtils.scala) | 0 | ||||
-rw-r--r-- | streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala | 2 | ||||
-rw-r--r-- | streaming/src/test/scala/spark/streaming/CheckpointSuite.scala | 129 | ||||
-rw-r--r-- | streaming/src/test/scala/spark/streaming/FailureSuite.scala | 2 | ||||
-rw-r--r-- | streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 134 | ||||
-rw-r--r-- | streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala | 2 |
7 files changed, 142 insertions, 152 deletions
diff --git a/streaming/src/test/java/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index c84e7331c7..fbe4af4597 100644 --- a/streaming/src/test/java/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -43,9 +43,9 @@ public class JavaAPISuite implements Serializable { ssc = null; // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port"); + System.clearProperty("spark.driver.port"); } - + /* @Test public void testCount() { List<List<Integer>> inputData = Arrays.asList( @@ -434,7 +434,7 @@ public class JavaAPISuite implements Serializable { assertOrderInvariantEquals(expected, result); } - + */ /* * Performs an order-invariant comparison of lists representing two RDD streams. This allows * us to account for ordering variation within individual RDD's which occurs during windowing. @@ -450,7 +450,7 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, actual); } - + /* // PairDStream Functions @Test public void testPairFilter() { @@ -897,7 +897,7 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, result); } - + */ @Test public void testCheckpointMasterRecovery() throws InterruptedException { List<List<String>> inputData = Arrays.asList( @@ -911,7 +911,6 @@ public class JavaAPISuite implements Serializable { Arrays.asList(1,4), Arrays.asList(8,7)); - File tempDir = Files.createTempDir(); ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); @@ -927,14 +926,16 @@ public class JavaAPISuite implements Serializable { assertOrderInvariantEquals(expectedInitial, initialResult); Thread.sleep(1000); - ssc.stop(); + ssc = new JavaStreamingContext(tempDir.getAbsolutePath()); - ssc.start(); - List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2); - assertOrderInvariantEquals(expectedFinal, finalResult); + // Tweak to take into consideration that the last batch before failure + // will be re-processed after recovery + List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3); + assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3)); } + /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD @Test public void testCheckpointofIndividualStream() throws InterruptedException { @@ -963,7 +964,7 @@ public class JavaAPISuite implements Serializable { assertOrderInvariantEquals(expected, result1); } */ - + /* // Input stream tests. These mostly just test that we can instantiate a given InputStream with // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the // InputStream functionality is deferred to the existing Scala tests. @@ -1025,5 +1026,5 @@ public class JavaAPISuite implements Serializable { public void testFileStream() { JavaPairDStream<String, String> foo = ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); - } + }*/ } diff --git a/streaming/src/test/java/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala index 56349837e5..56349837e5 100644 --- a/streaming/src/test/java/JavaTestUtils.scala +++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index d98b840b8e..c031949dd1 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -10,7 +10,7 @@ class BasicOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } test("map") { diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index d2f32c189b..7126af62d9 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -7,6 +7,8 @@ import org.scalatest.BeforeAndAfter import org.apache.commons.io.FileUtils import collection.mutable.{SynchronizedBuffer, ArrayBuffer} import util.{Clock, ManualClock} +import scala.util.Random +import com.google.common.io.Files class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { @@ -19,7 +21,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { FileUtils.deleteDirectory(new File(checkpointDir)) // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } var ssc: StreamingContext = null @@ -32,7 +34,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { override def actuallyWait = true - test("basic stream+rdd recovery") { + test("basic rdd checkpoints + dstream graph checkpoint recovery") { assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second") assert(checkpointInterval === batchDuration, "checkpointInterval for this test much be same as batchDuration") @@ -63,9 +65,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // then check whether some RDD has been checkpointed or not ssc.start() runStreamsWithRealDelay(ssc, firstNumBatches) - logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.rdds.mkString(",\n") + "]") - assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before first failure") - stateStream.checkpointData.rdds.foreach { + logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData) + assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure") + stateStream.checkpointData.checkpointFiles.foreach { case (time, data) => { val file = new File(data.toString) assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist") @@ -74,7 +76,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Run till a further time such that previous checkpoint files in the stream would be deleted // and check whether the earlier checkpoint files are deleted - val checkpointFiles = stateStream.checkpointData.rdds.map(x => new File(x._2.toString)) + val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2)) runStreamsWithRealDelay(ssc, secondNumBatches) checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted")) ssc.stop() @@ -91,8 +93,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // is present in the checkpoint data or not ssc.start() runStreamsWithRealDelay(ssc, 1) - assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before second failure") - stateStream.checkpointData.rdds.foreach { + assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure") + stateStream.checkpointData.checkpointFiles.foreach { case (time, data) => { val file = new File(data.toString) assert(file.exists(), @@ -117,7 +119,10 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { ssc = null } - test("map and reduceByKey") { + // This tests whether the systm can recover from a master failure with simple + // non-stateful operations. This assumes as reliable, replayable input + // source - TestInputDStream. + test("recovery with map and reduceByKey operations") { testCheckpointedOperation( Seq( Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq() ), (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _), @@ -126,7 +131,11 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { ) } - test("reduceByKeyAndWindowInv") { + + // This tests whether the ReduceWindowedDStream's RDD checkpoints works correctly such + // that the system can recover from a master failure. This assumes as reliable, + // replayable input source - TestInputDStream. + test("recovery with invertible reduceByKeyAndWindow operation") { val n = 10 val w = 4 val input = (1 to n).map(_ => Seq("a")).toSeq @@ -139,7 +148,11 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { testCheckpointedOperation(input, operation, output, 7) } - test("updateStateByKey") { + + // This tests whether the StateDStream's RDD checkpoints works correctly such + // that the system can recover from a master failure. This assumes as reliable, + // replayable input source - TestInputDStream. + test("recovery with updateStateByKey operation") { val input = (1 to 10).map(_ => Seq("a")).toSeq val output = (1 to 10).map(x => Seq(("a", x))).toSeq val operation = (st: DStream[String]) => { @@ -154,11 +167,99 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { testCheckpointedOperation(input, operation, output, 7) } + // This tests whether file input stream remembers what files were seen before + // the master failure and uses them again to process a large window operatoin. + // It also tests whether batches, whose processing was incomplete due to the + // failure, are re-processed or not. + test("recovery with file input stream") { + // Set up the streaming context and input streams + val testDir = Files.createTempDir() + var ssc = new StreamingContext(master, framework, batchDuration) + ssc.checkpoint(checkpointDir, checkpointInterval) + val fileStream = ssc.textFileStream(testDir.toString) + // Making value 3 take large time to process, to ensure that the master + // shuts down in the middle of processing the 3rd batch + val mappedStream = fileStream.map(s => { + val i = s.toInt + if (i == 3) Thread.sleep(1000) + i + }) + // Reducing over a large window to ensure that recovery from master failure + // requires reprocessing of all the files seen before the failure + val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration) + val outputBuffer = new ArrayBuffer[Seq[Int]] + var outputStream = new TestOutputStream(reducedStream, outputBuffer) + ssc.registerOutputStream(outputStream) + ssc.start() + + // Create files and advance manual clock to process them + var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + Thread.sleep(1000) + for (i <- Seq(1, 2, 3)) { + FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") + // wait to make sure that the file is written such that it gets shown in the file listings + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + // wait to make sure that FileInputDStream picks up this file only and not any other file + Thread.sleep(500) + } + logInfo("Output = " + outputStream.output.mkString(",")) + assert(outputStream.output.size > 0, "No files processed before restart") + ssc.stop() + + // Create files while the master is down + for (i <- Seq(4, 5, 6)) { + FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") + Thread.sleep(1000) + } + + // Restart stream computation from checkpoint and create more files to see whether + // they are being processed + logInfo("*********** RESTARTING ************") + ssc = new StreamingContext(checkpointDir) + ssc.start() + clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + for (i <- Seq(7, 8, 9)) { + FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + Thread.sleep(500) + } + Thread.sleep(1000) + logInfo("Output = " + outputStream.output.mkString(",")) + assert(outputStream.output.size > 0, "No files processed after restart") + ssc.stop() + + // Append the new output to the old buffer + outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] + outputBuffer ++= outputStream.output + + // Verify whether data received by Spark Streaming was as expected + val expectedOutput = Seq(1, 3, 6, 28, 36, 45) + logInfo("--------------------------------") + logInfo("output, size = " + outputBuffer.size) + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output, size = " + expectedOutput.size) + expectedOutput.foreach(x => logInfo("[" + x + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + assert(outputBuffer.size === expectedOutput.size) + for (i <- 0 until outputBuffer.size) { + assert(outputBuffer(i).size === 1) + assert(outputBuffer(i).head === expectedOutput(i)) + } + } + + /** - * Tests a streaming operation under checkpointing, by restart the operation + * Tests a streaming operation under checkpointing, by restarting the operation * from checkpoint file and verifying whether the final output is correct. * The output is assumed to have come from a reliable queue which an replay * data as required. + * + * NOTE: This takes into consideration that the last batch processed before + * master failure will be re-processed after restart/recovery. */ def testCheckpointedOperation[U: ClassManifest, V: ClassManifest]( input: Seq[Seq[U]], @@ -172,7 +273,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val totalNumBatches = input.size val nextNumBatches = totalNumBatches - initialNumBatches val initialNumExpectedOutputs = initialNumBatches - val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs + val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs + 1 + // because the last batch will be processed again // Do the computation for initial number of batches, create checkpoint file and quit ssc = setupStreams[U, V](input, operation) @@ -188,6 +290,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { ) ssc = new StreamingContext(checkpointDir) val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs) + // the first element will be re-processed data of the last batch before restart verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true) ssc = null } diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala index 7493ac1207..c4cfffbfc1 100644 --- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -24,7 +24,7 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter { FileUtils.deleteDirectory(new File(checkpointDir)) // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } override def framework = "CheckpointSuite" diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index d7ba7a5d17..c442210004 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -19,35 +19,24 @@ import org.apache.avro.ipc.specific.SpecificRequestor import java.nio.ByteBuffer import collection.JavaConversions._ import java.nio.charset.Charset +import com.google.common.io.Files class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") - val testPort = 9999 - var testServer: TestServer = null - var testDir: File = null - override def checkpointDir = "checkpoint" after { - FileUtils.deleteDirectory(new File(checkpointDir)) - if (testServer != null) { - testServer.stop() - testServer = null - } - if (testDir != null && testDir.exists()) { - FileUtils.deleteDirectory(testDir) - testDir = null - } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } + test("network input stream") { // Start the server - testServer = new TestServer(testPort) + val testPort = 9999 + val testServer = new TestServer(testPort) testServer.start() // Set up the streaming context and input streams @@ -93,46 +82,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } - test("network input stream with checkpoint") { - // Start the server - testServer = new TestServer(testPort) - testServer.start() - - // Set up the streaming context and input streams - var ssc = new StreamingContext(master, framework, batchDuration) - ssc.checkpoint(checkpointDir, checkpointInterval) - val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) - var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]]) - ssc.registerOutputStream(outputStream) - ssc.start() - - // Feed data to the server to send to the network receiver - var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - for (i <- Seq(1, 2, 3)) { - testServer.send(i.toString + "\n") - Thread.sleep(100) - clock.addToTime(batchDuration.milliseconds) - } - Thread.sleep(500) - assert(outputStream.output.size > 0) - ssc.stop() - - // Restart stream computation from checkpoint and feed more data to see whether - // they are being received and processed - logInfo("*********** RESTARTING ************") - ssc = new StreamingContext(checkpointDir) - ssc.start() - clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - for (i <- Seq(4, 5, 6)) { - testServer.send(i.toString + "\n") - Thread.sleep(100) - clock.addToTime(batchDuration.milliseconds) - } - Thread.sleep(500) - outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]] - assert(outputStream.output.size > 0) - ssc.stop() - } test("flume input stream") { // Set up the streaming context and input streams @@ -182,18 +131,10 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } - test("file input stream") { - - // Create a temporary directory - testDir = { - var temp = File.createTempFile(".temp.", Random.nextInt().toString) - temp.delete() - temp.mkdirs() - logInfo("Created temp dir " + temp) - temp - } + test("file input stream") { // Set up the streaming context and input streams + val testDir = Files.createTempDir() val ssc = new StreamingContext(master, framework, batchDuration) val filestream = ssc.textFileStream(testDir.toString) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] @@ -214,10 +155,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { //Thread.sleep(100) } val startTime = System.currentTimeMillis() - /*while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - logInfo("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size) - Thread.sleep(100) - }*/ Thread.sleep(1000) val timeTaken = System.currentTimeMillis() - startTime assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") @@ -226,11 +163,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Verify whether data received by Spark Streaming was as expected logInfo("--------------------------------") - logInfo("output.size = " + outputBuffer.size) - logInfo("output") + logInfo("output, size = " + outputBuffer.size) outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("expected output.size = " + expectedOutput.size) - logInfo("expected output") + logInfo("expected output, size = " + expectedOutput.size) expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("--------------------------------") @@ -241,56 +176,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(output(i).size === 1) assert(output(i).head.toString === expectedOutput(i)) } - } - - test("file input stream with checkpoint") { - // Create a temporary directory - testDir = { - var temp = File.createTempFile(".temp.", Random.nextInt().toString) - temp.delete() - temp.mkdirs() - logInfo("Created temp dir " + temp) - temp - } - - // Set up the streaming context and input streams - var ssc = new StreamingContext(master, framework, batchDuration) - ssc.checkpoint(checkpointDir, checkpointInterval) - val filestream = ssc.textFileStream(testDir.toString) - var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]]) - ssc.registerOutputStream(outputStream) - ssc.start() - - // Create files and advance manual clock to process them - var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - Thread.sleep(1000) - for (i <- Seq(1, 2, 3)) { - FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") - Thread.sleep(100) - clock.addToTime(batchDuration.milliseconds) - } - Thread.sleep(500) - logInfo("Output = " + outputStream.output.mkString(",")) - assert(outputStream.output.size > 0) - ssc.stop() - - // Restart stream computation from checkpoint and create more files to see whether - // they are being processed - logInfo("*********** RESTARTING ************") - ssc = new StreamingContext(checkpointDir) - ssc.start() - clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - Thread.sleep(500) - for (i <- Seq(4, 5, 6)) { - FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") - Thread.sleep(100) - clock.addToTime(batchDuration.milliseconds) - } - Thread.sleep(500) - outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]] - logInfo("Output = " + outputStream.output.mkString(",")) - assert(outputStream.output.size > 0) - ssc.stop() + FileUtils.deleteDirectory(testDir) } } diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index 0c6e928835..cd9608df53 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -13,7 +13,7 @@ class WindowOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } val largerSlideInput = Seq( |