diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-09-06 17:53:01 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-09-06 17:53:01 +0530 |
commit | 4106ae9fbf8a582697deba2198b3b966dec00bfe (patch) | |
tree | 7c3046faee5f62f9ec4c4176125988d7cb5d70e2 /streaming/src/test | |
parent | e0dd24dc858777904335218f3001a24bffe73b27 (diff) | |
parent | 5c7494d7c1b7301138fb3dc155a1b0c961126ec6 (diff) | |
download | spark-4106ae9fbf8a582697deba2198b3b966dec00bfe.tar.gz spark-4106ae9fbf8a582697deba2198b3b966dec00bfe.tar.bz2 spark-4106ae9fbf8a582697deba2198b3b966dec00bfe.zip |
Merged with master
Diffstat (limited to 'streaming/src/test')
10 files changed, 244 insertions, 87 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 4cf10582a9..c0d729ff87 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1,4 +1,21 @@ -package spark.streaming; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming; import com.google.common.base.Optional; import com.google.common.collect.Lists; @@ -11,20 +28,20 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import scala.Tuple2; -import spark.HashPartitioner; -import spark.api.java.JavaPairRDD; -import spark.api.java.JavaRDD; -import spark.api.java.JavaRDDLike; -import spark.api.java.JavaPairRDD; -import spark.api.java.JavaSparkContext; -import spark.api.java.function.*; -import spark.storage.StorageLevel; -import spark.streaming.api.java.JavaDStream; -import spark.streaming.api.java.JavaPairDStream; -import spark.streaming.api.java.JavaStreamingContext; -import spark.streaming.JavaTestUtils; -import spark.streaming.JavaCheckpointTestUtils; -import spark.streaming.InputStreamsSuite; +import org.apache.spark.HashPartitioner; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaRDDLike; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.*; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.JavaTestUtils; +import org.apache.spark.streaming.JavaCheckpointTestUtils; +import org.apache.spark.streaming.InputStreamsSuite; import java.io.*; import java.util.*; @@ -42,7 +59,7 @@ public class JavaAPISuite implements Serializable { @Before public void setUp() { - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock"); + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); ssc.checkpoint("checkpoint"); } diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 8a7c48bde6..d5cdad4998 100644 --- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -1,19 +1,37 @@ -package spark.streaming +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} import scala.reflect.ClassTag import java.util.{List => JList} -import spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext} -import spark.streaming._ +import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming._ import java.util.ArrayList import collection.JavaConversions._ +import org.apache.spark.api.java.JavaRDDLike /** Exposes streaming test functionality in a Java-friendly way. */ trait JavaTestBase extends TestSuiteBase { /** - * Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context. + * Create a [[org.apache.spark.streaming.TestInputStream]] and attach it to the supplied context. * The stream will be derived from the supplied lists of Java objects. */ def attachTestInputStream[T]( @@ -31,11 +49,11 @@ trait JavaTestBase extends TestSuiteBase { /** * Attach a provided stream to it's associated StreamingContext as a - * [[spark.streaming.TestOutputStream]]. - */ - def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T, This, R], - R <: spark.api.java.JavaRDDLike[T, R]]( - dstream: JavaDStreamLike[T, This, R]) = { + * [[org.apache.spark.streaming.TestOutputStream]]. + **/ + def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]( + dstream: JavaDStreamLike[T, This, R]) = + { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val ostream = new TestOutputStream(dstream.dstream, diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties index 59c445e63f..063529a9cb 100644 --- a/streaming/src/test/resources/log4j.properties +++ b/streaming/src/test/resources/log4j.properties @@ -1,4 +1,21 @@ -# Set everything to be logged to the file streaming/target/unit-tests.log +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log log4j.rootCategory=INFO, file # log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file=org.apache.log4j.FileAppender diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 565089a853..11586f72b6 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -1,6 +1,23 @@ -package spark.streaming - -import spark.streaming.StreamingContext._ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import org.apache.spark.streaming.StreamingContext._ import scala.runtime.RichInt import util.ManualClock @@ -9,7 +26,7 @@ class BasicOperationsSuite extends TestSuiteBase { override def framework() = "BasicOperationsSuite" before { - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") } after { diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 141842c380..b8337ed423 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -1,5 +1,24 @@ -package spark.streaming +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming +import dstream.FileInputDStream +import org.apache.spark.streaming.StreamingContext._ import java.io.File import scala.collection.mutable.ArrayBuffer @@ -23,7 +42,7 @@ import spark.streaming.util.ManualClock */ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") before { FileUtils.deleteDirectory(new File(checkpointDir)) @@ -50,7 +69,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second") - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") val stateStreamCheckpointInterval = Seconds(1) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala new file mode 100644 index 0000000000..6337c5359c --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import org.apache.spark.Logging +import org.apache.spark.streaming.util.MasterFailureTest +import StreamingContext._ + +import org.scalatest.{FunSuite, BeforeAndAfter} +import com.google.common.io.Files +import java.io.File +import org.apache.commons.io.FileUtils +import collection.mutable.ArrayBuffer + + +/** + * This testsuite tests master failures at random times while the stream is running using + * the real clock. + */ +class FailureSuite extends FunSuite with BeforeAndAfter with Logging { + + var directory = "FailureSuite" + val numBatches = 30 + val batchDuration = Milliseconds(1000) + + before { + FileUtils.deleteDirectory(new File(directory)) + } + + after { + FileUtils.deleteDirectory(new File(directory)) + } + + test("multiple failures with map") { + MasterFailureTest.testMap(directory, numBatches, batchDuration) + } + + test("multiple failures with updateStateByKey") { + MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration) + } +} + diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index b024fc9dcc..42e3e51e3f 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -1,4 +1,21 @@ -package spark.streaming +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming import akka.actor.Actor import akka.actor.IO @@ -12,9 +29,9 @@ import java.io.{File, BufferedWriter, OutputStreamWriter} import java.util.concurrent.{TimeUnit, ArrayBlockingQueue} import collection.mutable.{SynchronizedBuffer, ArrayBuffer} import util.ManualClock -import spark.storage.StorageLevel -import spark.streaming.receivers.Receiver -import spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receivers.Receiver +import org.apache.spark.Logging import scala.util.Random import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfter @@ -35,7 +52,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { override def checkpointDir = "checkpoint" before { - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") } after { @@ -190,7 +207,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { FileUtils.deleteDirectory(testDir) // Enable manual clock back again for other tests - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") } diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index dc280b09c9..c91f9ba46d 100644 --- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -1,9 +1,24 @@ -package spark.streaming +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -import spark.streaming.dstream.{InputDStream, ForEachDStream} -import spark.streaming.util.ManualClock +package org.apache.spark.streaming -import spark.{RDD, Logging} +import org.apache.spark.streaming.dstream.{InputDStream, ForEachDStream} +import org.apache.spark.streaming.util.ManualClock import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.SynchronizedBuffer @@ -13,6 +28,9 @@ import java.io.{ObjectInputStream, IOException} import org.scalatest.{BeforeAndAfter, FunSuite} +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD + /** * This is a input stream just for the testsuites. This is equivalent to a checkpointable, * replayable, reliable message queue like Kafka. It requires a sequence as input, and diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala index 80d827706f..f50e05c0d8 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala @@ -1,11 +1,28 @@ -package spark.streaming - -import spark.streaming.StreamingContext._ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import org.apache.spark.streaming.StreamingContext._ import collection.mutable.ArrayBuffer class WindowOperationsSuite extends TestSuiteBase { - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") override def framework = "WindowOperationsSuite" diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala deleted file mode 100644 index a5fa7ab92d..0000000000 --- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala +++ /dev/null @@ -1,40 +0,0 @@ -package spark.streaming - -import spark.Logging -import spark.streaming.util.MasterFailureTest -import StreamingContext._ - -import org.scalatest.{FunSuite, BeforeAndAfter} -import com.google.common.io.Files -import java.io.File -import org.apache.commons.io.FileUtils -import collection.mutable.ArrayBuffer - - -/** - * This testsuite tests master failures at random times while the stream is running using - * the real clock. - */ -class FailureSuite extends FunSuite with BeforeAndAfter with Logging { - - var directory = "FailureSuite" - val numBatches = 30 - val batchDuration = Milliseconds(1000) - - before { - FileUtils.deleteDirectory(new File(directory)) - } - - after { - FileUtils.deleteDirectory(new File(directory)) - } - - test("multiple failures with map") { - MasterFailureTest.testMap(directory, numBatches, batchDuration) - } - - test("multiple failures with updateStateByKey") { - MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration) - } -} - |