Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/resources/fairscheduler.xml          1
-rw-r--r--  core/src/test/resources/log4j.properties           1
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala   2
-rw-r--r--  core/src/test/scala/spark/DriverSuite.scala        4
-rw-r--r--  core/src/test/scala/spark/FileServerSuite.scala    1
-rw-r--r--  core/src/test/scala/spark/LocalSparkContext.scala  11
-rw-r--r--  core/src/test/scala/spark/PipedRDDSuite.scala      2
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala            33
-rw-r--r--  core/src/test/scala/spark/ui/UISuite.scala          75
9 files changed, 127 insertions(+), 3 deletions(-)
diff --git a/core/src/test/resources/fairscheduler.xml b/core/src/test/resources/fairscheduler.xml
index 5a688b0ebb..6e573b1883 100644
--- a/core/src/test/resources/fairscheduler.xml
+++ b/core/src/test/resources/fairscheduler.xml
@@ -1,3 +1,4 @@
+<?xml version="1.0"?>
<allocations>
<pool name="1">
<minShare>2</minShare>
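Adding the declaration makes fairscheduler.xml a complete, well-formed XML document, which stricter parsers require. As a minimal sketch (assuming the scala.xml library that ships with Scala; the real parsing lives in the fair scheduler itself), such an allocations file can be read like this:

    import scala.xml.XML

    // Load the allocations file and collect each pool's minShare.
    val allocations = XML.loadFile("core/src/test/resources/fairscheduler.xml")
    val minShares = (allocations \ "pool").map { pool =>
      (pool \ "@name").text -> (pool \ "minShare").text.toInt
    }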
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 6ec89c0184..d05cf3dec1 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -8,3 +8,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}:
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN
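The existing log4j.logger line quiets Jetty only when Jetty logs through log4j; the added line follows Jetty's own <package>.LEVEL convention, presumably to cover runs where Jetty falls back to its built-in StdErrLog. A sketch of the same suppression done programmatically (an assumption: StdErrLog reads this key from JVM system properties, so it must be set before Jetty initializes its logger):

    // Quiet Jetty's built-in StdErrLog; run before any Jetty class loads.
    System.setProperty("org.eclipse.jetty.LEVEL", "WARN")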
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 0866fb47b3..0024ede828 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -10,6 +10,7 @@ import org.scalatest.time.{Span, Millis}
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
import org.scalacheck.Prop._
+import org.eclipse.jetty.server.{Server, Request, Handler}
import com.google.common.io.Files
@@ -17,6 +18,7 @@ import scala.collection.mutable.ArrayBuffer
import SparkContext._
import storage.{GetBlock, BlockManagerWorker, StorageLevel}
+import ui.JettyUtils
class NotSerializableClass
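The new imports pull Jetty's server types and Spark's ui.JettyUtils into scope so the suite can stand up HTTP endpoints. A minimal, self-contained sketch of what these types allow, using a hypothetical no-op handler that is not part of this diff:

    import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
    import org.eclipse.jetty.server.{Request, Server}
    import org.eclipse.jetty.server.handler.AbstractHandler

    val server = new Server(0)  // port 0: let the OS pick a free port
    server.setHandler(new AbstractHandler {
      def handle(target: String, baseRequest: Request,
                 request: HttpServletRequest, response: HttpServletResponse) {
        response.setStatus(HttpServletResponse.SC_OK)  // answer 200 to everything
        baseRequest.setHandled(true)
      }
    })
    server.start()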
diff --git a/core/src/test/scala/spark/DriverSuite.scala b/core/src/test/scala/spark/DriverSuite.scala
index 5e84b3a66a..31c3dd75fb 100644
--- a/core/src/test/scala/spark/DriverSuite.scala
+++ b/core/src/test/scala/spark/DriverSuite.scala
@@ -2,6 +2,9 @@ package spark
import java.io.File
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts
import org.scalatest.prop.TableDrivenPropertyChecks._
@@ -27,6 +30,7 @@ class DriverSuite extends FunSuite with Timeouts {
*/
object DriverWithoutCleanup {
def main(args: Array[String]) {
+ Logger.getRootLogger().setLevel(Level.WARN)
val sc = new SparkContext(args(0), "DriverWithoutCleanup")
sc.parallelize(1 to 100, 4).count()
}
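Raising the root logger to WARN keeps the spawned driver's output down to what the suite actually needs to check. As a hedged sketch of the surrounding harness (hypothetical; the real suite launches the class through Spark's run scripts), the property under test is that the driver JVM exits promptly and cleanly:

    // Hypothetical harness; the real test uses SPARK_HOME's launch scripts.
    val proc = new ProcessBuilder(
      "java", "-cp", System.getProperty("java.class.path"),
      "spark.DriverWithoutCleanup", "local").inheritIO().start()
    assert(proc.waitFor() == 0, "driver JVM should exit cleanly after finishing")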
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala
index f1a35bced3..9c24ca430d 100644
--- a/core/src/test/scala/spark/FileServerSuite.scala
+++ b/core/src/test/scala/spark/FileServerSuite.scala
@@ -85,7 +85,6 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
in.close()
_ * fileVal + _ * fileVal
}.collect
- println(result)
assert(result.toSet === Set((1,200), (2,300), (3,500)))
}
diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/spark/LocalSparkContext.scala
index 76d5258b02..bd184222ed 100644
--- a/core/src/test/scala/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/spark/LocalSparkContext.scala
@@ -2,12 +2,21 @@ package spark
import org.scalatest.Suite
import org.scalatest.BeforeAndAfterEach
+import org.scalatest.BeforeAndAfterAll
+
+import org.jboss.netty.logging.InternalLoggerFactory
+import org.jboss.netty.logging.Slf4JLoggerFactory
/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */
-trait LocalSparkContext extends BeforeAndAfterEach { self: Suite =>
+trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite =>
@transient var sc: SparkContext = _
+ override def beforeAll() {
+ InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
+ super.beforeAll()
+ }
+
override def afterEach() {
resetSparkContext()
super.afterEach()
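With BeforeAndAfterAll mixed in, the Netty-to-SLF4J logging bridge is installed once per suite before any test runs, while afterEach still stops and clears sc between tests. A hypothetical suite using the trait (names invented for illustration):

    import org.scalatest.FunSuite
    import spark.{LocalSparkContext, SparkContext}

    class ExampleSuite extends FunSuite with LocalSparkContext {
      test("counts elements") {
        sc = new SparkContext("local", "test")  // stopped automatically after the test
        assert(sc.parallelize(1 to 10).count() === 10)
      }
    }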
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index 1c9ca50811..d263bb00e9 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -67,7 +67,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
test("pipe with non-zero exit status") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
- val piped = nums.pipe("cat nonexistent_file")
+ val piped = nums.pipe(Seq("cat nonexistent_file", "2>", "/dev/null"))
intercept[SparkException] {
piped.collect()
}
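The switch from a bare string to a Seq changes how the command is executed: Spark tokenizes a plain string and runs the result directly, while a Seq is handed to the process builder as-is, with no shell involved. The apparent intent is noise reduction: since no program named "cat nonexistent_file" exists, the process never starts and no stderr output reaches the test log, yet collect() still fails with the expected SparkException.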
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index d8db69b1c9..e41ae385c0 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -251,4 +251,37 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(topK.size === 2)
assert(topK.sorted === Array("b", "a"))
}
+
+ test("takeSample") {
+ val data = sc.parallelize(1 to 100, 2)
+ for (seed <- 1 to 5) {
+ val sample = data.takeSample(withReplacement=false, 20, seed)
+ assert(sample.size === 20) // Got exactly 20 elements
+ assert(sample.toSet.size === 20) // Elements are distinct
+ assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
+ }
+ for (seed <- 1 to 5) {
+ val sample = data.takeSample(withReplacement=false, 200, seed)
+ assert(sample.size === 100) // Got only 100 elements
+ assert(sample.toSet.size === 100) // Elements are distinct
+ assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
+ }
+ for (seed <- 1 to 5) {
+ val sample = data.takeSample(withReplacement=true, 20, seed)
+ assert(sample.size === 20) // Got exactly 20 elements
+ assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
+ }
+ for (seed <- 1 to 5) {
+ val sample = data.takeSample(withReplacement=true, 100, seed)
+ assert(sample.size === 100) // Got exactly 100 elements
+ // Chance of getting all distinct elements is astronomically low, so test we got < 100
+ assert(sample.toSet.size < 100, "sampling with replacement returned all distinct elements")
+ }
+ for (seed <- 1 to 5) {
+ val sample = data.takeSample(withReplacement=true, 200, seed)
+ assert(sample.size === 200) // Got exactly 200 elements
+ // Chance of getting all distinct elements is still quite low, so test we got < 100
+ assert(sample.toSet.size < 100, "sampling with replacement returned all distinct elements")
+ }
+ }
}
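The "astronomically low" comment can be made precise: 100 draws with replacement from 100 values are all distinct with probability 100!/100^100 ≈ 9×10^-43, and covering all 100 values within 200 draws is likewise rare (on the order of 10^-6 by the coupon-collector bound), so asserting toSet.size < 100 is safe for any fixed seed. A one-line check of the first figure:

    // log P(all distinct in 100 draws) = log(100!) - 100*log(100) ≈ -96.8, so P ≈ 9e-43
    val logP = (1 to 100).map(i => math.log(i)).sum - 100 * math.log(100)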
diff --git a/core/src/test/scala/spark/ui/UISuite.scala b/core/src/test/scala/spark/ui/UISuite.scala
new file mode 100644
index 0000000000..b40c29bede
--- /dev/null
+++ b/core/src/test/scala/spark/ui/UISuite.scala
@@ -0,0 +1,75 @@
+package spark.ui
+
+import org.scalatest.FunSuite
+import org.eclipse.jetty.server.Server
+import java.net.ServerSocket
+import scala.util.{Failure, Success, Try}
+import spark.Utils
+import com.google.common.io.Files
+import java.io.{FileOutputStream, File}
+import com.google.common.base.Charsets
+
+class UISuite extends FunSuite {
+ test("jetty port increases under contention") {
+ val startPort = 33333
+ val server = new Server(startPort)
+ server.start()
+ val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq())
+ val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq())
+
+ assert(boundPort1 === startPort + 1)
+ assert(boundPort2 === startPort + 2)
+ }
+
+ test("jetty binds to port 0 correctly") {
+ val (jettyServer, boundPort) = JettyUtils.startJettyServer("localhost", 0, Seq())
+ assert(jettyServer.getState === "STARTED")
+ assert(boundPort != 0)
+ Try { new ServerSocket(boundPort) } match {
+ case Success(s) => s.close(); fail("Port %s is not bound by the jetty server".format(boundPort))
+ case Failure(e) => // expected: boundPort is already taken by jetty
+ }
+ }
+
+ test("string formatting of time durations") {
+ val second = 1000
+ val minute = second * 60
+ val hour = minute * 60
+ def str = Utils.msDurationToString(_)
+
+ assert(str(123) === "123 ms")
+ assert(str(second) === "1.0 s")
+ assert(str(second + 462) === "1.5 s")
+ assert(str(hour) === "1.00 h")
+ assert(str(minute) === "1.0 m")
+ assert(str(minute + 4 * second + 34) === "1.1 m")
+ assert(str(10 * hour + minute + 4 * second) === "10.02 h")
+ assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11.00 h")
+ }
+
+ test("reading last n bytes of a file") {
+ val tmpDir = Files.createTempDir()
+
+ // File smaller than limit
+ val f1Path = tmpDir + "/f1"
+ val f1 = new FileOutputStream(f1Path)
+ f1.write("a\nb\nc\nd".getBytes(Charsets.UTF_8))
+ f1.close()
+ assert(Utils.lastNBytes(f1Path, 1024) === "a\nb\nc\nd")
+
+ // File larger than limit
+ val f2Path = tmpDir + "/f2"
+ val f2 = new FileOutputStream(f2Path)
+ f2.write("1\n2\n3\n4\n5\n6\n7\n8\n".getBytes(Charsets.UTF_8))
+ f2.close()
+ assert(Utils.lastNBytes(f2Path, 8) === "5\n6\n7\n8\n")
+
+ // Request limit larger than the file: the whole file should come back
+ val f3Path = tmpDir + "/f3"
+ val f3 = new FileOutputStream(f3Path)
+ f3.write("1\n2\n3\n4\n5\n6\n7\n8\n".getBytes(Charsets.UTF_8))
+ f3.close()
+ assert(Utils.lastNBytes(f3Path, 8000) === "1\n2\n3\n4\n5\n6\n7\n8\n")
+
+ }
+}
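The contention test above pins down the retry behaviour: when the requested port is taken, startJettyServer walks upward one port at a time until a bind succeeds, and reports the port it actually bound. A minimal sketch of that strategy, assuming it mirrors what JettyUtils does (the real implementation lives in core/src/main/scala/spark/ui/JettyUtils.scala):

    import org.eclipse.jetty.server.Server

    // Try ports upward from `port` until one binds; return the server and its port.
    def startOnFreePort(port: Int, retriesLeft: Int = 10): (Server, Int) = {
      val server = new Server(port)
      try {
        server.start()
        (server, port)
      } catch {
        case e: Exception if retriesLeft > 0 =>
          server.stop()
          startOnFreePort(port + 1, retriesLeft - 1)
      }
    }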