author     Sean Owen <sowen@cloudera.com>        2016-06-14 09:40:07 -0700
committer  Wenchen Fan <wenchen@databricks.com>  2016-06-14 09:40:07 -0700
commit     6151d2641f91c8e3ec0c324e78afb46cdb2ef111 (patch)
tree       e183d9aa95c468b340d8e0b1b4a19ca0982a4148
parent     6e8cdef0cf36f6e921d9e1a65c61b66196935820 (diff)
download   spark-6151d2641f91c8e3ec0c324e78afb46cdb2ef111.tar.gz
           spark-6151d2641f91c8e3ec0c324e78afb46cdb2ef111.tar.bz2
           spark-6151d2641f91c8e3ec0c324e78afb46cdb2ef111.zip
[MINOR] Clean up several build warnings, mostly due to internal use of old accumulators
## What changes were proposed in this pull request?

Another PR to clean up recent build warnings. This particularly cleans up several instances of the old accumulator API usage in tests that are straightforward to update. I think this qualifies as "minor".

## How was this patch tested?

Jenkins

Author: Sean Owen <sowen@cloudera.com>

Closes #13642 from srowen/BuildWarnings.
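For readers skimming the diff below: the patch replaces uses of the pre-2.0 `Accumulator`/`AccumulatorParam` API with `AccumulatorV2` (for example `LongAccumulator`, registered via `SparkContext.register`). The sketch that follows is a minimal, self-contained illustration of that migration pattern, not code from this commit; the object name, the accumulator name `"records"`, and the local-mode `SparkContext` are assumptions made only for the example.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator

// Minimal sketch of the migration this patch performs in test code:
// from the deprecated Accumulator/AccumulatorParam API to AccumulatorV2.
object AccumulatorMigrationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "accumulator-migration-sketch")

    // Old (deprecated) pattern the patch removes:
    //   val acc = sc.accumulator(0L, "records")   // Accumulator[Long]
    //   rdd.foreach { _ => acc += 1 }
    //
    // New pattern: instantiate an AccumulatorV2 and register it explicitly.
    val acc = new LongAccumulator
    sc.register(acc, "records")

    // Executors call add(...); the driver reads the merged result.
    sc.parallelize(1 to 10, 2).foreach { _ => acc.add(1) }
    assert(acc.sum == 10L)

    sc.stop()
  }
}
```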
-rw-r--r--  core/pom.xml                                                                         |   6
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala               |  12
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala       |   1
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala                |   9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala           |  34
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala  | 105
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala            |   1
7 files changed, 32 insertions, 136 deletions
diff --git a/core/pom.xml b/core/pom.xml
index f5fdb40696..90c8f97f2b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -356,12 +356,12 @@
<phase>generate-resources</phase>
<configuration>
<!-- Execute the shell script to generate the spark build information. -->
- <tasks>
+ <target>
<exec executable="${project.basedir}/../build/spark-build-info">
<arg value="${project.build.directory}/extra-resources"/>
- <arg value="${pom.version}"/>
+ <arg value="${project.version}"/>
</exec>
- </tasks>
+ </target>
</configuration>
<goals>
<goal>run</goal>
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f28f429e0c..3c30ec8ee8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1602,13 +1602,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
}
test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
- val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
- override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
- override def zero(initialValue: Int): Int = 0
- override def addInPlace(r1: Int, r2: Int): Int = {
- throw new DAGSchedulerSuiteDummyException
- }
- })
+ val acc = new LongAccumulator {
+ override def add(v: java.lang.Long): Unit = throw new DAGSchedulerSuiteDummyException
+ override def add(v: Long): Unit = throw new DAGSchedulerSuiteDummyException
+ }
+ sc.register(acc)
// Run this on executors
sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 5271a5671a..54b7312991 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.{Duration, SECONDS}
+import scala.language.existentials
import scala.reflect.ClassTag
import org.scalactic.TripleEquals
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 368668bc7e..9eda79ace1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -146,14 +146,13 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
test("accumulators are updated on exception failures") {
// This means use 1 core and 4 max task failures
sc = new SparkContext("local[1,4]", "test")
- val param = AccumulatorParam.LongAccumulatorParam
// Create 2 accumulators, one that counts failed values and another that doesn't
- val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true)
- val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false)
+ val acc1 = AccumulatorSuite.createLongAccum("x", true)
+ val acc2 = AccumulatorSuite.createLongAccum("y", false)
// Fail first 3 attempts of every task. This means each task should be run 4 times.
sc.parallelize(1 to 10, 10).map { i =>
- acc1 += 1
- acc2 += 1
+ acc1.add(1)
+ acc2.add(1)
if (TaskContext.get.attemptNumber() <= 2) {
throw new Exception("you did something wrong")
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index f2c558ac2d..e89f792496 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
import scala.collection.mutable.HashSet
-import org.apache.spark.{Accumulator, AccumulatorParam}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
@@ -28,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
/**
* Contains methods for debugging query execution.
@@ -108,26 +107,27 @@ package object debug {
private[sql] case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
def output: Seq[Attribute] = child.output
- implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] {
- def zero(initialValue: HashSet[String]): HashSet[String] = {
- initialValue.clear()
- initialValue
- }
-
- def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
- v1 ++= v2
- v1
+ class SetAccumulator[T] extends AccumulatorV2[T, HashSet[T]] {
+ private val _set = new HashSet[T]()
+ override def isZero: Boolean = _set.isEmpty
+ override def copy(): AccumulatorV2[T, HashSet[T]] = {
+ val newAcc = new SetAccumulator[T]()
+ newAcc._set ++= _set
+ newAcc
}
+ override def reset(): Unit = _set.clear()
+ override def add(v: T): Unit = _set += v
+ override def merge(other: AccumulatorV2[T, HashSet[T]]): Unit = _set ++= other.value
+ override def value: HashSet[T] = _set
}
/**
* A collection of metrics for each column of output.
- *
- * @param elementTypes the actual runtime types for the output. Useful when there are bugs
- * causing the wrong data to be projected.
*/
- case class ColumnMetrics(
- elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty))
+ case class ColumnMetrics() {
+ val elementTypes = new SetAccumulator[String]
+ sparkContext.register(elementTypes)
+ }
val tupleCount: LongAccumulator = sparkContext.longAccumulator
@@ -155,7 +155,7 @@ package object debug {
while (i < numColumns) {
val value = currentRow.get(i, output(i).dataType)
if (value != null) {
- columnStats(i).elementTypes += HashSet(value.getClass.getName)
+ columnStats(i).elementTypes.add(value.getClass.getName)
}
i += 1
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index fd956bc4ef..579a095ff0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -17,13 +17,6 @@
package org.apache.spark.sql.execution.metric
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-
-import scala.collection.mutable
-
-import org.apache.xbean.asm5._
-import org.apache.xbean.asm5.Opcodes._
-
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
import org.apache.spark.sql.execution.SparkPlanInfo
@@ -31,34 +24,11 @@ import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.util.{AccumulatorContext, JsonProtocol, Utils}
-
+import org.apache.spark.util.{AccumulatorContext, JsonProtocol}
class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
import testImplicits._
- test("SQLMetric should not box Long") {
- val l = SQLMetrics.createMetric(sparkContext, "long")
- val f = () => {
- l += 1L
- l.add(1L)
- }
- val cl = BoxingFinder.getClassReader(f.getClass)
- val boxingFinder = new BoxingFinder()
- cl.accept(boxingFinder, 0)
- assert(boxingFinder.boxingInvokes.isEmpty, s"Found boxing: ${boxingFinder.boxingInvokes}")
- }
-
- test("Normal accumulator should do boxing") {
- // We need this test to make sure BoxingFinder works.
- val l = sparkContext.accumulator(0L)
- val f = () => { l += 1L }
- val cl = BoxingFinder.getClassReader(f.getClass)
- val boxingFinder = new BoxingFinder()
- cl.accept(boxingFinder, 0)
- assert(boxingFinder.boxingInvokes.nonEmpty, "Found find boxing in this test")
- }
-
/**
* Call `df.collect()` and verify if the collected metrics are same as "expectedMetrics".
*
@@ -323,76 +293,3 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
}
}
-
-private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String)
-
-/**
- * If `method` is null, search all methods of this class recursively to find if they do some boxing.
- * If `method` is specified, only search this method of the class to speed up the searching.
- *
- * This method will skip the methods in `visitedMethods` to avoid potential infinite cycles.
- */
-private class BoxingFinder(
- method: MethodIdentifier[_] = null,
- val boxingInvokes: mutable.Set[String] = mutable.Set.empty,
- visitedMethods: mutable.Set[MethodIdentifier[_]] = mutable.Set.empty)
- extends ClassVisitor(ASM5) {
-
- private val primitiveBoxingClassName =
- Set("java/lang/Long",
- "java/lang/Double",
- "java/lang/Integer",
- "java/lang/Float",
- "java/lang/Short",
- "java/lang/Character",
- "java/lang/Byte",
- "java/lang/Boolean")
-
- override def visitMethod(
- access: Int, name: String, desc: String, sig: String, exceptions: Array[String]):
- MethodVisitor = {
- if (method != null && (method.name != name || method.desc != desc)) {
- // If method is specified, skip other methods.
- return new MethodVisitor(ASM5) {}
- }
-
- new MethodVisitor(ASM5) {
- override def visitMethodInsn(
- op: Int, owner: String, name: String, desc: String, itf: Boolean) {
- if (op == INVOKESPECIAL && name == "<init>" || op == INVOKESTATIC && name == "valueOf") {
- if (primitiveBoxingClassName.contains(owner)) {
- // Find boxing methods, e.g, new java.lang.Long(l) or java.lang.Long.valueOf(l)
- boxingInvokes.add(s"$owner.$name")
- }
- } else {
- // scalastyle:off classforname
- val classOfMethodOwner = Class.forName(owner.replace('/', '.'), false,
- Thread.currentThread.getContextClassLoader)
- // scalastyle:on classforname
- val m = MethodIdentifier(classOfMethodOwner, name, desc)
- if (!visitedMethods.contains(m)) {
- // Keep track of visited methods to avoid potential infinite cycles
- visitedMethods += m
- val cl = BoxingFinder.getClassReader(classOfMethodOwner)
- visitedMethods += m
- cl.accept(new BoxingFinder(m, boxingInvokes, visitedMethods), 0)
- }
- }
- }
- }
- }
-}
-
-private object BoxingFinder {
-
- def getClassReader(cls: Class[_]): ClassReader = {
- val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
- val resourceStream = cls.getResourceAsStream(className)
- val baos = new ByteArrayOutputStream(128)
- // Copy data over, before delegating to ClassReader -
- // else we can run out of open file handles.
- Utils.copyStream(resourceStream, baos, true)
- new ClassReader(new ByteArrayInputStream(baos.toByteArray))
- }
-
-}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index f4f8bd435d..207dbf56d3 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -111,6 +111,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
}
def createContainer(host: String): Container = {
+ // When YARN 2.6+ is required, avoid deprecation by using version with long second arg
val containerId = ContainerId.newInstance(appAttemptId, containerNum)
containerNum += 1
val nodeId = NodeId.newInstance(host, 1000)