diff options
Diffstat (limited to 'core/src/test')
8 files changed, 110 insertions, 50 deletions
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 81b64c36dd..429199f207 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -202,7 +202,8 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter val blockManager = SparkEnv.get.blockManager val blockTransfer = SparkEnv.get.blockTransferService blockManager.master.getLocations(blockId).foreach { cmId => - val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, blockId.toString) + val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId, + blockId.toString) val deserialized = blockManager.dataDeserialize(blockId, bytes.nioByteBuffer()) .asInstanceOf[Iterator[Int]].toList assert(deserialized === (1 to 100).toList) diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala new file mode 100644 index 0000000000..792b9cd8b6 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.atomic.AtomicInteger + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.SparkContext._ +import org.apache.spark.network.TransportContext +import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.network.server.TransportServer +import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleClient} + +/** + * This suite creates an external shuffle server and routes all shuffle fetches through it. + * Note that failures in this suite may arise due to changes in Spark that invalidate expectations + * set up in [[ExternalShuffleBlockHandler]], such as changing the format of shuffle files or how + * we hash files into folders. + */ +class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll { + var server: TransportServer = _ + var rpcHandler: ExternalShuffleBlockHandler = _ + + override def beforeAll() { + val transportConf = SparkTransportConf.fromSparkConf(conf) + rpcHandler = new ExternalShuffleBlockHandler() + val transportContext = new TransportContext(transportConf, rpcHandler) + server = transportContext.createServer() + + conf.set("spark.shuffle.manager", "sort") + conf.set("spark.shuffle.service.enabled", "true") + conf.set("spark.shuffle.service.port", server.getPort.toString) + } + + override def afterAll() { + server.close() + } + + // This test ensures that the external shuffle service is actually in use for the other tests. + test("using external shuffle service") { + sc = new SparkContext("local-cluster[2,1,512]", "test", conf) + sc.env.blockManager.externalShuffleServiceEnabled should equal(true) + sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient]) + + val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _) + + rdd.count() + rdd.count() + + // Invalidate the registered executors, disallowing access to their shuffle blocks. + rpcHandler.clearRegisteredExecutors() + + // Now Spark will receive FetchFailed, and not retry the stage due to "spark.test.noStageRetry" + // being set. + val e = intercept[SparkException] { + rdd.count() + } + e.getMessage should include ("Fetch failure will not retry stage due to testing config") + } +} diff --git a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala index 2acc02a54f..19180e88eb 100644 --- a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala @@ -24,10 +24,6 @@ class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll { // This test suite should run all tests in ShuffleSuite with hash-based shuffle. override def beforeAll() { - System.setProperty("spark.shuffle.manager", "hash") - } - - override def afterAll() { - System.clearProperty("spark.shuffle.manager") + conf.set("spark.shuffle.manager", "hash") } } diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala index 840d8273cb..d78c99c2e1 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala @@ -24,10 +24,6 @@ class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll { // This test suite should run all tests in ShuffleSuite with Netty shuffle mode. override def beforeAll() { - System.setProperty("spark.shuffle.blockTransferService", "netty") - } - - override def afterAll() { - System.clearProperty("spark.shuffle.blockTransferService") + conf.set("spark.shuffle.blockTransferService", "netty") } } diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 2bdd84ce69..cda942e15a 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -30,10 +30,14 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex val conf = new SparkConf(loadDefaults = false) + // Ensure that the DAGScheduler doesn't retry stages whose fetches fail, so that we accurately + // test that the shuffle works (rather than retrying until all blocks are local to one Executor). + conf.set("spark.test.noStageRetry", "true") + test("groupByKey without compression") { try { System.setProperty("spark.shuffle.compress", "false") - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4) val groups = pairs.groupByKey(4).collect() assert(groups.size === 2) @@ -47,7 +51,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex } test("shuffle non-zero block size") { - sc = new SparkContext("local-cluster[2,1,512]", "test") + sc = new SparkContext("local-cluster[2,1,512]", "test", conf) val NUM_BLOCKS = 3 val a = sc.parallelize(1 to 10, 2) @@ -73,7 +77,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex test("shuffle serializer") { // Use a local cluster with 2 processes to make sure there are both local and remote blocks - sc = new SparkContext("local-cluster[2,1,512]", "test") + sc = new SparkContext("local-cluster[2,1,512]", "test", conf) val a = sc.parallelize(1 to 10, 2) val b = a.map { x => (x, new NonJavaSerializableClass(x * 2)) @@ -89,7 +93,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex test("zero sized blocks") { // Use a local cluster with 2 processes to make sure there are both local and remote blocks - sc = new SparkContext("local-cluster[2,1,512]", "test") + sc = new SparkContext("local-cluster[2,1,512]", "test", conf) // 10 partitions from 4 keys val NUM_BLOCKS = 10 @@ -116,7 +120,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex test("zero sized blocks without kryo") { // Use a local cluster with 2 processes to make sure there are both local and remote blocks - sc = new SparkContext("local-cluster[2,1,512]", "test") + sc = new SparkContext("local-cluster[2,1,512]", "test", conf) // 10 partitions from 4 keys val NUM_BLOCKS = 10 @@ -141,7 +145,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex test("shuffle on mutable pairs") { // Use a local cluster with 2 processes to make sure there are both local and remote blocks - sc = new SparkContext("local-cluster[2,1,512]", "test") + sc = new SparkContext("local-cluster[2,1,512]", "test", conf) def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2) val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1)) val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2) @@ -154,7 +158,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex test("sorting on mutable pairs") { // This is not in SortingSuite because of the local cluster setup. // Use a local cluster with 2 processes to make sure there are both local and remote blocks - sc = new SparkContext("local-cluster[2,1,512]", "test") + sc = new SparkContext("local-cluster[2,1,512]", "test", conf) def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2) val data = Array(p(1, 11), p(3, 33), p(100, 100), p(2, 22)) val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2) @@ -168,7 +172,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex test("cogroup using mutable pairs") { // Use a local cluster with 2 processes to make sure there are both local and remote blocks - sc = new SparkContext("local-cluster[2,1,512]", "test") + sc = new SparkContext("local-cluster[2,1,512]", "test", conf) def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2) val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1)) val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3")) @@ -195,7 +199,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex test("subtract mutable pairs") { // Use a local cluster with 2 processes to make sure there are both local and remote blocks - sc = new SparkContext("local-cluster[2,1,512]", "test") + sc = new SparkContext("local-cluster[2,1,512]", "test", conf) def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2) val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1), p(3, 33)) val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22")) @@ -209,11 +213,8 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex test("sort with Java non serializable class - Kryo") { // Use a local cluster with 2 processes to make sure there are both local and remote blocks - val conf = new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .setAppName("test") - .setMaster("local-cluster[2,1,512]") - sc = new SparkContext(conf) + val myConf = conf.clone().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + sc = new SparkContext("local-cluster[2,1,512]", "test", myConf) val a = sc.parallelize(1 to 10, 2) val b = a.map { x => (new NonJavaSerializableClass(x), x) @@ -226,10 +227,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex test("sort with Java non serializable class - Java") { // Use a local cluster with 2 processes to make sure there are both local and remote blocks - val conf = new SparkConf() - .setAppName("test") - .setMaster("local-cluster[2,1,512]") - sc = new SparkContext(conf) + sc = new SparkContext("local-cluster[2,1,512]", "test", conf) val a = sc.parallelize(1 to 10, 2) val b = a.map { x => (new NonJavaSerializableClass(x), x) diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala index 639e56c488..63358172ea 100644 --- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala @@ -24,10 +24,6 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll { // This test suite should run all tests in ShuffleSuite with sort-based shuffle. override def beforeAll() { - System.setProperty("spark.shuffle.manager", "sort") - } - - override def afterAll() { - System.clearProperty("spark.shuffle.manager") + conf.set("spark.shuffle.manager", "sort") } } diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 3925f0ccbd..bbdc9568a6 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -121,7 +121,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod } val appId = "testId" - val executorId = "executor.1" + val executorId = "1" conf.set("spark.app.id", appId) conf.set("spark.executor.id", executorId) @@ -138,7 +138,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod override val metricRegistry = new MetricRegistry() } - val executorId = "executor.1" + val executorId = "1" conf.set("spark.executor.id", executorId) val instanceName = "executor" diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index 4e502cf65e..28f766570e 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -21,22 +21,19 @@ import java.util.concurrent.Semaphore import scala.concurrent.future import scala.concurrent.ExecutionContext.Implicits.global -import org.apache.spark.{TaskContextImpl, TaskContext} -import org.apache.spark.network.{BlockFetchingListener, BlockTransferService} -import org.mockito.Mockito._ import org.mockito.Matchers.{any, eq => meq} +import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer - import org.scalatest.FunSuite -import org.apache.spark.{SparkConf, TaskContext} +import org.apache.spark.{SparkConf, TaskContextImpl} import org.apache.spark.network._ import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.shuffle.BlockFetchingListener import org.apache.spark.serializer.TestSerializer - class ShuffleBlockFetcherIteratorSuite extends FunSuite { // Some of the tests are quite tricky because we are testing the cleanup behavior // in the presence of faults. @@ -44,10 +41,10 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite { /** Creates a mock [[BlockTransferService]] that returns data from the given map. */ private def createMockTransfer(data: Map[BlockId, ManagedBuffer]): BlockTransferService = { val transfer = mock(classOf[BlockTransferService]) - when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new Answer[Unit] { + when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] { override def answer(invocation: InvocationOnMock): Unit = { - val blocks = invocation.getArguments()(2).asInstanceOf[Seq[String]] - val listener = invocation.getArguments()(3).asInstanceOf[BlockFetchingListener] + val blocks = invocation.getArguments()(3).asInstanceOf[Array[String]] + val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener] for (blockId <- blocks) { if (data.contains(BlockId(blockId))) { @@ -118,7 +115,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite { // 3 local blocks, and 2 remote blocks // (but from the same block manager so one call to fetchBlocks) verify(blockManager, times(3)).getBlockData(any()) - verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any()) + verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any()) } test("release current unexhausted buffer in case the task completes early") { @@ -138,9 +135,9 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite { val sem = new Semaphore(0) val transfer = mock(classOf[BlockTransferService]) - when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new Answer[Unit] { + when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] { override def answer(invocation: InvocationOnMock): Unit = { - val listener = invocation.getArguments()(3).asInstanceOf[BlockFetchingListener] + val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener] future { // Return the first two blocks, and wait till task completion before returning the 3rd one listener.onBlockFetchSuccess( @@ -201,9 +198,9 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite { val sem = new Semaphore(0) val transfer = mock(classOf[BlockTransferService]) - when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new Answer[Unit] { + when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] { override def answer(invocation: InvocationOnMock): Unit = { - val listener = invocation.getArguments()(3).asInstanceOf[BlockFetchingListener] + val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener] future { // Return the first block, and then fail. listener.onBlockFetchSuccess( |