diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-06-24 23:57:47 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-06-24 23:57:47 -0700 |
commit | c89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6 (patch) | |
tree | 01f9f42f30a4aa2f73cb200c89a71254bf74d80e /streaming | |
parent | 48c7e373c62b2e8cf48157ceb0d92c38c3a40652 (diff) | |
parent | 78ffe164b33c6b11a2e511442605acd2f795a1b5 (diff) | |
download | spark-c89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6.tar.gz spark-c89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6.tar.bz2 spark-c89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6.zip |
Merge branch 'master' into streaming
Conflicts:
.gitignore
Diffstat (limited to 'streaming')
14 files changed, 202 insertions, 75 deletions
diff --git a/streaming/pom.xml b/streaming/pom.xml index 15523eadcb..4dc9a19d51 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -3,8 +3,8 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.spark-project</groupId> - <artifactId>parent</artifactId> - <version>0.7.0-SNAPSHOT</version> + <artifactId>spark-parent</artifactId> + <version>0.8.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -41,6 +41,12 @@ <groupId>org.apache.flume</groupId> <artifactId>flume-ng-sdk</artifactId> <version>1.2.0</version> + <exclusions> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>com.github.sgroschupf</groupId> @@ -149,5 +155,42 @@ </plugins> </build> </profile> + <profile> + <id>hadoop2-yarn</id> + <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + <classifier>hadoop2-yarn</classifier> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <classifier>hadoop2-yarn</classifier> + </configuration> + </plugin> + </plugins> + </build> + </profile> </profiles> </project> diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index e7a392fbbf..66e67cbfa1 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -17,6 +17,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val framework = ssc.sc.appName val sparkHome = ssc.sc.sparkHome val jars = ssc.sc.jars + val environment = ssc.sc.environment val graph = ssc.graph val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration @@ -37,11 +38,20 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) private[streaming] class CheckpointWriter(checkpointDir: String) extends Logging { val file = new Path(checkpointDir, "graph") + // The file to which we actually write - and then "move" to file. + private val writeFile = new Path(file.getParent, file.getName + ".next") + private val bakFile = new Path(file.getParent, file.getName + ".bk") + + private var stopped = false + val conf = new Configuration() var fs = file.getFileSystem(conf) val maxAttempts = 3 val executor = Executors.newFixedThreadPool(1) + // Removed code which validates whether there is only one CheckpointWriter per path 'file' since + // I did not notice any errors - reintroduce it ? + class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { def run() { var attempts = 0 @@ -50,15 +60,17 @@ class CheckpointWriter(checkpointDir: String) extends Logging { attempts += 1 try { logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") - if (fs.exists(file)) { - val bkFile = new Path(file.getParent, file.getName + ".bk") - FileUtil.copy(fs, file, fs, bkFile, true, true, conf) - logDebug("Moved existing checkpoint file to " + bkFile) - } - val fos = fs.create(file) + // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast. + val fos = fs.create(writeFile) fos.write(bytes) fos.close() - fos.close() + if (fs.exists(file) && fs.rename(file, bakFile)) { + logDebug("Moved existing checkpoint file to " + bakFile) + } + // paranoia + fs.delete(file, false) + fs.rename(writeFile, file) + val finishTime = System.currentTimeMillis(); logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds") @@ -83,7 +95,15 @@ class CheckpointWriter(checkpointDir: String) extends Logging { } def stop() { + synchronized { + if (stopped) return ; + stopped = true + } executor.shutdown() + val startTime = System.currentTimeMillis() + val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS) + val endTime = System.currentTimeMillis() + logInfo("CheckpointWriter executor terminated ? " + terminated + ", waited for " + (endTime - startTime) + " ms.") } } diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index adb7f3a24d..3b331956f5 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -54,8 +54,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { throw new Exception("Batch duration already set as " + batchDuration + ". cannot set it again.") } + batchDuration = duration } - batchDuration = duration } def remember(duration: Duration) { diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 03d2907323..f2c4073f22 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -6,7 +6,7 @@ import akka.zeromq.Subscribe import spark.streaming.dstream._ -import spark.{RDD, Logging, SparkEnv, SparkContext} +import spark._ import spark.streaming.receivers.ActorReceiver import spark.streaming.receivers.ReceiverSupervisorStrategy import spark.streaming.receivers.ZeroMQReceiver @@ -14,18 +14,18 @@ import spark.storage.StorageLevel import spark.util.MetadataCleaner import spark.streaming.receivers.ActorReceiver - import scala.collection.mutable.Queue +import scala.collection.Map import java.io.InputStream import java.util.concurrent.atomic.AtomicInteger +import java.util.UUID import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path -import java.util.UUID import twitter4j.Status @@ -45,7 +45,9 @@ class StreamingContext private ( * @param sparkContext Existing SparkContext * @param batchDuration The time interval at which streaming data will be divided into batches */ - def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration) + def this(sparkContext: SparkContext, batchDuration: Duration) = { + this(sparkContext, null, batchDuration) + } /** * Create a StreamingContext by providing the details necessary for creating a new SparkContext. @@ -53,8 +55,17 @@ class StreamingContext private ( * @param appName A name for your job, to display on the cluster web UI * @param batchDuration The time interval at which streaming data will be divided into batches */ - def this(master: String, appName: String, batchDuration: Duration) = - this(StreamingContext.createNewSparkContext(master, appName), null, batchDuration) + def this( + master: String, + appName: String, + batchDuration: Duration, + sparkHome: String = null, + jars: Seq[String] = Nil, + environment: Map[String, String] = Map()) = { + this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment), + null, batchDuration) + } + /** * Re-create a StreamingContext from a checkpoint file. @@ -66,15 +77,20 @@ class StreamingContext private ( initLogging() if (sc_ == null && cp_ == null) { - throw new Exception("Streaming Context cannot be initilalized with " + + throw new Exception("Spark Streaming cannot be initialized with " + "both SparkContext and checkpoint as null") } + if (MetadataCleaner.getDelaySeconds < 0) { + throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; " + + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)") + } + protected[streaming] val isCheckpointPresent = (cp_ != null) protected[streaming] val sc: SparkContext = { if (isCheckpointPresent) { - new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars) + new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars, cp_.environment) } else { sc_ } @@ -497,14 +513,18 @@ object StreamingContext { new PairDStreamFunctions[K, V](stream) } - protected[streaming] def createNewSparkContext(master: String, appName: String): SparkContext = { - + protected[streaming] def createNewSparkContext( + master: String, + appName: String, + sparkHome: String, + jars: Seq[String], + environment: Map[String, String]): SparkContext = { // Set the default cleaner delay to an hour if not already set. // This should be sufficient for even 1 second interval. if (MetadataCleaner.getDelaySeconds < 0) { MetadataCleaner.setDelaySeconds(3600) } - new SparkContext(master, appName) + new SparkContext(master, appName, sparkHome, jars, environment) } protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = { diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index fd5e06b50f..4259d4891c 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -41,10 +41,63 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param batchDuration The time interval at which streaming data will be divided into batches */ def this(master: String, appName: String, batchDuration: Duration) = - this(new StreamingContext(master, appName, batchDuration)) + this(new StreamingContext(master, appName, batchDuration, null, Nil, Map())) /** * Creates a StreamingContext. + * @param master Name of the Spark Master + * @param appName Name to be used when registering with the scheduler + * @param batchDuration The time interval at which streaming data will be divided into batches + * @param sparkHome The SPARK_HOME directory on the slave nodes + * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the local + * file system or an HDFS, HTTP, HTTPS, or FTP URL. + */ + def this( + master: String, + appName: String, + batchDuration: Duration, + sparkHome: String, + jarFile: String) = + this(new StreamingContext(master, appName, batchDuration, sparkHome, Seq(jarFile), Map())) + + /** + * Creates a StreamingContext. + * @param master Name of the Spark Master + * @param appName Name to be used when registering with the scheduler + * @param batchDuration The time interval at which streaming data will be divided into batches + * @param sparkHome The SPARK_HOME directory on the slave nodes + * @param jars Collection of JARs to send to the cluster. These can be paths on the local file + * system or HDFS, HTTP, HTTPS, or FTP URLs. + */ + def this( + master: String, + appName: String, + batchDuration: Duration, + sparkHome: String, + jars: Array[String]) = + this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, Map())) + + /** + * Creates a StreamingContext. + * @param master Name of the Spark Master + * @param appName Name to be used when registering with the scheduler + * @param batchDuration The time interval at which streaming data will be divided into batches + * @param sparkHome The SPARK_HOME directory on the slave nodes + * @param jars Collection of JARs to send to the cluster. These can be paths on the local file + * system or HDFS, HTTP, HTTPS, or FTP URLs. + * @param environment Environment variables to set on worker nodes + */ + def this( + master: String, + appName: String, + batchDuration: Duration, + sparkHome: String, + jars: Array[String], + environment: JMap[String, String]) = + this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, environment)) + + /** + * Creates a StreamingContext using an existing SparkContext. * @param sparkContext The underlying JavaSparkContext to use * @param batchDuration The time interval at which streaming data will be divided into batches */ diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index 7bd53fb6dd..55d2957be4 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -15,6 +15,7 @@ import kafka.utils.ZkUtils._ import kafka.utils.ZKStringSerializer import org.I0Itec.zkclient._ +import scala.collection.Map import scala.collection.mutable.HashMap import scala.collection.JavaConversions._ diff --git a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala index 6b310bc0b6..da224ad6f7 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala @@ -28,7 +28,7 @@ class QueueInputDStream[T: ClassManifest]( } if (buffer.size > 0) { if (oneAtATime) { - Some(buffer.first) + Some(buffer.head) } else { Some(new UnionRDD(ssc.sc, buffer.toSeq)) } diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala index 4af839ad7f..1408af0afa 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala @@ -2,6 +2,7 @@ package spark.streaming.dstream import spark.streaming.StreamingContext import spark.storage.StorageLevel +import spark.util.NextIterator import java.io._ import java.net.Socket @@ -59,45 +60,18 @@ object SocketReceiver { */ def bytesToLines(inputStream: InputStream): Iterator[String] = { val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")) - - val iterator = new Iterator[String] { - var gotNext = false - var finished = false - var nextValue: String = null - - private def getNext() { - try { - nextValue = dataInputStream.readLine() - if (nextValue == null) { - finished = true - } - } - gotNext = true - } - - override def hasNext: Boolean = { - if (!finished) { - if (!gotNext) { - getNext() - if (finished) { - dataInputStream.close() - } - } + new NextIterator[String] { + protected override def getNext() = { + val nextValue = dataInputStream.readLine() + if (nextValue == null) { + finished = true } - !finished + nextValue } - override def next(): String = { - if (finished) { - throw new NoSuchElementException("End of stream") - } - if (!gotNext) { - getNext() - } - gotNext = false - nextValue + protected override def close() { + dataInputStream.close() } } - iterator } } diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala index bdd9f4d753..426a9b6f71 100644 --- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala @@ -159,7 +159,8 @@ object MasterFailureTest extends Logging { // Setup the streaming computation with the given operation System.clearProperty("spark.driver.port") - var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration) + System.clearProperty("spark.hostPort") + var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map()) ssc.checkpoint(checkpointDir.toString) val inputStream = ssc.textFileStream(testDir.toString) val operatedStream = operation(inputStream) @@ -205,6 +206,7 @@ object MasterFailureTest extends Logging { // (iii) Its not timed out yet System.clearProperty("spark.streaming.clock") System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") ssc.start() val startTime = System.currentTimeMillis() while (!killed && !isLastOutputGenerated && !isTimedOut) { @@ -357,13 +359,16 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) // Write the data to a local file and then move it to the target test directory val localFile = new File(localTestDir, (i+1).toString) val hadoopFile = new Path(testDir, (i+1).toString) + val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString) FileUtils.writeStringToFile(localFile, input(i).toString + "\n") var tries = 0 var done = false while (!done && tries < maxTries) { tries += 1 try { - fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile) + fs.rename(tempHadoopFile, hadoopFile) done = true } catch { case ioe: IOException => { diff --git a/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala index d8b987ec86..bd0b0e74c1 100644 --- a/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala +++ b/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala @@ -5,7 +5,7 @@ import spark.util.{RateLimitedOutputStream, IntParam} import java.net.ServerSocket import spark.{Logging, KryoSerializer} import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream -import io.Source +import scala.io.Source import java.io.IOException /** diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index 168e1b7a55..565089a853 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -6,13 +6,16 @@ import util.ManualClock class BasicOperationsSuite extends TestSuiteBase { - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") - override def framework() = "BasicOperationsSuite" + before { + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + } + after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } test("map") { diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index cac86deeaf..607dea77ec 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -31,6 +31,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } var ssc: StreamingContext = null @@ -325,6 +326,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { ) ssc = new StreamingContext(checkpointDir) System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") ssc.start() val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches) // the first element will be re-processed data of the last batch before restart @@ -350,4 +352,4 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] outputStream.output } -}
\ No newline at end of file +} diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 595c766a21..b024fc9dcc 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -29,27 +29,30 @@ import java.nio.charset.Charset import com.google.common.io.Files class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { - - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") val testPort = 9999 override def checkpointDir = "checkpoint" + before { + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + } + after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } test("socket input stream") { // Start the server - val testServer = new TestServer(testPort) + val testServer = new TestServer() testServer.start() // Set up the streaming context and input streams val ssc = new StreamingContext(master, framework, batchDuration) - val networkStream = ssc.socketTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) + val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) @@ -94,7 +97,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { test("flume input stream") { // Set up the streaming context and input streams val ssc = new StreamingContext(master, framework, batchDuration) - val flumeStream = ssc.flumeStream("localhost", 33333, StorageLevel.MEMORY_AND_DISK) + val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] with SynchronizedBuffer[Seq[SparkFlumeEvent]] val outputStream = new TestOutputStream(flumeStream, outputBuffer) @@ -104,7 +107,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val input = Seq(1, 2, 3, 4, 5) Thread.sleep(1000) - val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 33333)); + val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)); val client = SpecificRequestor.getClient( classOf[AvroSourceProtocol], transceiver); @@ -193,8 +196,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { test("actor input stream") { // Start the server - val port = testPort - val testServer = new TestServer(port) + val testServer = new TestServer() + val port = testServer.port testServer.start() // Set up the streaming context and input streams @@ -255,11 +258,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { /** This is server to test the network input stream */ -class TestServer(port: Int) extends Logging { +class TestServer() extends Logging { val queue = new ArrayBlockingQueue[String](100) - val serverSocket = new ServerSocket(port) + val serverSocket = new ServerSocket(0) val servingThread = new Thread() { override def run() { @@ -301,11 +304,13 @@ class TestServer(port: Int) extends Logging { def send(msg: String) { queue.add(msg) } def stop() { servingThread.interrupt() } + + def port = serverSocket.getLocalPort } object TestServer { def main(args: Array[String]) { - val s = new TestServer(9999) + val s = new TestServer() s.start() while(true) { Thread.sleep(1000) diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index 1b66f3bda2..80d827706f 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -16,6 +16,7 @@ class WindowOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } val largerSlideInput = Seq( |