author     Hari Shreedharan <harishreedharan@gmail.com>  2014-08-17 19:50:31 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2014-08-17 19:50:31 -0700
commit     95470a03ae85d7d37d75f73435425a0e22918bc9 (patch)
tree       b1517d49e86548e518b8b2528d3fe5b9e7a68406 /external
parent     99243288b049f4a4fb4ba0505ea2310be5eb4bd2 (diff)
download   spark-95470a03ae85d7d37d75f73435425a0e22918bc9.tar.gz
           spark-95470a03ae85d7d37d75f73435425a0e22918bc9.tar.bz2
           spark-95470a03ae85d7d37d75f73435425a0e22918bc9.zip
[HOTFIX][STREAMING] Allow the JVM/Netty to decide which port to bind to in Flume Polling Tests.
Author: Hari Shreedharan <harishreedharan@gmail.com>

Closes #1820 from harishreedharan/use-free-ports and squashes the following commits:

b939067 [Hari Shreedharan] Remove unused import.
67856a8 [Hari Shreedharan] Remove findFreePort.
0ea51d1 [Hari Shreedharan] Make some changes to getPort to use map on the serverOpt.
1fb0283 [Hari Shreedharan] Merge branch 'master' of https://github.com/apache/spark into use-free-ports
b351651 [Hari Shreedharan] Allow Netty to choose port, and query it to decide the port to bind to. Leaving findFreePort as is, if other tests want to use it at some point.
e6c9620 [Hari Shreedharan] Making sure the second sink uses the correct port.
11c340d [Hari Shreedharan] Add info about race condition to scaladoc.
e89d135 [Hari Shreedharan] Adding Scaladoc.
6013bb0 [Hari Shreedharan] [STREAMING] Find free ports to use before attempting to create Flume Sink in Flume Polling Suite
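The change replaces hand-picked test ports with the standard "port 0" idiom: the sink is configured to bind to port 0, Netty (via the OS) chooses a free ephemeral port, and the test then asks the running sink which port it actually got. The sketch below only illustrates that idiom with a plain java.net.ServerSocket; it is not part of the commit and does not use the Flume sink's Avro/Netty server.

    import java.net.ServerSocket

    // Illustrative sketch of the "bind to port 0" idiom this commit adopts.
    // Binding to port 0 lets the OS pick any free ephemeral port; the chosen
    // port is then read back from the bound socket instead of being guessed.
    val socket = new ServerSocket(0)
    try {
      val assignedPort = socket.getLocalPort // the port the OS actually assigned
      println(s"Server bound to port $assignedPort")
    } finally {
      socket.close()
    }

The test changes below apply the same pattern: each sink is configured with port 0, and the polling stream's address is derived from sink.getPort() after start().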
Diffstat (limited to 'external')
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala      |  8
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala  | 55
2 files changed, 34 insertions(+), 29 deletions(-)
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
index 7b735133e3..948af5947f 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -131,6 +131,14 @@ class SparkSink extends AbstractSink with Logging with Configurable {
blockingLatch.await()
Status.BACKOFF
}
+
+  private[flume] def getPort(): Int = {
+    serverOpt
+      .map(_.getPort)
+      .getOrElse(
+        throw new RuntimeException("Server was not started!")
+      )
+  }
}
/**
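For context, here is a minimal usage sketch of the new accessor. It is not part of the diff: the channel setup is elided, and, as the squashed-commit notes on the scaladoc suggest, getPort() is only meaningful once the sink has been started, since the bound Netty server (and therefore its port) does not exist before start().

    import org.apache.flume.Context
    import org.apache.flume.conf.Configurables
    import org.apache.spark.streaming.flume.sink.{SparkSink, SparkSinkConfig}

    val context = new Context()
    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0)) // 0 = let Netty/the OS pick a free port
    val sink = new SparkSink()
    Configurables.configure(sink, context)
    // sink.setChannel(channel)  // a configured Flume channel must be attached before start()
    sink.start()
    val boundPort = sink.getPort() // valid only after start(); throws RuntimeException otherwise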
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index a69baa1698..8a85b0f987 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -22,6 +22,8 @@ import java.net.InetSocketAddress
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
import java.util.Random
+import org.apache.spark.TestUtils
+
import scala.collection.JavaConversions._
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
@@ -39,9 +41,6 @@ import org.apache.spark.util.Utils
class FlumePollingStreamSuite extends TestSuiteBase {
- val random = new Random()
- /** Return a port in the ephemeral range. */
- def getTestPort = random.nextInt(16382) + 49152
val batchCount = 5
val eventsPerBatch = 100
val totalEventsPerChannel = batchCount * eventsPerBatch
@@ -77,17 +76,6 @@ class FlumePollingStreamSuite extends TestSuiteBase {
}
private def testFlumePolling(): Unit = {
- val testPort = getTestPort
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(conf, batchDuration)
- val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
- FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)),
- StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
- val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
- with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputBuffer)
- outputStream.register()
-
// Start the channel and sink.
val context = new Context()
context.put("capacity", channelCapacity.toString)
@@ -98,10 +86,19 @@ class FlumePollingStreamSuite extends TestSuiteBase {
val sink = new SparkSink()
context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
- context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
Configurables.configure(sink, context)
sink.setChannel(channel)
sink.start()
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", sink.getPort())),
+        StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    outputStream.register()
ssc.start()
writeAndVerify(Seq(channel), ssc, outputBuffer)
@@ -111,18 +108,6 @@ class FlumePollingStreamSuite extends TestSuiteBase {
}
private def testFlumePollingMultipleHost(): Unit = {
- val testPort = getTestPort
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(conf, batchDuration)
- val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))
- val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
- FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
- eventsPerBatch, 5)
- val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
- with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputBuffer)
- outputStream.register()
-
// Start the channel and sink.
val context = new Context()
context.put("capacity", channelCapacity.toString)
@@ -136,17 +121,29 @@ class FlumePollingStreamSuite extends TestSuiteBase {
val sink = new SparkSink()
context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
- context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
Configurables.configure(sink, context)
sink.setChannel(channel)
sink.start()
val sink2 = new SparkSink()
context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
- context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort + 1))
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
Configurables.configure(sink2, context)
sink2.setChannel(channel2)
sink2.start()
+
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val addresses = Seq(sink.getPort(), sink2.getPort()).map(new InetSocketAddress("localhost", _))
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
+        eventsPerBatch, 5)
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    outputStream.register()
+
ssc.start()
writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
assertChannelIsEmpty(channel)