/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler

import java.nio.ByteBuffer

import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}

import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
import org.apache.spark.storage.TaskResultBlockId

/**
* Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
*
* Used to test the case where a BlockManager evicts the task result (or dies) before the
* TaskResult is retrieved.
*/
class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
  extends TaskResultGetter(sparkEnv, scheduler) {
  var removedResult = false

  override def enqueueSuccessfulTask(
      taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
    if (!removedResult) {
      // Only remove the result once, since we'd like to test the case where the task eventually
      // succeeds.
      serializer.get().deserialize[TaskResult[_]](serializedData) match {
        case IndirectTaskResult(blockId) =>
          sparkEnv.blockManager.master.removeBlock(blockId)
        case directResult: DirectTaskResult[_] =>
          taskSetManager.abort("Internal error: expect only indirect results")
      }
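      // Deserializing the result advanced the buffer's position, so rewind it here so that the
      // real TaskResultGetter can read the result again from the beginning.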
      serializedData.rewind()
      removedResult = true
    }
    super.enqueueSuccessfulTask(taskSetManager, tid, serializedData)
  }
}

/**
* Tests related to handling task results (both direct and indirect).
*/
class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
  with LocalSparkContext {

  override def beforeAll {
    // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
    // as we can make it) so the tests don't take too long.
    System.setProperty("spark.akka.frameSize", "1")
  }

  override def afterAll {
    System.clearProperty("spark.akka.frameSize")
  }

test("handling results smaller than Akka frame size") {
sc = new SparkContext("local", "test")
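    // The single-element, single-partition reduce never invokes the combining function; the job
    // exists only to send one small task result back to the driver.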
    val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
    assert(result === 2)
  }

test("handling results larger than Akka frame size") {
sc = new SparkContext("local", "test")
val akkaFrameSize =
sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
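    // Each Int is 4 bytes, so an array of akkaFrameSize Ints is roughly 4x the frame size,
    // forcing the result to be returned indirectly via the block manager.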
    val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
    assert(result === 1.to(akkaFrameSize).toArray)

    val RESULT_BLOCK_ID = TaskResultBlockId(0)
    assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0,
      "Expect result to be removed from the block manager.")
  }

test("task retried if result missing from block manager") {
// Set the maximum number of task failures to > 0, so that the task set isn't aborted
// after the result is missing.
sc = new SparkContext("local[1,2]", "test")
// If this test hangs, it's probably because no resource offers were made after the task
// failed.
val scheduler: TaskSchedulerImpl = sc.taskScheduler match {
case clusterScheduler: TaskSchedulerImpl =>
clusterScheduler
case _ =>
assert(false, "Expect local cluster to use ClusterScheduler")
throw new ClassCastException
}
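    // Swap in the getter that deletes the first result from the block manager before fetching it,
    // simulating the eviction described in ResultDeletingTaskResultGetter's doc comment.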
    scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
    val akkaFrameSize =
      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
    val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
    assert(result === 1.to(akkaFrameSize).toArray)

    // Make sure two tasks were run (one failed one, and a second retried one).
    assert(scheduler.nextTaskId.get() === 2)
  }
}