From b40907310266be1be5db5f773bc9bcbf2813c090 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Fri, 1 Mar 2013 15:05:07 -0800 Subject: Instead of failing to bind to a fixed, already-in-use port, let the OS choose an available port for TestServer. --- .../test/scala/spark/streaming/InputStreamsSuite.scala | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) (limited to 'streaming') diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index ebcb6d0092..4d33857b25 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -29,7 +29,7 @@ import java.nio.charset.Charset import com.google.common.io.Files class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { - + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") val testPort = 9999 @@ -44,12 +44,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { test("socket input stream") { // Start the server - val testServer = new TestServer(testPort) + val testServer = new TestServer() testServer.start() // Set up the streaming context and input streams val ssc = new StreamingContext(master, framework, batchDuration) - val networkStream = ssc.socketTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) + val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) @@ -193,8 +193,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { test("actor input stream") { // Start the server - val port = testPort - val testServer = new TestServer(port) + val testServer = new TestServer() + val port = testServer.port testServer.start() // Set up the streaming context and input streams @@ -244,11 +244,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { /** This is server to test the network input stream */ -class TestServer(port: Int) extends Logging { +class TestServer() extends Logging { val queue = new ArrayBlockingQueue[String](100) - val serverSocket = new ServerSocket(port) + val serverSocket = new ServerSocket(0) val servingThread = new Thread() { override def run() { @@ -290,11 +290,13 @@ class TestServer(port: Int) extends Logging { def send(msg: String) { queue.add(msg) } def stop() { servingThread.interrupt() } + + def port = serverSocket.getLocalPort } object TestServer { def main(args: Array[String]) { - val s = new TestServer(9999) + val s = new TestServer() s.start() while(true) { Thread.sleep(1000) -- cgit v1.2.3