author    Xin Ren <iamshrek@126.com>  2016-05-10 15:12:47 -0700
committer Shixiong Zhu <shixiong@databricks.com>  2016-05-10 15:12:47 -0700
commit    86475520f88f90c9d3b71516f65ccc0e9d244863 (patch)
tree      365a7f00639c95b46c330ab30215c24f41ac1796
parent    da02d006bbb5c4fe62abd5542b9fff7d1c58603c (diff)
download  spark-86475520f88f90c9d3b71516f65ccc0e9d244863.tar.gz
          spark-86475520f88f90c9d3b71516f65ccc0e9d244863.tar.bz2
          spark-86475520f88f90c9d3b71516f65ccc0e9d244863.zip
[SPARK-14936][BUILD][TESTS] FlumePollingStreamSuite is slow
https://issues.apache.org/jira/browse/SPARK-14936

## What changes were proposed in this pull request?

FlumePollingStreamSuite contains two tests which run for a minute each. This is excessively slow and we should speed it up if possible.

In this PR, instead of creating a `StreamingContext` directly from `conf`, a single underlying `SparkContext` is created up front and reused to build each `StreamingContext`. Running time is reduced by avoiding repeated `SparkContext` creation and teardown.

## How was this patch tested?

Tested on my local machine by running `testOnly *.FlumePollingStreamSuite`.

Author: Xin Ren <iamshrek@126.com>

Closes #12845 from keypointt/SPARK-14936.
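For readers skimming the diff below, here is a minimal, self-contained sketch of the shared-SparkContext pattern this change applies. The suite name and test body are illustrative only; the real suite extends Spark's internal `SparkFunSuite`, and only the Spark and ScalaTest APIs shown here are taken from the actual change.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

// Illustrative suite name; the real suite is FlumePollingStreamSuite.
class SharedSparkContextSuite extends FunSuite with BeforeAndAfterAll {

  private val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName(this.getClass.getSimpleName)

  @transient private var _sc: SparkContext = _

  // Create the SparkContext once, before any test runs.
  override def beforeAll(): Unit = {
    _sc = new SparkContext(conf)
  }

  // Tear it down once, after the whole suite has finished.
  override def afterAll(): Unit = {
    if (_sc != null) {
      _sc.stop()
      _sc = null
    }
  }

  test("each test builds its StreamingContext from the shared SparkContext") {
    // Constructing from the existing SparkContext avoids creating
    // (and later destroying) a new SparkContext per test.
    val ssc = new StreamingContext(_sc, Seconds(1))
    try {
      // ... set up input streams and drive the test here ...
    } finally {
      // Stop only the StreamingContext; keep the SparkContext alive
      // so the remaining tests in the suite can reuse it.
      ssc.stop(stopSparkContext = false)
    }
  }
}
```

The time saving comes from `ssc.stop(stopSparkContext = false)`: each test tears down only its own `StreamingContext`, while the suite-level `SparkContext` created in `beforeAll` stays up until `afterAll`.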
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala  |  4
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 26
-rw-r--r--  python/pyspark/streaming/tests.py                                                            |  2
3 files changed, 23 insertions(+), 9 deletions(-)
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
index 6a4dafb8ed..15ff4f6025 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -116,7 +116,7 @@ private[flume] class PollingFlumeTestUtils {
/**
* Send data and wait until all data has been received
*/
- def sendDatAndEnsureAllDataHasBeenReceived(): Unit = {
+ def sendDataAndEnsureAllDataHasBeenReceived(): Unit = {
val executor = Executors.newCachedThreadPool()
val executorCompletion = new ExecutorCompletionService[Void](executor)
@@ -174,7 +174,7 @@ private[flume] class PollingFlumeTestUtils {
val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
queueRemaining.setAccessible(true)
val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
- if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != 5000) {
+ if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != channelCapacity) {
throw new AssertionError(s"Channel ${channel.getName} is not empty")
}
}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 156712483d..1c93079497 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -24,10 +24,10 @@ import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
-import org.scalatest.BeforeAndAfter
+import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually._
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.storage.StorageLevel
@@ -35,11 +35,13 @@ import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.util.{ManualClock, Utils}
-class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
+class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
val maxAttempts = 5
val batchDuration = Seconds(1)
+ @transient private var _sc: SparkContext = _
+
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName(this.getClass.getSimpleName)
@@ -47,6 +49,17 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
val utils = new PollingFlumeTestUtils
+ override def beforeAll(): Unit = {
+ _sc = new SparkContext(conf)
+ }
+
+ override def afterAll(): Unit = {
+ if (_sc != null) {
+ _sc.stop()
+ _sc = null
+ }
+ }
+
test("flume polling test") {
testMultipleTimes(testFlumePolling)
}
@@ -98,7 +111,7 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
def writeAndVerify(sinkPorts: Seq[Int]): Unit = {
// Set up the streaming context and input streams
- val ssc = new StreamingContext(conf, batchDuration)
+ val ssc = new StreamingContext(_sc, batchDuration)
val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port))
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
@@ -109,7 +122,7 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
ssc.start()
try {
- utils.sendDatAndEnsureAllDataHasBeenReceived()
+ utils.sendDataAndEnsureAllDataHasBeenReceived()
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
clock.advance(batchDuration.milliseconds)
@@ -123,7 +136,8 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
utils.assertOutput(headers.asJava, bodies.asJava)
}
} finally {
- ssc.stop()
+ // Stop only the StreamingContext here, not the underlying SparkContext.
+ ssc.stop(false)
}
}
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 148bf7e8ff..f27628c895 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1357,7 +1357,7 @@ class FlumePollingStreamTests(PySparkStreamingTestCase):
dstream.foreachRDD(get_output)
ssc.start()
- self._utils.sendDatAndEnsureAllDataHasBeenReceived()
+ self._utils.sendDataAndEnsureAllDataHasBeenReceived()
self.wait_for(outputBuffer, self._utils.getTotalEvents())
outputHeaders = [event[0] for event in outputBuffer]