Diffstat (limited to 'core/src/test/scala')
-rw-r--r--  core/src/test/scala/org/apache/spark/DistributedSuite.scala                          |  3
-rw-r--r--  core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala               | 76
-rw-r--r--  core/src/test/scala/org/apache/spark/HashShuffleSuite.scala                          |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala                         |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleSuite.scala                              | 34
-rw-r--r--  core/src/test/scala/org/apache/spark/SortShuffleSuite.scala                          |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala                |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala  | 25
8 files changed, 110 insertions, 50 deletions
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 81b64c36dd..429199f207 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -202,7 +202,8 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
val blockManager = SparkEnv.get.blockManager
val blockTransfer = SparkEnv.get.blockTransferService
blockManager.master.getLocations(blockId).foreach { cmId =>
- val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, blockId.toString)
+ val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,
+ blockId.toString)
val deserialized = blockManager.dataDeserialize(blockId, bytes.nioByteBuffer())
.asInstanceOf[Iterator[Int]].toList
assert(deserialized === (1 to 100).toList)
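
[Annotation: the hunk above reflects the new fetchBlockSync signature, which identifies the remote executor as well as its host and port. A minimal sketch of the new call shape; every name here (blockTransfer, blockManager, cmId, blockId) is the same value the test obtains from SparkEnv, nothing beyond what the hunk shows:

    // Sketch: a block's remote address is now (host, port, executorId), all
    // taken from the BlockManagerId (cmId) returned by the block manager master.
    val buf = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId, blockId.toString)
    val ints = blockManager.dataDeserialize(blockId, buf.nioByteBuffer())
      .asInstanceOf[Iterator[Int]].toList
]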
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
new file mode 100644
index 0000000000..792b9cd8b6
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.server.TransportServer
+import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleClient}
+
+/**
+ * This suite creates an external shuffle server and routes all shuffle fetches through it.
+ * Note that failures in this suite may arise due to changes in Spark that invalidate expectations
+ * set up in [[ExternalShuffleBlockHandler]], such as changing the format of shuffle files or how
+ * we hash files into folders.
+ */
+class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
+ var server: TransportServer = _
+ var rpcHandler: ExternalShuffleBlockHandler = _
+
+ override def beforeAll() {
+ val transportConf = SparkTransportConf.fromSparkConf(conf)
+ rpcHandler = new ExternalShuffleBlockHandler()
+ val transportContext = new TransportContext(transportConf, rpcHandler)
+ server = transportContext.createServer()
+
+ conf.set("spark.shuffle.manager", "sort")
+ conf.set("spark.shuffle.service.enabled", "true")
+ conf.set("spark.shuffle.service.port", server.getPort.toString)
+ }
+
+ override def afterAll() {
+ server.close()
+ }
+
+ // This test ensures that the external shuffle service is actually in use for the other tests.
+ test("using external shuffle service") {
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+ sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])
+
+ val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _)
+
+ rdd.count()
+ rdd.count()
+
+ // Invalidate the registered executors, disallowing access to their shuffle blocks.
+ rpcHandler.clearRegisteredExecutors()
+
+ // Now Spark will receive FetchFailed, and not retry the stage due to "spark.test.noStageRetry"
+ // being set.
+ val e = intercept[SparkException] {
+ rdd.count()
+ }
+ e.getMessage should include ("Fetch failure will not retry stage due to testing config")
+ }
+}
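
[Annotation: the new suite above routes all shuffle fetches through a standalone shuffle server. As a rough sketch of how an application would opt in, mirroring the three settings the suite applies in beforeAll() (the master URL, app name, and port 7337 are illustrative assumptions; the suite itself uses server.getPort):

    val conf = new SparkConf()
      .set("spark.shuffle.manager", "sort")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.shuffle.service.port", "7337") // assumed default; the suite binds an ephemeral port
    val sc = new SparkContext("local-cluster[2,1,512]", "demo", conf)
]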
diff --git a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
index 2acc02a54f..19180e88eb 100644
--- a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
@@ -24,10 +24,6 @@ class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
// This test suite should run all tests in ShuffleSuite with hash-based shuffle.
override def beforeAll() {
- System.setProperty("spark.shuffle.manager", "hash")
- }
-
- override def afterAll() {
- System.clearProperty("spark.shuffle.manager")
+ conf.set("spark.shuffle.manager", "hash")
}
}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
index 840d8273cb..d78c99c2e1 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
@@ -24,10 +24,6 @@ class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
// This test suite should run all tests in ShuffleSuite with Netty shuffle mode.
override def beforeAll() {
- System.setProperty("spark.shuffle.blockTransferService", "netty")
- }
-
- override def afterAll() {
- System.clearProperty("spark.shuffle.blockTransferService")
+ conf.set("spark.shuffle.blockTransferService", "netty")
}
}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 2bdd84ce69..cda942e15a 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -30,10 +30,14 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
val conf = new SparkConf(loadDefaults = false)
+ // Ensure that the DAGScheduler doesn't retry stages whose fetches fail, so that we accurately
+ // test that the shuffle works (rather than retrying until all blocks are local to one Executor).
+ conf.set("spark.test.noStageRetry", "true")
+
test("groupByKey without compression") {
try {
System.setProperty("spark.shuffle.compress", "false")
- sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test", conf)
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
val groups = pairs.groupByKey(4).collect()
assert(groups.size === 2)
@@ -47,7 +51,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
}
test("shuffle non-zero block size") {
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
val NUM_BLOCKS = 3
val a = sc.parallelize(1 to 10, 2)
@@ -73,7 +77,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("shuffle serializer") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
(x, new NonJavaSerializableClass(x * 2))
@@ -89,7 +93,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("zero sized blocks") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
// 10 partitions from 4 keys
val NUM_BLOCKS = 10
@@ -116,7 +120,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("zero sized blocks without kryo") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
// 10 partitions from 4 keys
val NUM_BLOCKS = 10
@@ -141,7 +145,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("shuffle on mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
@@ -154,7 +158,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("sorting on mutable pairs") {
// This is not in SortingSuite because of the local cluster setup.
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
val data = Array(p(1, 11), p(3, 33), p(100, 100), p(2, 22))
val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
@@ -168,7 +172,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("cogroup using mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
@@ -195,7 +199,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("subtract mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1), p(3, 33))
val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"))
@@ -209,11 +213,8 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("sort with Java non serializable class - Kryo") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- val conf = new SparkConf()
- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .setAppName("test")
- .setMaster("local-cluster[2,1,512]")
- sc = new SparkContext(conf)
+ val myConf = conf.clone().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", myConf)
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
(new NonJavaSerializableClass(x), x)
@@ -226,10 +227,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("sort with Java non serializable class - Java") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- val conf = new SparkConf()
- .setAppName("test")
- .setMaster("local-cluster[2,1,512]")
- sc = new SparkContext(conf)
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
(new NonJavaSerializableClass(x), x)
diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
index 639e56c488..63358172ea 100644
--- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
@@ -24,10 +24,6 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
// This test suite should run all tests in ShuffleSuite with sort-based shuffle.
override def beforeAll() {
- System.setProperty("spark.shuffle.manager", "sort")
- }
-
- override def afterAll() {
- System.clearProperty("spark.shuffle.manager")
+ conf.set("spark.shuffle.manager", "sort")
}
}
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index 3925f0ccbd..bbdc9568a6 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -121,7 +121,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod
}
val appId = "testId"
- val executorId = "executor.1"
+ val executorId = "1"
conf.set("spark.app.id", appId)
conf.set("spark.executor.id", executorId)
@@ -138,7 +138,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod
override val metricRegistry = new MetricRegistry()
}
- val executorId = "executor.1"
+ val executorId = "1"
conf.set("spark.executor.id", executorId)
val instanceName = "executor"
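
[Annotation: the two hunks above drop the embedded dot from the executor ID ("executor.1" becomes "1"). A plausible reading, offered as an assumption rather than established behavior: metric names are dot-joined, so a dotted executor ID would blur segment boundaries in the composed name. Illustrated with the Codahale MetricRegistry helper:

    import com.codahale.metrics.MetricRegistry
    // Segments are joined with dots, so "executor.1" as a single segment
    // would be indistinguishable from the two segments "executor" and "1".
    val metricName = MetricRegistry.name("testId", "1", "executorSource")
    // metricName == "testId.1.executorSource"
]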
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 4e502cf65e..28f766570e 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -21,22 +21,19 @@ import java.util.concurrent.Semaphore
import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global
-import org.apache.spark.{TaskContextImpl, TaskContext}
-import org.apache.spark.network.{BlockFetchingListener, BlockTransferService}
-import org.mockito.Mockito._
import org.mockito.Matchers.{any, eq => meq}
+import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
-
import org.scalatest.FunSuite
-import org.apache.spark.{SparkConf, TaskContext}
+import org.apache.spark.{SparkConf, TaskContextImpl}
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.BlockFetchingListener
import org.apache.spark.serializer.TestSerializer
-
class ShuffleBlockFetcherIteratorSuite extends FunSuite {
// Some of the tests are quite tricky because we are testing the cleanup behavior
// in the presence of faults.
@@ -44,10 +41,10 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
/** Creates a mock [[BlockTransferService]] that returns data from the given map. */
private def createMockTransfer(data: Map[BlockId, ManagedBuffer]): BlockTransferService = {
val transfer = mock(classOf[BlockTransferService])
- when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
+ when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock): Unit = {
- val blocks = invocation.getArguments()(2).asInstanceOf[Seq[String]]
- val listener = invocation.getArguments()(3).asInstanceOf[BlockFetchingListener]
+ val blocks = invocation.getArguments()(3).asInstanceOf[Array[String]]
+ val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
for (blockId <- blocks) {
if (data.contains(BlockId(blockId))) {
@@ -118,7 +115,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
// 3 local blocks, and 2 remote blocks
// (but from the same block manager so one call to fetchBlocks)
verify(blockManager, times(3)).getBlockData(any())
- verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any())
+ verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any())
}
test("release current unexhausted buffer in case the task completes early") {
@@ -138,9 +135,9 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
val sem = new Semaphore(0)
val transfer = mock(classOf[BlockTransferService])
- when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
+ when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock): Unit = {
- val listener = invocation.getArguments()(3).asInstanceOf[BlockFetchingListener]
+ val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
future {
// Return the first two blocks, and wait till task completion before returning the 3rd one
listener.onBlockFetchSuccess(
@@ -201,9 +198,9 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
val sem = new Semaphore(0)
val transfer = mock(classOf[BlockTransferService])
- when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
+ when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock): Unit = {
- val listener = invocation.getArguments()(3).asInstanceOf[BlockFetchingListener]
+ val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
future {
// Return the first block, and then fail.
listener.onBlockFetchSuccess(
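
[Annotation: every mock in this file follows the same updated pattern: fetchBlocks now takes five arguments (host, port, execId, blockIds, listener), so the block-ID array moves to argument index 3 (typed Array[String] rather than Seq[String]) and the listener to index 4. A self-contained sketch of that mock; the fail-everything answer is an illustrative assumption, whereas the real tests return buffers from a prepared map or gate responses on a semaphore:

    val transfer = mock(classOf[BlockTransferService])
    when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
      override def answer(invocation: InvocationOnMock): Unit = {
        val blocks = invocation.getArguments()(3).asInstanceOf[Array[String]]
        val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
        // Illustrative assumption: report every requested block as failed.
        blocks.foreach(b => listener.onBlockFetchFailure(b, new RuntimeException("simulated")))
      }
    })
]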