aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorMark Hamstra <markhamstra@gmail.com>2013-03-01 15:05:07 -0800
committerMark Hamstra <markhamstra@gmail.com>2013-03-01 15:05:07 -0800
commitb40907310266be1be5db5f773bc9bcbf2813c090 (patch)
tree448926c6c38021ebc47c18d1ca6e1c9d5ad70bfc /streaming
parent25c71d3e12b7cd9fb6f756ae567de3a6696743ca (diff)
downloadspark-b40907310266be1be5db5f773bc9bcbf2813c090.tar.gz
spark-b40907310266be1be5db5f773bc9bcbf2813c090.tar.bz2
spark-b40907310266be1be5db5f773bc9bcbf2813c090.zip
Instead of failing to bind to a fixed, already-in-use port, let the OS choose an available port for TestServer.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala18
1 files changed, 10 insertions, 8 deletions
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index ebcb6d0092..4d33857b25 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -29,7 +29,7 @@ import java.nio.charset.Charset
import com.google.common.io.Files
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
-
+
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
val testPort = 9999
@@ -44,12 +44,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("socket input stream") {
// Start the server
- val testServer = new TestServer(testPort)
+ val testServer = new TestServer()
testServer.start()
// Set up the streaming context and input streams
val ssc = new StreamingContext(master, framework, batchDuration)
- val networkStream = ssc.socketTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+ val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
@@ -193,8 +193,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("actor input stream") {
// Start the server
- val port = testPort
- val testServer = new TestServer(port)
+ val testServer = new TestServer()
+ val port = testServer.port
testServer.start()
// Set up the streaming context and input streams
@@ -244,11 +244,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
/** This is server to test the network input stream */
-class TestServer(port: Int) extends Logging {
+class TestServer() extends Logging {
val queue = new ArrayBlockingQueue[String](100)
- val serverSocket = new ServerSocket(port)
+ val serverSocket = new ServerSocket(0)
val servingThread = new Thread() {
override def run() {
@@ -290,11 +290,13 @@ class TestServer(port: Int) extends Logging {
def send(msg: String) { queue.add(msg) }
def stop() { servingThread.interrupt() }
+
+ def port = serverSocket.getLocalPort
}
object TestServer {
def main(args: Array[String]) {
- val s = new TestServer(9999)
+ val s = new TestServer()
s.start()
while(true) {
Thread.sleep(1000)