diff options
author | Xin Ren <iamshrek@126.com> | 2016-05-10 15:12:47 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-05-10 15:12:47 -0700 |
commit | 86475520f88f90c9d3b71516f65ccc0e9d244863 (patch) | |
tree | 365a7f00639c95b46c330ab30215c24f41ac1796 /external/flume/src/main | |
parent | da02d006bbb5c4fe62abd5542b9fff7d1c58603c (diff) | |
download | spark-86475520f88f90c9d3b71516f65ccc0e9d244863.tar.gz spark-86475520f88f90c9d3b71516f65ccc0e9d244863.tar.bz2 spark-86475520f88f90c9d3b71516f65ccc0e9d244863.zip |
[SPARK-14936][BUILD][TESTS] FlumePollingStreamSuite is slow
https://issues.apache.org/jira/browse/SPARK-14936
## What changes were proposed in this pull request?
FlumePollingStreamSuite contains two tests which run for a minute each. This seems excessively slow and we should speed it up if possible.
In this PR, instead of creating `StreamingContext` directly from `conf`, here an underlying `SparkContext` is created before all and it is used to create each`StreamingContext`.
Running time is reduced by avoiding multiple `SparkContext` creations and destroys.
## How was this patch tested?
Tested on my local machine running `testOnly *.FlumePollingStreamSuite`
Author: Xin Ren <iamshrek@126.com>
Closes #12845 from keypointt/SPARK-14936.
Diffstat (limited to 'external/flume/src/main')
-rw-r--r-- | external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala | 4 |
1 files changed, 2 insertions, 2 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala index 6a4dafb8ed..15ff4f6025 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala @@ -116,7 +116,7 @@ private[flume] class PollingFlumeTestUtils { /** * Send data and wait until all data has been received */ - def sendDatAndEnsureAllDataHasBeenReceived(): Unit = { + def sendDataAndEnsureAllDataHasBeenReceived(): Unit = { val executor = Executors.newCachedThreadPool() val executorCompletion = new ExecutorCompletionService[Void](executor) @@ -174,7 +174,7 @@ private[flume] class PollingFlumeTestUtils { val queueRemaining = channel.getClass.getDeclaredField("queueRemaining") queueRemaining.setAccessible(true) val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits") - if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != 5000) { + if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != channelCapacity) { throw new AssertionError(s"Channel ${channel.getName} is not empty") } } |