aboutsummaryrefslogtreecommitdiff
path: root/external/flume/src/main
diff options
context:
space:
mode:
authorXin Ren <iamshrek@126.com>2016-05-10 15:12:47 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-05-10 15:12:47 -0700
commit86475520f88f90c9d3b71516f65ccc0e9d244863 (patch)
tree365a7f00639c95b46c330ab30215c24f41ac1796 /external/flume/src/main
parentda02d006bbb5c4fe62abd5542b9fff7d1c58603c (diff)
downloadspark-86475520f88f90c9d3b71516f65ccc0e9d244863.tar.gz
spark-86475520f88f90c9d3b71516f65ccc0e9d244863.tar.bz2
spark-86475520f88f90c9d3b71516f65ccc0e9d244863.zip
[SPARK-14936][BUILD][TESTS] FlumePollingStreamSuite is slow
https://issues.apache.org/jira/browse/SPARK-14936 ## What changes were proposed in this pull request? FlumePollingStreamSuite contains two tests which run for a minute each. This seems excessively slow and we should speed it up if possible. In this PR, instead of creating `StreamingContext` directly from `conf`, here an underlying `SparkContext` is created before all and it is used to create each`StreamingContext`. Running time is reduced by avoiding multiple `SparkContext` creations and destroys. ## How was this patch tested? Tested on my local machine running `testOnly *.FlumePollingStreamSuite` Author: Xin Ren <iamshrek@126.com> Closes #12845 from keypointt/SPARK-14936.
Diffstat (limited to 'external/flume/src/main')
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala4
1 files changed, 2 insertions, 2 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
index 6a4dafb8ed..15ff4f6025 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -116,7 +116,7 @@ private[flume] class PollingFlumeTestUtils {
/**
* Send data and wait until all data has been received
*/
- def sendDatAndEnsureAllDataHasBeenReceived(): Unit = {
+ def sendDataAndEnsureAllDataHasBeenReceived(): Unit = {
val executor = Executors.newCachedThreadPool()
val executorCompletion = new ExecutorCompletionService[Void](executor)
@@ -174,7 +174,7 @@ private[flume] class PollingFlumeTestUtils {
val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
queueRemaining.setAccessible(true)
val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
- if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != 5000) {
+ if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != channelCapacity) {
throw new AssertionError(s"Channel ${channel.getName} is not empty")
}
}