author     Prashant Sharma <prashant.s@imaginea.com>   2014-11-19 14:18:10 -0800
committer  Patrick Wendell <pwendell@gmail.com>        2014-11-19 14:18:10 -0800
commit     1c938413ba5579034675f1b4ea3b8fd0e47dd8d6 (patch)
tree       fc551db455e7e07b292a1dfdba5ecb16b75473af /external/flume
parent     0df02ca463a4126e5437b37114c6759a57ab71ee (diff)
SPARK-3962 Marked scope as provided for external projects.
Somehow the maven shade plugin gets stuck in an infinite loop while creating the effective pom.

Author: Prashant Sharma <prashant.s@imaginea.com>
Author: Prashant Sharma <scrapcodes@gmail.com>

Closes #2959 from ScrapCodes/SPARK-3962/scope-provided and squashes the following commits:

994d1d3 [Prashant Sharma] Fixed failing flume tests
270b4fb [Prashant Sharma] Removed most of the unused code.
bb3bbfd [Prashant Sharma] SPARK-3962 Marked scope as provided for external.
Diffstat (limited to 'external/flume')
-rw-r--r--  external/flume/pom.xml                                                                        8
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java       40
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala              48
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala 23
4 files changed, 107 insertions(+), 12 deletions(-)
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index fae84a31b8..a682f0e847 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -39,6 +39,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -46,13 +47,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>${flume.version}</version>
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000000..6e1f019000
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+
+ protected transient JavaStreamingContext ssc;
+
+ @Before
+ public void setUp() {
+ System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ ssc.checkpoint("checkpoint");
+ }
+
+ @After
+ public void tearDown() {
+ ssc.stop();
+ ssc = null;
+ }
+}
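This base class is copied into the module because the spark-streaming test-jar dependency was dropped from the pom above. A minimal usage sketch, assuming a JUnit runner; the suite name and assertion below are illustrative, not part of this commit:

    // Hypothetical sketch: a test reusing the copied base class. The name
    // ExampleFlumeJavaAPISuite is illustrative and not part of this commit.
    import org.apache.spark.streaming.LocalJavaStreamingContext
    import org.junit.Assert.assertNotNull
    import org.junit.Test

    class ExampleFlumeJavaAPISuite extends LocalJavaStreamingContext {
      @Test
      def contextIsCreatedPerTest(): Unit = {
        // setUp() has already run: ssc is a "local[2]" JavaStreamingContext
        // with a 1000 ms batch duration and checkpointing enabled.
        assertNotNull(ssc)
      }
    }

Each test gets a fresh context from setUp(), and tearDown() stops it, so tests cannot leak a running StreamingContext into one another.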
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
new file mode 100644
index 0000000000..1a900007b6
--- /dev/null
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.io.{IOException, ObjectInputStream}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
+import org.apache.spark.util.Utils
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+/**
+ * This is an output stream just for the test suites. All the output is collected into an
+ * ArrayBuffer. This buffer is wiped clean when it is restored from a checkpoint.
+ *
+ * The buffer contains a sequence of RDDs, each containing a sequence of items.
+ */
+class TestOutputStream[T: ClassTag](parent: DStream[T],
+ val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
+ extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+ val collected = rdd.collect()
+ output += collected
+ }) {
+
+ // This is to clear the output buffer every time it is read from a checkpoint
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
+ ois.defaultReadObject()
+ output.clear()
+ }
+}
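For context, a minimal sketch of how TestOutputStream is wired into a suite; the FlumePollingStreamSuite below does the equivalent, and ssc and flumeStream here are assumed to already exist:

    // Hypothetical usage sketch, assuming an existing StreamingContext `ssc`
    // and a ReceiverInputDStream[SparkFlumeEvent] named `flumeStream`.
    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
    outputStream.register()  // attach the ForEachDStream so each batch is collected
    ssc.start()
    // ... push events, advance the manual clock, then assert on outputBuffer

Because readObject() clears the buffer, output collected before a checkpoint restore does not leak into assertions made after recovery.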
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 475026e8eb..b57a1c71e3 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -20,9 +20,6 @@ package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
-import java.util.Random
-
-import org.apache.spark.TestUtils
import scala.collection.JavaConversions._
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
@@ -32,20 +29,35 @@ import org.apache.flume.channel.MemoryChannel
import org.apache.flume.conf.Configurables
import org.apache.flume.event.EventBuilder
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
+import org.apache.spark.streaming.{Seconds, TestOutputStream, StreamingContext}
import org.apache.spark.streaming.flume.sink._
import org.apache.spark.util.Utils
-class FlumePollingStreamSuite extends TestSuiteBase {
+class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging {
val batchCount = 5
val eventsPerBatch = 100
val totalEventsPerChannel = batchCount * eventsPerBatch
val channelCapacity = 5000
val maxAttempts = 5
+ val batchDuration = Seconds(1)
+
+ val conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName(this.getClass.getSimpleName)
+
+ def beforeFunction() {
+ logInfo("Using manual clock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ }
+
+ before(beforeFunction())
test("flume polling test") {
testMultipleTimes(testFlumePolling)
@@ -229,4 +241,5 @@ class FlumePollingStreamSuite extends TestSuiteBase {
null
}
}
+
}
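The polling test runs through testMultipleTimes(testFlumePolling), whose body sits outside the hunks shown here. A plausible sketch of the retry pattern such a wrapper implements, using the suite's maxAttempts field; this is an assumption, since the real body is not in this diff:

    // Sketch only: a retry wrapper in the spirit of testMultipleTimes.
    def testMultipleTimes(test: () => Unit): Unit = {
      var attempt = 0
      var succeeded = false
      while (!succeeded && attempt < maxAttempts) {
        attempt += 1
        try {
          test()
          succeeded = true
        } catch {
          case e: Exception =>
            logWarning(s"Attempt $attempt of $maxAttempts failed", e)
        }
      }
      assert(succeeded, s"Flume polling test failed after $maxAttempts attempts")
    }

Retrying makes sense here because the test binds real sockets and talks to an embedded Flume agent, both of which can fail transiently on busy CI machines.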