diff options
Diffstat (limited to 'streaming/src/test')
4 files changed, 25 insertions, 14 deletions
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index 168e1b7a55..565089a853 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -6,13 +6,16 @@ import util.ManualClock class BasicOperationsSuite extends TestSuiteBase { - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") - override def framework() = "BasicOperationsSuite" + before { + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + } + after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } test("map") { diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index cac86deeaf..607dea77ec 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -31,6 +31,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } var ssc: StreamingContext = null @@ -325,6 +326,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { ) ssc = new StreamingContext(checkpointDir) System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") ssc.start() val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches) // the first element will be re-processed data of the last batch before restart @@ -350,4 +352,4 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] outputStream.output } -}
\ No newline at end of file +} diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 595c766a21..b024fc9dcc 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -29,27 +29,30 @@ import java.nio.charset.Charset import com.google.common.io.Files class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { - - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") val testPort = 9999 override def checkpointDir = "checkpoint" + before { + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + } + after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } test("socket input stream") { // Start the server - val testServer = new TestServer(testPort) + val testServer = new TestServer() testServer.start() // Set up the streaming context and input streams val ssc = new StreamingContext(master, framework, batchDuration) - val networkStream = ssc.socketTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) + val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) @@ -94,7 +97,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { test("flume input stream") { // Set up the streaming context and input streams val ssc = new StreamingContext(master, framework, batchDuration) - val flumeStream = ssc.flumeStream("localhost", 33333, StorageLevel.MEMORY_AND_DISK) + val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] with SynchronizedBuffer[Seq[SparkFlumeEvent]] val outputStream = new TestOutputStream(flumeStream, outputBuffer) @@ -104,7 +107,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val input = Seq(1, 2, 3, 4, 5) Thread.sleep(1000) - val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 33333)); + val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)); val client = SpecificRequestor.getClient( classOf[AvroSourceProtocol], transceiver); @@ -193,8 +196,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { test("actor input stream") { // Start the server - val port = testPort - val testServer = new TestServer(port) + val testServer = new TestServer() + val port = testServer.port testServer.start() // Set up the streaming context and input streams @@ -255,11 +258,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { /** This is server to test the network input stream */ -class TestServer(port: Int) extends Logging { +class TestServer() extends Logging { val queue = new ArrayBlockingQueue[String](100) - val serverSocket = new ServerSocket(port) + val serverSocket = new ServerSocket(0) val servingThread = new Thread() { override def run() { @@ -301,11 +304,13 @@ class TestServer(port: Int) extends Logging { def send(msg: String) { queue.add(msg) } def stop() { servingThread.interrupt() } + + def port = serverSocket.getLocalPort } object TestServer { def main(args: Array[String]) { - val s = new TestServer(9999) + val s = new TestServer() s.start() while(true) { Thread.sleep(1000) diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index 1b66f3bda2..80d827706f 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -16,6 +16,7 @@ class WindowOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } val largerSlideInput = Seq( |