author     Matei Zaharia <matei@eecs.berkeley.edu>    2013-07-08 22:58:50 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2013-07-08 22:58:50 -0700
commit     638927b78e5b67e71d0c7360a2f36841d13dc107 (patch)
tree       580363bcf55d78e29eea1011cde2174590e1960d
parent     3c1317835e8100e3d8b2f0883ee66c81a2300652 (diff)
parent     4af0d63cb14db902cbd1dbdeeb68f1fcec4b2e97 (diff)
Merge pull request #683 from shivaram/sbt-test-fix
Remove some stack traces from sbt test output
-rw-r--r--  core/src/main/scala/spark/deploy/LocalSparkCluster.scala   |  1 +
-rw-r--r--  core/src/test/scala/spark/DriverSuite.scala                |  4 ++++
-rw-r--r--  core/src/test/scala/spark/FileServerSuite.scala            |  1 -
-rw-r--r--  core/src/test/scala/spark/LocalSparkContext.scala          | 11 ++++++++++-
-rw-r--r--  core/src/test/scala/spark/PipedRDDSuite.scala              |  2 +-
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala  |  9 +++++++--
6 files changed, 23 insertions(+), 5 deletions(-)
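
Taken together, the changes quiet the test logs from several directions: the forked driver in DriverSuite raises log4j's root level to WARN, LocalSparkContext routes Netty's internal logging through SLF4J once per suite, FileServerSuite drops a stray println, PipedRDDSuite's intentionally failing pipe command is adjusted to keep its error output out of the logs, and CheckpointWriter logs, instead of crashing on, checkpoint tasks submitted after its executor has shut down.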
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index 55bb61b0cc..939f26b6f4 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -45,6 +45,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
// Stop the workers before the master so they don't get upset that it disconnected
workerActorSystems.foreach(_.shutdown())
workerActorSystems.foreach(_.awaitTermination())
+
masterActorSystems.foreach(_.shutdown())
masterActorSystems.foreach(_.awaitTermination())
}
diff --git a/core/src/test/scala/spark/DriverSuite.scala b/core/src/test/scala/spark/DriverSuite.scala
index 5e84b3a66a..31c3dd75fb 100644
--- a/core/src/test/scala/spark/DriverSuite.scala
+++ b/core/src/test/scala/spark/DriverSuite.scala
@@ -2,6 +2,9 @@ package spark
import java.io.File
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts
import org.scalatest.prop.TableDrivenPropertyChecks._
@@ -27,6 +30,7 @@ class DriverSuite extends FunSuite with Timeouts {
*/
object DriverWithoutCleanup {
def main(args: Array[String]) {
+ Logger.getRootLogger().setLevel(Level.WARN)
val sc = new SparkContext(args(0), "DriverWithoutCleanup")
sc.parallelize(1 to 100, 4).count()
}
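
Note: DriverWithoutCleanup is launched outside the test JVM, so anything its root logger emits at INFO (including the benign stack traces this PR targets) lands verbatim in the output DriverSuite captures. A minimal sketch of the same log4j 1.x pattern, with a hypothetical driver object:

    import org.apache.log4j.{Level, Logger}

    object QuietDriver {
      def main(args: Array[String]) {
        // Raise the root threshold before any Spark class logs, so INFO
        // chatter never reaches the captured stdout/stderr.
        Logger.getRootLogger().setLevel(Level.WARN)
        // ... create a SparkContext and run the code under test ...
      }
    }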
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala
index f1a35bced3..9c24ca430d 100644
--- a/core/src/test/scala/spark/FileServerSuite.scala
+++ b/core/src/test/scala/spark/FileServerSuite.scala
@@ -85,7 +85,6 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
in.close()
_ * fileVal + _ * fileVal
}.collect
- println(result)
assert(result.toSet === Set((1,200), (2,300), (3,500)))
}
diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/spark/LocalSparkContext.scala
index 76d5258b02..bd184222ed 100644
--- a/core/src/test/scala/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/spark/LocalSparkContext.scala
@@ -2,12 +2,21 @@ package spark
import org.scalatest.Suite
import org.scalatest.BeforeAndAfterEach
+import org.scalatest.BeforeAndAfterAll
+
+import org.jboss.netty.logging.InternalLoggerFactory
+import org.jboss.netty.logging.Slf4JLoggerFactory
/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */
-trait LocalSparkContext extends BeforeAndAfterEach { self: Suite =>
+trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite =>
@transient var sc: SparkContext = _
+ override def beforeAll() {
+ InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
+ super.beforeAll()
+ }
+
override def afterEach() {
resetSparkContext()
super.afterEach()
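
Note: Netty 3 resolves its logging backend through a process-wide default factory, and each internal logger binds to whatever factory is current when it is created, so the switch to SLF4J must happen before any Netty class logs; beforeAll() runs early enough to cover every test in the suite. A standalone sketch of the same call (object and method names are illustrative):

    import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}

    object NettyLogSetup {
      // Call once, early in JVM startup: loggers already created under the
      // previous factory keep writing to their old backend.
      def routeThroughSlf4j() {
        InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
      }
    }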
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index 1c9ca50811..d263bb00e9 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -67,7 +67,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
test("pipe with non-zero exit status") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
- val piped = nums.pipe("cat nonexistent_file")
+ val piped = nums.pipe(Seq("cat nonexistent_file", "2>", "/dev/null"))
intercept[SparkException] {
piped.collect()
}
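
Note: the test relies on a failing subprocess propagating up to the driver: the task throws, the job aborts, and collect() rethrows as a SparkException. A sketch of the same assertion using a command that exits non-zero without writing anything (assuming the suite's shared sc and a POSIX `false` on the PATH):

    test("pipe surfaces a non-zero exit status") {
      // `false` exits 1 and prints nothing, so the expected failure adds
      // no stack traces or error text to the captured test output.
      val piped = sc.makeRDD(1 to 4, 2).pipe(Seq("false"))
      intercept[SparkException] {
        piped.collect()
      }
    }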
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 66e67cbfa1..450e48d66e 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -8,7 +8,7 @@ import org.apache.hadoop.conf.Configuration
import java.io._
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import java.util.concurrent.Executors
-
+import java.util.concurrent.RejectedExecutionException
private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
@@ -91,7 +91,12 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
oos.writeObject(checkpoint)
oos.close()
bos.close()
- executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+ try {
+ executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+ } catch {
+ case rej: RejectedExecutionException =>
+ logError("Could not submit checkpoint task to the thread pool executor", rej)
+ }
}
def stop() {
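
Note: the guarded call closes a shutdown race: stop() shuts the executor down, and a ThreadPoolExecutor rejects any task submitted after that, so a checkpoint arriving late would otherwise kill the calling thread with exactly the kind of stack trace this PR removes. A self-contained sketch of the race being handled (names are illustrative):

    import java.util.concurrent.{Executors, RejectedExecutionException}

    object ShutdownRace {
      def main(args: Array[String]) {
        val executor = Executors.newFixedThreadPool(1)
        executor.shutdown()
        // With the default rejection policy, execute() after shutdown()
        // throws; catching it turns a late submission into a logged error
        // instead of a crash during teardown.
        try {
          executor.execute(new Runnable { def run() { println("too late") } })
        } catch {
          case rej: RejectedExecutionException =>
            System.err.println("Could not submit task: " + rej)
        }
      }
    }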