-rw-r--r--  core/pom.xml                                                                         |   6
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala              |  12
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala      |   1
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala               |   9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala          |  34
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala | 105
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala           |   1
7 files changed, 32 insertions(+), 136 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index f5fdb40696..90c8f97f2b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -356,12 +356,12 @@
<phase>generate-resources</phase>
<configuration>
<!-- Execute the shell script to generate the spark build information. -->
- <tasks>
+ <target>
<exec executable="${project.basedir}/../build/spark-build-info">
<arg value="${project.build.directory}/extra-resources"/>
- <arg value="${pom.version}"/>
+ <arg value="${project.version}"/>
</exec>
- </tasks>
+ </target>
</configuration>
<goals>
<goal>run</goal>
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f28f429e0c..3c30ec8ee8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1602,13 +1602,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
}
test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
- val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
- override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
- override def zero(initialValue: Int): Int = 0
- override def addInPlace(r1: Int, r2: Int): Int = {
- throw new DAGSchedulerSuiteDummyException
- }
- })
+ val acc = new LongAccumulator {
+ override def add(v: java.lang.Long): Unit = throw new DAGSchedulerSuiteDummyException
+ override def add(v: Long): Unit = throw new DAGSchedulerSuiteDummyException
+ }
+ sc.register(acc)
// Run this on executors
sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
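Note (not part of the patch): the hunk above illustrates the AccumulatorV2 migration pattern used throughout this commit. With the new API an accumulator is a plain object that must be registered on the SparkContext before tasks can use it. A minimal standalone sketch of that pattern, with an illustrative local context and the name "events" chosen here for demonstration only:

import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator

object AccumulatorV2Sketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "accumulator-v2-sketch")  // illustrative local context
    val acc = new LongAccumulator
    sc.register(acc, "events")           // registration is now explicit (the old API registered on construction)
    sc.parallelize(1 to 10, 2).foreach(_ => acc.add(1))
    assert(acc.sum == 10)                // result is read back on the driver
    sc.stop()
  }
}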
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 5271a5671a..54b7312991 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.{Duration, SECONDS}
+import scala.language.existentials
import scala.reflect.ClassTag
import org.scalactic.TripleEquals
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 368668bc7e..9eda79ace1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -146,14 +146,13 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
test("accumulators are updated on exception failures") {
// This means use 1 core and 4 max task failures
sc = new SparkContext("local[1,4]", "test")
- val param = AccumulatorParam.LongAccumulatorParam
// Create 2 accumulators, one that counts failed values and another that doesn't
- val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true)
- val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false)
+ val acc1 = AccumulatorSuite.createLongAccum("x", true)
+ val acc2 = AccumulatorSuite.createLongAccum("y", false)
// Fail first 3 attempts of every task. This means each task should be run 4 times.
sc.parallelize(1 to 10, 10).map { i =>
- acc1 += 1
- acc2 += 1
+ acc1.add(1)
+ acc2.add(1)
if (TaskContext.get.attemptNumber() <= 2) {
throw new Exception("you did something wrong")
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index f2c558ac2d..e89f792496 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
import scala.collection.mutable.HashSet
-import org.apache.spark.{Accumulator, AccumulatorParam}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
@@ -28,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
/**
* Contains methods for debugging query execution.
@@ -108,26 +107,27 @@ package object debug {
private[sql] case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
def output: Seq[Attribute] = child.output
- implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] {
- def zero(initialValue: HashSet[String]): HashSet[String] = {
- initialValue.clear()
- initialValue
- }
-
- def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
- v1 ++= v2
- v1
+ class SetAccumulator[T] extends AccumulatorV2[T, HashSet[T]] {
+ private val _set = new HashSet[T]()
+ override def isZero: Boolean = _set.isEmpty
+ override def copy(): AccumulatorV2[T, HashSet[T]] = {
+ val newAcc = new SetAccumulator[T]()
+ newAcc._set ++= _set
+ newAcc
}
+ override def reset(): Unit = _set.clear()
+ override def add(v: T): Unit = _set += v
+ override def merge(other: AccumulatorV2[T, HashSet[T]]): Unit = _set ++= other.value
+ override def value: HashSet[T] = _set
}
/**
* A collection of metrics for each column of output.
- *
- * @param elementTypes the actual runtime types for the output. Useful when there are bugs
- * causing the wrong data to be projected.
*/
- case class ColumnMetrics(
- elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty))
+ case class ColumnMetrics() {
+ val elementTypes = new SetAccumulator[String]
+ sparkContext.register(elementTypes)
+ }
val tupleCount: LongAccumulator = sparkContext.longAccumulator
@@ -155,7 +155,7 @@ package object debug {
while (i < numColumns) {
val value = currentRow.get(i, output(i).dataType)
if (value != null) {
- columnStats(i).elementTypes += HashSet(value.getClass.getName)
+ columnStats(i).elementTypes.add(value.getClass.getName)
}
i += 1
}
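Note (not part of the patch): the new SetAccumulator follows the AccumulatorV2 contract of isZero/copy/reset/add/merge/value, where the driver merges per-task copies instead of the old addInPlace callback. A standalone sketch exercising that contract on the driver only; the class body is repeated from the hunk above so the snippet compiles on its own (in the patch it is nested inside DebugExec), and it can be pasted into spark-shell:

import scala.collection.mutable.HashSet
import org.apache.spark.util.AccumulatorV2

class SetAccumulator[T] extends AccumulatorV2[T, HashSet[T]] {
  private val _set = new HashSet[T]()
  override def isZero: Boolean = _set.isEmpty
  override def copy(): AccumulatorV2[T, HashSet[T]] = {
    val newAcc = new SetAccumulator[T]()
    newAcc._set ++= _set
    newAcc
  }
  override def reset(): Unit = _set.clear()
  override def add(v: T): Unit = _set += v
  override def merge(other: AccumulatorV2[T, HashSet[T]]): Unit = _set ++= other.value
  override def value: HashSet[T] = _set
}

val a = new SetAccumulator[String]
assert(a.isZero)                              // a fresh accumulator reports zero
a.add("java.lang.Integer")
val b = a.copy(); b.add("java.lang.Double")   // per-task copies accumulate independently
a.merge(b)                                    // the driver merges task-side results
assert(a.value == HashSet("java.lang.Integer", "java.lang.Double"))
a.reset(); assert(a.isZero)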
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index fd956bc4ef..579a095ff0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -17,13 +17,6 @@
package org.apache.spark.sql.execution.metric
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-
-import scala.collection.mutable
-
-import org.apache.xbean.asm5._
-import org.apache.xbean.asm5.Opcodes._
-
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
import org.apache.spark.sql.execution.SparkPlanInfo
@@ -31,34 +24,11 @@ import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.util.{AccumulatorContext, JsonProtocol, Utils}
-
+import org.apache.spark.util.{AccumulatorContext, JsonProtocol}
class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
import testImplicits._
- test("SQLMetric should not box Long") {
- val l = SQLMetrics.createMetric(sparkContext, "long")
- val f = () => {
- l += 1L
- l.add(1L)
- }
- val cl = BoxingFinder.getClassReader(f.getClass)
- val boxingFinder = new BoxingFinder()
- cl.accept(boxingFinder, 0)
- assert(boxingFinder.boxingInvokes.isEmpty, s"Found boxing: ${boxingFinder.boxingInvokes}")
- }
-
- test("Normal accumulator should do boxing") {
- // We need this test to make sure BoxingFinder works.
- val l = sparkContext.accumulator(0L)
- val f = () => { l += 1L }
- val cl = BoxingFinder.getClassReader(f.getClass)
- val boxingFinder = new BoxingFinder()
- cl.accept(boxingFinder, 0)
- assert(boxingFinder.boxingInvokes.nonEmpty, "Found find boxing in this test")
- }
-
/**
* Call `df.collect()` and verify if the collected metrics are same as "expectedMetrics".
*
@@ -323,76 +293,3 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
}
}
-
-private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String)
-
-/**
- * If `method` is null, search all methods of this class recursively to find if they do some boxing.
- * If `method` is specified, only search this method of the class to speed up the searching.
- *
- * This method will skip the methods in `visitedMethods` to avoid potential infinite cycles.
- */
-private class BoxingFinder(
- method: MethodIdentifier[_] = null,
- val boxingInvokes: mutable.Set[String] = mutable.Set.empty,
- visitedMethods: mutable.Set[MethodIdentifier[_]] = mutable.Set.empty)
- extends ClassVisitor(ASM5) {
-
- private val primitiveBoxingClassName =
- Set("java/lang/Long",
- "java/lang/Double",
- "java/lang/Integer",
- "java/lang/Float",
- "java/lang/Short",
- "java/lang/Character",
- "java/lang/Byte",
- "java/lang/Boolean")
-
- override def visitMethod(
- access: Int, name: String, desc: String, sig: String, exceptions: Array[String]):
- MethodVisitor = {
- if (method != null && (method.name != name || method.desc != desc)) {
- // If method is specified, skip other methods.
- return new MethodVisitor(ASM5) {}
- }
-
- new MethodVisitor(ASM5) {
- override def visitMethodInsn(
- op: Int, owner: String, name: String, desc: String, itf: Boolean) {
- if (op == INVOKESPECIAL && name == "<init>" || op == INVOKESTATIC && name == "valueOf") {
- if (primitiveBoxingClassName.contains(owner)) {
- // Find boxing methods, e.g, new java.lang.Long(l) or java.lang.Long.valueOf(l)
- boxingInvokes.add(s"$owner.$name")
- }
- } else {
- // scalastyle:off classforname
- val classOfMethodOwner = Class.forName(owner.replace('/', '.'), false,
- Thread.currentThread.getContextClassLoader)
- // scalastyle:on classforname
- val m = MethodIdentifier(classOfMethodOwner, name, desc)
- if (!visitedMethods.contains(m)) {
- // Keep track of visited methods to avoid potential infinite cycles
- visitedMethods += m
- val cl = BoxingFinder.getClassReader(classOfMethodOwner)
- visitedMethods += m
- cl.accept(new BoxingFinder(m, boxingInvokes, visitedMethods), 0)
- }
- }
- }
- }
- }
-}
-
-private object BoxingFinder {
-
- def getClassReader(cls: Class[_]): ClassReader = {
- val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
- val resourceStream = cls.getResourceAsStream(className)
- val baos = new ByteArrayOutputStream(128)
- // Copy data over, before delegating to ClassReader -
- // else we can run out of open file handles.
- Utils.copyStream(resourceStream, baos, true)
- new ClassReader(new ByteArrayInputStream(baos.toByteArray))
- }
-
-}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index f4f8bd435d..207dbf56d3 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -111,6 +111,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
}
def createContainer(host: String): Container = {
+ // When YARN 2.6+ is required, avoid deprecation by using version with long second arg
val containerId = ContainerId.newInstance(appAttemptId, containerNum)
containerNum += 1
val nodeId = NodeId.newInstance(host, 1000)
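Note (not part of the patch): the comment added above points at the deprecated int-based factory. A hedged sketch of what the call could become once Hadoop/YARN 2.6+ is the minimum supported version, assuming ContainerId.newContainerId(ApplicationAttemptId, Long), which Hadoop 2.6 introduced as the replacement for newInstance:

import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ContainerId}

// Assumes Hadoop 2.6+ on the classpath; takes the container number as a Long
// instead of the deprecated Int overload used in the test above.
def createContainerId(appAttemptId: ApplicationAttemptId, containerNum: Long): ContainerId =
  ContainerId.newContainerId(appAttemptId, containerNum)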