author    Tathagata Das <tathagata.das1565@gmail.com>  2013-06-24 23:57:47 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2013-06-24 23:57:47 -0700
commit    c89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6 (patch)
tree      01f9f42f30a4aa2f73cb200c89a71254bf74d80e /streaming
parent    48c7e373c62b2e8cf48157ceb0d92c38c3a40652 (diff)
parent    78ffe164b33c6b11a2e511442605acd2f795a1b5 (diff)
download  spark-c89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6.tar.gz
          spark-c89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6.tar.bz2
          spark-c89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6.zip
Merge branch 'master' into streaming
Conflicts: .gitignore
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/pom.xml                                                             | 47
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala                     | 34
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamGraph.scala                   |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala               | 42
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala  | 55
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala      |  1
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala      |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala     | 44
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala         |  9
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/RawTextSender.scala             |  2
-rw-r--r--  streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala           |  7
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala                |  4
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala              | 27
-rw-r--r--  streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala          |  1
14 files changed, 202 insertions, 75 deletions
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 15523eadcb..4dc9a19d51 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -3,8 +3,8 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.spark-project</groupId>
- <artifactId>parent</artifactId>
- <version>0.7.0-SNAPSHOT</version>
+ <artifactId>spark-parent</artifactId>
+ <version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -41,6 +41,12 @@
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.2.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
@@ -149,5 +155,42 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>hadoop2-yarn</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2-yarn</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2-yarn</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index e7a392fbbf..66e67cbfa1 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -17,6 +17,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val framework = ssc.sc.appName
val sparkHome = ssc.sc.sparkHome
val jars = ssc.sc.jars
+ val environment = ssc.sc.environment
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
@@ -37,11 +38,20 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
private[streaming]
class CheckpointWriter(checkpointDir: String) extends Logging {
val file = new Path(checkpointDir, "graph")
+ // The file to which we actually write - and then "move" to file.
+ private val writeFile = new Path(file.getParent, file.getName + ".next")
+ private val bakFile = new Path(file.getParent, file.getName + ".bk")
+
+ private var stopped = false
+
val conf = new Configuration()
var fs = file.getFileSystem(conf)
val maxAttempts = 3
val executor = Executors.newFixedThreadPool(1)
+ // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
+ // I did not notice any errors - reintroduce it ?
+
class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
def run() {
var attempts = 0
@@ -50,15 +60,17 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
attempts += 1
try {
logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
- if (fs.exists(file)) {
- val bkFile = new Path(file.getParent, file.getName + ".bk")
- FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
- logDebug("Moved existing checkpoint file to " + bkFile)
- }
- val fos = fs.create(file)
+ // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast.
+ val fos = fs.create(writeFile)
fos.write(bytes)
fos.close()
- fos.close()
+ if (fs.exists(file) && fs.rename(file, bakFile)) {
+ logDebug("Moved existing checkpoint file to " + bakFile)
+ }
+ // paranoia
+ fs.delete(file, false)
+ fs.rename(writeFile, file)
+
val finishTime = System.currentTimeMillis();
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds")
@@ -83,7 +95,15 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
}
def stop() {
+ synchronized {
+ if (stopped) return ;
+ stopped = true
+ }
executor.shutdown()
+ val startTime = System.currentTimeMillis()
+ val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
+ val endTime = System.currentTimeMillis()
+ logInfo("CheckpointWriter executor terminated ? " + terminated + ", waited for " + (endTime - startTime) + " ms.")
}
}
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index adb7f3a24d..3b331956f5 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -54,8 +54,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
throw new Exception("Batch duration already set as " + batchDuration +
". cannot set it again.")
}
+ batchDuration = duration
}
- batchDuration = duration
}
def remember(duration: Duration) {
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 03d2907323..f2c4073f22 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -6,7 +6,7 @@ import akka.zeromq.Subscribe
import spark.streaming.dstream._
-import spark.{RDD, Logging, SparkEnv, SparkContext}
+import spark._
import spark.streaming.receivers.ActorReceiver
import spark.streaming.receivers.ReceiverSupervisorStrategy
import spark.streaming.receivers.ZeroMQReceiver
@@ -14,18 +14,18 @@ import spark.storage.StorageLevel
import spark.util.MetadataCleaner
import spark.streaming.receivers.ActorReceiver
-
import scala.collection.mutable.Queue
+import scala.collection.Map
import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
+import java.util.UUID
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
-import java.util.UUID
import twitter4j.Status
@@ -45,7 +45,9 @@ class StreamingContext private (
* @param sparkContext Existing SparkContext
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
- def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration)
+ def this(sparkContext: SparkContext, batchDuration: Duration) = {
+ this(sparkContext, null, batchDuration)
+ }
/**
* Create a StreamingContext by providing the details necessary for creating a new SparkContext.
@@ -53,8 +55,17 @@ class StreamingContext private (
* @param appName A name for your job, to display on the cluster web UI
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
- def this(master: String, appName: String, batchDuration: Duration) =
- this(StreamingContext.createNewSparkContext(master, appName), null, batchDuration)
+ def this(
+ master: String,
+ appName: String,
+ batchDuration: Duration,
+ sparkHome: String = null,
+ jars: Seq[String] = Nil,
+ environment: Map[String, String] = Map()) = {
+ this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
+ null, batchDuration)
+ }
+
/**
* Re-create a StreamingContext from a checkpoint file.
@@ -66,15 +77,20 @@ class StreamingContext private (
initLogging()
if (sc_ == null && cp_ == null) {
- throw new Exception("Streaming Context cannot be initilalized with " +
+ throw new Exception("Spark Streaming cannot be initialized with " +
"both SparkContext and checkpoint as null")
}
+ if (MetadataCleaner.getDelaySeconds < 0) {
+ throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
+ + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")
+ }
+
protected[streaming] val isCheckpointPresent = (cp_ != null)
protected[streaming] val sc: SparkContext = {
if (isCheckpointPresent) {
- new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars)
+ new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars, cp_.environment)
} else {
sc_
}
@@ -497,14 +513,18 @@ object StreamingContext {
new PairDStreamFunctions[K, V](stream)
}
- protected[streaming] def createNewSparkContext(master: String, appName: String): SparkContext = {
-
+ protected[streaming] def createNewSparkContext(
+ master: String,
+ appName: String,
+ sparkHome: String,
+ jars: Seq[String],
+ environment: Map[String, String]): SparkContext = {
// Set the default cleaner delay to an hour if not already set.
// This should be sufficient for even 1 second interval.
if (MetadataCleaner.getDelaySeconds < 0) {
MetadataCleaner.setDelaySeconds(3600)
}
- new SparkContext(master, appName)
+ new SparkContext(master, appName, sparkHome, jars, environment)
}
protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index fd5e06b50f..4259d4891c 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -41,10 +41,63 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
def this(master: String, appName: String, batchDuration: Duration) =
- this(new StreamingContext(master, appName, batchDuration))
+ this(new StreamingContext(master, appName, batchDuration, null, Nil, Map()))
/**
* Creates a StreamingContext.
+ * @param master Name of the Spark Master
+ * @param appName Name to be used when registering with the scheduler
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the local
+ * file system or an HDFS, HTTP, HTTPS, or FTP URL.
+ */
+ def this(
+ master: String,
+ appName: String,
+ batchDuration: Duration,
+ sparkHome: String,
+ jarFile: String) =
+ this(new StreamingContext(master, appName, batchDuration, sparkHome, Seq(jarFile), Map()))
+
+ /**
+ * Creates a StreamingContext.
+ * @param master Name of the Spark Master
+ * @param appName Name to be used when registering with the scheduler
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ */
+ def this(
+ master: String,
+ appName: String,
+ batchDuration: Duration,
+ sparkHome: String,
+ jars: Array[String]) =
+ this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, Map()))
+
+ /**
+ * Creates a StreamingContext.
+ * @param master Name of the Spark Master
+ * @param appName Name to be used when registering with the scheduler
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ * @param environment Environment variables to set on worker nodes
+ */
+ def this(
+ master: String,
+ appName: String,
+ batchDuration: Duration,
+ sparkHome: String,
+ jars: Array[String],
+ environment: JMap[String, String]) =
+ this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, environment))
+
+ /**
+ * Creates a StreamingContext using an existing SparkContext.
* @param sparkContext The underlying JavaSparkContext to use
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 7bd53fb6dd..55d2957be4 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -15,6 +15,7 @@ import kafka.utils.ZkUtils._
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient._
+import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
diff --git a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
index 6b310bc0b6..da224ad6f7 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
@@ -28,7 +28,7 @@ class QueueInputDStream[T: ClassManifest](
}
if (buffer.size > 0) {
if (oneAtATime) {
- Some(buffer.first)
+ Some(buffer.head)
} else {
Some(new UnionRDD(ssc.sc, buffer.toSeq))
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index 4af839ad7f..1408af0afa 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -2,6 +2,7 @@ package spark.streaming.dstream
import spark.streaming.StreamingContext
import spark.storage.StorageLevel
+import spark.util.NextIterator
import java.io._
import java.net.Socket
@@ -59,45 +60,18 @@ object SocketReceiver {
*/
def bytesToLines(inputStream: InputStream): Iterator[String] = {
val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))
-
- val iterator = new Iterator[String] {
- var gotNext = false
- var finished = false
- var nextValue: String = null
-
- private def getNext() {
- try {
- nextValue = dataInputStream.readLine()
- if (nextValue == null) {
- finished = true
- }
- }
- gotNext = true
- }
-
- override def hasNext: Boolean = {
- if (!finished) {
- if (!gotNext) {
- getNext()
- if (finished) {
- dataInputStream.close()
- }
- }
+ new NextIterator[String] {
+ protected override def getNext() = {
+ val nextValue = dataInputStream.readLine()
+ if (nextValue == null) {
+ finished = true
}
- !finished
+ nextValue
}
- override def next(): String = {
- if (finished) {
- throw new NoSuchElementException("End of stream")
- }
- if (!gotNext) {
- getNext()
- }
- gotNext = false
- nextValue
+ protected override def close() {
+ dataInputStream.close()
}
}
- iterator
}
}
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
index bdd9f4d753..426a9b6f71 100644
--- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -159,7 +159,8 @@ object MasterFailureTest extends Logging {
// Setup the streaming computation with the given operation
System.clearProperty("spark.driver.port")
- var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration)
+ System.clearProperty("spark.hostPort")
+ var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map())
ssc.checkpoint(checkpointDir.toString)
val inputStream = ssc.textFileStream(testDir.toString)
val operatedStream = operation(inputStream)
@@ -205,6 +206,7 @@ object MasterFailureTest extends Logging {
// (iii) Its not timed out yet
System.clearProperty("spark.streaming.clock")
System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
ssc.start()
val startTime = System.currentTimeMillis()
while (!killed && !isLastOutputGenerated && !isTimedOut) {
@@ -357,13 +359,16 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
// Write the data to a local file and then move it to the target test directory
val localFile = new File(localTestDir, (i+1).toString)
val hadoopFile = new Path(testDir, (i+1).toString)
+ val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString)
FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
var tries = 0
var done = false
while (!done && tries < maxTries) {
tries += 1
try {
- fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+ // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+ fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile)
+ fs.rename(tempHadoopFile, hadoopFile)
done = true
} catch {
case ioe: IOException => {
diff --git a/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala
index d8b987ec86..bd0b0e74c1 100644
--- a/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala
@@ -5,7 +5,7 @@ import spark.util.{RateLimitedOutputStream, IntParam}
import java.net.ServerSocket
import spark.{Logging, KryoSerializer}
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-import io.Source
+import scala.io.Source
import java.io.IOException
/**
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index 168e1b7a55..565089a853 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -6,13 +6,16 @@ import util.ManualClock
class BasicOperationsSuite extends TestSuiteBase {
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
override def framework() = "BasicOperationsSuite"
+ before {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ }
+
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
}
test("map") {
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index cac86deeaf..607dea77ec 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -31,6 +31,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
}
var ssc: StreamingContext = null
@@ -325,6 +326,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
)
ssc = new StreamingContext(checkpointDir)
System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
ssc.start()
val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
// the first element will be re-processed data of the last batch before restart
@@ -350,4 +352,4 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
outputStream.output
}
-} \ No newline at end of file
+}
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 595c766a21..b024fc9dcc 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -29,27 +29,30 @@ import java.nio.charset.Charset
import com.google.common.io.Files
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
-
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
val testPort = 9999
override def checkpointDir = "checkpoint"
+ before {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ }
+
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
}
test("socket input stream") {
// Start the server
- val testServer = new TestServer(testPort)
+ val testServer = new TestServer()
testServer.start()
// Set up the streaming context and input streams
val ssc = new StreamingContext(master, framework, batchDuration)
- val networkStream = ssc.socketTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+ val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
@@ -94,7 +97,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("flume input stream") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(master, framework, batchDuration)
- val flumeStream = ssc.flumeStream("localhost", 33333, StorageLevel.MEMORY_AND_DISK)
+ val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
@@ -104,7 +107,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
Thread.sleep(1000)
- val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 33333));
+ val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort));
val client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol], transceiver);
@@ -193,8 +196,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("actor input stream") {
// Start the server
- val port = testPort
- val testServer = new TestServer(port)
+ val testServer = new TestServer()
+ val port = testServer.port
testServer.start()
// Set up the streaming context and input streams
@@ -255,11 +258,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
/** This is server to test the network input stream */
-class TestServer(port: Int) extends Logging {
+class TestServer() extends Logging {
val queue = new ArrayBlockingQueue[String](100)
- val serverSocket = new ServerSocket(port)
+ val serverSocket = new ServerSocket(0)
val servingThread = new Thread() {
override def run() {
@@ -301,11 +304,13 @@ class TestServer(port: Int) extends Logging {
def send(msg: String) { queue.add(msg) }
def stop() { servingThread.interrupt() }
+
+ def port = serverSocket.getLocalPort
}
object TestServer {
def main(args: Array[String]) {
- val s = new TestServer(9999)
+ val s = new TestServer()
s.start()
while(true) {
Thread.sleep(1000)
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 1b66f3bda2..80d827706f 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -16,6 +16,7 @@ class WindowOperationsSuite extends TestSuiteBase {
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
}
val largerSlideInput = Seq(