diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-18 13:26:12 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-18 13:26:12 -0800 |
commit | 6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1 (patch) | |
tree | 3848e9e09a2c8b7537f4a0635ea0a32daee1f9a8 /core/src/test/scala/spark/LocalSparkContext.scala | |
parent | 56b9bd197c522f33e354c2e9ad7e76440cf817e9 (diff) | |
parent | 8ad561dc7d6475d7b217ec3f57bac3b584fed31a (diff) | |
download | spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.gz spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.bz2 spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.zip |
Merge branch 'streaming' into ScrapCode-streaming
Conflicts:
streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
Diffstat (limited to 'core/src/test/scala/spark/LocalSparkContext.scala')
-rw-r--r-- | core/src/test/scala/spark/LocalSparkContext.scala | 41 |
1 files changed, 41 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/spark/LocalSparkContext.scala new file mode 100644 index 0000000000..ff00dd05dd --- /dev/null +++ b/core/src/test/scala/spark/LocalSparkContext.scala @@ -0,0 +1,41 @@ +package spark + +import org.scalatest.Suite +import org.scalatest.BeforeAndAfterEach + +/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */ +trait LocalSparkContext extends BeforeAndAfterEach { self: Suite => + + @transient var sc: SparkContext = _ + + override def afterEach() { + resetSparkContext() + super.afterEach() + } + + def resetSparkContext() = { + if (sc != null) { + LocalSparkContext.stop(sc) + sc = null + } + } + +} + +object LocalSparkContext { + def stop(sc: SparkContext) { + sc.stop() + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port") + } + + /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */ + def withSpark[T](sc: SparkContext)(f: SparkContext => T) = { + try { + f(sc) + } finally { + stop(sc) + } + } + +}
\ No newline at end of file |