author     Sean Owen <sowen@cloudera.com>  2017-02-08 12:20:07 +0000
committer  Sean Owen <sowen@cloudera.com>  2017-02-08 12:20:07 +0000
commit     e8d3fca4502d5f5b8f38525b5fdabe80ccf9a8ec (patch)
tree       bb3851bea6be9e71f2533e27ee4ca427e36ff3fd /core/src
parent     d60dde26f98164ae146da1b5f409f4eb7c3621aa (diff)
[SPARK-19464][CORE][YARN][TEST-HADOOP2.6] Remove support for Hadoop 2.5 and earlier
## What changes were proposed in this pull request?

- Remove support for Hadoop 2.5 and earlier
- Remove reflection and code constructs only needed to support multiple versions at once
- Update docs to reflect newer versions
- Remove older versions' builds and profiles.

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #16810 from srowen/SPARK-19464.
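The core simplification in the diff below is that thread-level FileSystem statistics (HADOOP-10688), which only exist as of Hadoop 2.5, no longer need to be reached through reflection once Hadoop 2.6 is the minimum supported version. A minimal sketch of the direct call that replaces the reflective lookup, assuming only a Hadoop 2.6+ client on the classpath (the object name is illustrative, not part of the patch):

```scala
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem

object ThreadStatsSketch {
  // Total bytes read by the current thread across all cached FileSystem instances,
  // using the Hadoop 2.5+ thread-level statistics API directly instead of
  // Utils.classForName / Method.invoke.
  def currentThreadBytesRead(): Long =
    FileSystem.getAllStatistics.asScala
      .map(_.getThreadStatistics.getBytesRead)
      .sum
}
```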
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 59
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala | 31
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 66
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 23
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 15
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala | 74
6 files changed, 79 insertions(+), 189 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 23156072c3..941e2d13fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -18,7 +18,6 @@
package org.apache.spark.deploy
import java.io.IOException
-import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}
@@ -29,7 +28,6 @@ import scala.util.control.NonFatal
import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
-import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -140,54 +138,29 @@ class SparkHadoopUtil extends Logging {
/**
* Returns a function that can be called to find Hadoop FileSystem bytes read. If
* getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
- * return the bytes read on r since t. Reflection is required because thread-level FileSystem
- * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
- * Returns None if the required method can't be found.
+ * return the bytes read on r since t.
+ *
+ * @return the bytes read on this thread since this method was called.
*/
- private[spark] def getFSBytesReadOnThreadCallback(): Option[() => Long] = {
- try {
- val threadStats = getFileSystemThreadStatistics()
- val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
- val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
- val baselineBytesRead = f()
- Some(() => f() - baselineBytesRead)
- } catch {
- case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
- logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
- None
- }
+ private[spark] def getFSBytesReadOnThreadCallback(): () => Long = {
+ val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
+ val f = () => threadStats.map(_.getBytesRead).sum
+ val baselineBytesRead = f()
+ () => f() - baselineBytesRead
}
/**
* Returns a function that can be called to find Hadoop FileSystem bytes written. If
* getFSBytesWrittenOnThreadCallback is called from thread r at time t, the returned callback will
- * return the bytes written on r since t. Reflection is required because thread-level FileSystem
- * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
- * Returns None if the required method can't be found.
+ * return the bytes written on r since t.
+ *
+ * @return the bytes written on this thread since this method was called.
*/
- private[spark] def getFSBytesWrittenOnThreadCallback(): Option[() => Long] = {
- try {
- val threadStats = getFileSystemThreadStatistics()
- val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
- val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
- val baselineBytesWritten = f()
- Some(() => f() - baselineBytesWritten)
- } catch {
- case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
- logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
- None
- }
- }
-
- private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
- FileSystem.getAllStatistics.asScala.map(
- Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
- }
-
- private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
- val statisticsDataClass =
- Utils.classForName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
- statisticsDataClass.getDeclaredMethod(methodName)
+ private[spark] def getFSBytesWrittenOnThreadCallback(): () => Long = {
+ val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
+ val f = () => threadStats.map(_.getBytesWritten).sum
+ val baselineBytesWritten = f()
+ () => f() - baselineBytesWritten
}
/**
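As a usage illustration of the simplified contract above: the callback captures a baseline when it is created, and each later invocation returns the bytes read on the same thread since that point. A hypothetical caller (it would have to live under the org.apache.spark package, since the method is private[spark]) might look like this:

```scala
package org.apache.spark.sketch  // hypothetical subpackage; needed to reach private[spark] members

import org.apache.spark.deploy.SparkHadoopUtil

object BytesReadSketch {
  // Runs `doReads` and returns how many FileSystem bytes it read on this thread.
  def measureBytesRead(doReads: () => Unit): Long = {
    val bytesRead = SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() // baseline captured here
    doReads()
    bytesRead() // delta since the callback was created
  }
}
```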
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index 63918ef12a..1e0a1e605c 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -125,8 +125,7 @@ object SparkHadoopMapReduceWriter extends Logging {
val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
committer.setupTask(taskContext)
- val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
- SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
+ val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
// Initiate the writer.
val taskFormat = outputFormat.newInstance()
@@ -149,8 +148,7 @@ object SparkHadoopMapReduceWriter extends Logging {
writer.write(pair._1, pair._2)
// Update bytes written metric every few records
- SparkHadoopWriterUtils.maybeUpdateOutputMetrics(
- outputMetricsAndBytesWrittenCallback, recordsWritten)
+ SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
recordsWritten += 1
}
if (writer != null) {
@@ -171,11 +169,8 @@ object SparkHadoopMapReduceWriter extends Logging {
}
})
- outputMetricsAndBytesWrittenCallback.foreach {
- case (om, callback) =>
- om.setBytesWritten(callback())
- om.setRecordsWritten(recordsWritten)
- }
+ outputMetrics.setBytesWritten(callback())
+ outputMetrics.setRecordsWritten(recordsWritten)
ret
} catch {
@@ -222,24 +217,18 @@ object SparkHadoopWriterUtils {
// TODO: these don't seem like the right abstractions.
// We should abstract the duplicate code in a less awkward way.
- // return type: (output metrics, bytes written callback), defined only if the latter is defined
- def initHadoopOutputMetrics(
- context: TaskContext): Option[(OutputMetrics, () => Long)] = {
+ def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, () => Long) = {
val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
- bytesWrittenCallback.map { b =>
- (context.taskMetrics().outputMetrics, b)
- }
+ (context.taskMetrics().outputMetrics, bytesWrittenCallback)
}
def maybeUpdateOutputMetrics(
- outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)],
+ outputMetrics: OutputMetrics,
+ callback: () => Long,
recordsWritten: Long): Unit = {
if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
- outputMetricsAndBytesWrittenCallback.foreach {
- case (om, callback) =>
- om.setBytesWritten(callback())
- om.setRecordsWritten(recordsWritten)
- }
+ outputMetrics.setBytesWritten(callback())
+ outputMetrics.setRecordsWritten(recordsWritten)
}
}
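With the Option gone, the write path always receives an (OutputMetrics, callback) pair and updates it unconditionally. A condensed sketch of that calling pattern, assuming code under org.apache.spark (the writeAll helper and the write argument are illustrative, not the Spark writer itself):

```scala
package org.apache.spark.sketch  // hypothetical; OutputMetrics setters are private[spark]

import org.apache.spark.TaskContext
import org.apache.spark.internal.io.SparkHadoopWriterUtils

object OutputMetricsSketch {
  // Hypothetical write loop showing the new, non-optional metrics plumbing.
  def writeAll[K, V](context: TaskContext, records: Iterator[(K, V)])(write: (K, V) => Unit): Unit = {
    val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
    var recordsWritten = 0L
    records.foreach { case (k, v) =>
      write(k, v)
      // Refreshes bytesWritten only every RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES records.
      SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
      recordsWritten += 1
    }
    outputMetrics.setBytesWritten(callback())
    outputMetrics.setRecordsWritten(recordsWritten)
  }
}
```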
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index a83e139c13..5fa6a7ed31 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -25,15 +25,7 @@ import scala.collection.immutable.Map
import scala.reflect.ClassTag
import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.mapred.FileSplit
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.InputSplit
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.JobID
-import org.apache.hadoop.mapred.RecordReader
-import org.apache.hadoop.mapred.Reporter
-import org.apache.hadoop.mapred.TaskAttemptID
-import org.apache.hadoop.mapred.TaskID
+import org.apache.hadoop.mapred._
import org.apache.hadoop.mapred.lib.CombineFileSplit
import org.apache.hadoop.mapreduce.TaskType
import org.apache.hadoop.util.ReflectionUtils
@@ -47,7 +39,7 @@ import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager, Utils}
+import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager}
/**
* A Spark split class that wraps around a Hadoop InputSplit.
@@ -229,11 +221,11 @@ class HadoopRDD[K, V](
// creating RecordReader, because RecordReader's constructor might read some bytes
private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+ Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
case _ => None
}
- // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+ // We get our input bytes from thread-local Hadoop FileSystem statistics.
// If we do a coalesce, however, we are likely to compute multiple partitions in the same
// task and in the same thread, in which case we need to avoid override values written by
// previous partitions (SPARK-13071).
@@ -280,13 +272,9 @@ class HadoopRDD[K, V](
(key, value)
}
- override def close() {
+ override def close(): Unit = {
if (reader != null) {
InputFileBlockHolder.unset()
- // Close the reader and release it. Note: it's very important that we don't close the
- // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
- // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
- // corruption issues when reading compressed input.
try {
reader.close()
} catch {
@@ -326,18 +314,10 @@ class HadoopRDD[K, V](
override def getPreferredLocations(split: Partition): Seq[String] = {
val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
- val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
- case Some(c) =>
- try {
- val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
- val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
- HadoopRDD.convertSplitLocationInfo(infos)
- } catch {
- case e: Exception =>
- logDebug("Failed to use InputSplitWithLocations.", e)
- None
- }
- case None => None
+ val locs = hsplit match {
+ case lsplit: InputSplitWithLocationInfo =>
+ HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
+ case _ => None
}
locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}
@@ -413,32 +393,12 @@ private[spark] object HadoopRDD extends Logging {
}
}
- private[spark] class SplitInfoReflections {
- val inputSplitWithLocationInfo =
- Utils.classForName("org.apache.hadoop.mapred.InputSplitWithLocationInfo")
- val getLocationInfo = inputSplitWithLocationInfo.getMethod("getLocationInfo")
- val newInputSplit = Utils.classForName("org.apache.hadoop.mapreduce.InputSplit")
- val newGetLocationInfo = newInputSplit.getMethod("getLocationInfo")
- val splitLocationInfo = Utils.classForName("org.apache.hadoop.mapred.SplitLocationInfo")
- val isInMemory = splitLocationInfo.getMethod("isInMemory")
- val getLocation = splitLocationInfo.getMethod("getLocation")
- }
-
- private[spark] val SPLIT_INFO_REFLECTIONS: Option[SplitInfoReflections] = try {
- Some(new SplitInfoReflections)
- } catch {
- case e: Exception =>
- logDebug("SplitLocationInfo and other new Hadoop classes are " +
- "unavailable. Using the older Hadoop location info code.", e)
- None
- }
-
- private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Option[Seq[String]] = {
+ private[spark] def convertSplitLocationInfo(
+ infos: Array[SplitLocationInfo]): Option[Seq[String]] = {
Option(infos).map(_.flatMap { loc =>
- val reflections = HadoopRDD.SPLIT_INFO_REFLECTIONS.get
- val locationStr = reflections.getLocation.invoke(loc).asInstanceOf[String]
+ val locationStr = loc.getLocation
if (locationStr != "localhost") {
- if (reflections.isInMemory.invoke(loc).asInstanceOf[Boolean]) {
+ if (loc.isInMemory) {
logDebug(s"Partition $locationStr is cached by Hadoop.")
Some(HDFSCacheTaskLocation(locationStr).toString)
} else {
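The preferred-location lookup above now relies on an ordinary pattern match, because InputSplitWithLocationInfo and SplitLocationInfo are always present in Hadoop 2.6+. A self-contained sketch of the same idea (names are illustrative; the real code additionally maps cached replicas to HDFSCacheTaskLocation):

```scala
import org.apache.hadoop.mapred.{InputSplit, InputSplitWithLocationInfo}

object SplitLocationSketch {
  // Prefer the richer location info when the split type provides it,
  // falling back to plain host names, and never scheduling for "localhost".
  def preferredHosts(split: InputSplit): Seq[String] = {
    val fromLocationInfo: Option[Seq[String]] = split match {
      case s: InputSplitWithLocationInfo =>
        Option(s.getLocationInfo).map(_.toSeq.map(_.getLocation).filter(_ != "localhost"))
      case _ => None
    }
    fromLocationInfo.getOrElse(split.getLocations.toSeq.filter(_ != "localhost"))
  }
}
```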
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 733e85f305..ce3a9a2a1e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -152,11 +152,11 @@ class NewHadoopRDD[K, V](
private val getBytesReadCallback: Option[() => Long] =
split.serializableHadoopSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+ Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
case _ => None
}
- // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+ // We get our input bytes from thread-local Hadoop FileSystem statistics.
// If we do a coalesce, however, we are likely to compute multiple partitions in the same
// task and in the same thread, in which case we need to avoid override values written by
// previous partitions (SPARK-13071).
@@ -231,13 +231,9 @@ class NewHadoopRDD[K, V](
(reader.getCurrentKey, reader.getCurrentValue)
}
- private def close() {
+ private def close(): Unit = {
if (reader != null) {
InputFileBlockHolder.unset()
- // Close the reader and release it. Note: it's very important that we don't close the
- // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
- // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
- // corruption issues when reading compressed input.
try {
reader.close()
} catch {
@@ -277,18 +273,7 @@ class NewHadoopRDD[K, V](
override def getPreferredLocations(hsplit: Partition): Seq[String] = {
val split = hsplit.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value
- val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
- case Some(c) =>
- try {
- val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
- HadoopRDD.convertSplitLocationInfo(infos)
- } catch {
- case e : Exception =>
- logDebug("Failed to use InputSplit#getLocationInfo.", e)
- None
- }
- case None => None
- }
+ val locs = HadoopRDD.convertSplitLocationInfo(split.getLocationInfo)
locs.getOrElse(split.getLocations.filter(_ != "localhost"))
}
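The new-API path is the same idea on org.apache.hadoop.mapreduce.InputSplit, whose getLocationInfo has been available since Hadoop 2.5 and can now be called without reflection; it may still return null, which convertSplitLocationInfo tolerates via Option. A minimal standalone equivalent (illustrative names):

```scala
import org.apache.hadoop.mapreduce.InputSplit

object NewApiSplitLocationSketch {
  // getLocationInfo may return null (the default implementation does), so wrap it in Option.
  def locationHosts(split: InputSplit): Seq[String] =
    Option(split.getLocationInfo)
      .map(_.toSeq.map(_.getLocation).filter(_ != "localhost"))
      .getOrElse(split.getLocations.toSeq.filter(_ != "localhost"))
}
```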
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 41093bdb85..567a3183e2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -37,8 +37,7 @@ import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.executor.OutputMetrics
-import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkHadoopMapReduceWriter, SparkHadoopWriterUtils}
+import org.apache.spark.internal.io.{SparkHadoopMapReduceWriter, SparkHadoopWriterUtils}
import org.apache.spark.internal.Logging
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
@@ -1126,8 +1125,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
- val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
- SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
+ val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
writer.setup(context.stageId, context.partitionId, taskAttemptId)
writer.open()
@@ -1139,16 +1137,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
// Update bytes written metric every few records
- SparkHadoopWriterUtils.maybeUpdateOutputMetrics(
- outputMetricsAndBytesWrittenCallback, recordsWritten)
+ SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
recordsWritten += 1
}
}(finallyBlock = writer.close())
writer.commit()
- outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
- om.setBytesWritten(callback())
- om.setRecordsWritten(recordsWritten)
- }
+ outputMetrics.setBytesWritten(callback())
+ outputMetrics.setRecordsWritten(recordsWritten)
}
self.context.runJob(self, writeToFile)
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index a73b300ec2..becf3829e7 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutput
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SharedSparkContext, SparkFunSuite}
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.util.Utils
@@ -186,10 +185,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
sc.listenerBus.waitUntilEmpty(500)
assert(inputRead == numRecords)
- // Only supported on newer Hadoop
- if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
- assert(outputWritten == numBuckets)
- }
+ assert(outputWritten == numBuckets)
assert(shuffleRead == shuffleWritten)
}
@@ -262,57 +258,49 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
}
test("output metrics on records written") {
- // Only supported on newer Hadoop
- if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
- val file = new File(tmpDir, getClass.getSimpleName)
- val filePath = "file://" + file.getAbsolutePath
+ val file = new File(tmpDir, getClass.getSimpleName)
+ val filePath = "file://" + file.getAbsolutePath
- val records = runAndReturnRecordsWritten {
- sc.parallelize(1 to numRecords).saveAsTextFile(filePath)
- }
- assert(records == numRecords)
+ val records = runAndReturnRecordsWritten {
+ sc.parallelize(1 to numRecords).saveAsTextFile(filePath)
}
+ assert(records == numRecords)
}
test("output metrics on records written - new Hadoop API") {
- // Only supported on newer Hadoop
- if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
- val file = new File(tmpDir, getClass.getSimpleName)
- val filePath = "file://" + file.getAbsolutePath
-
- val records = runAndReturnRecordsWritten {
- sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString))
- .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](filePath)
- }
- assert(records == numRecords)
+ val file = new File(tmpDir, getClass.getSimpleName)
+ val filePath = "file://" + file.getAbsolutePath
+
+ val records = runAndReturnRecordsWritten {
+ sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString))
+ .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](filePath)
}
+ assert(records == numRecords)
}
test("output metrics when writing text file") {
val fs = FileSystem.getLocal(new Configuration())
val outPath = new Path(fs.getWorkingDirectory, "outdir")
- if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
- val taskBytesWritten = new ArrayBuffer[Long]()
- sc.addSparkListener(new SparkListener() {
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- taskBytesWritten += taskEnd.taskMetrics.outputMetrics.bytesWritten
- }
- })
-
- val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
-
- try {
- rdd.saveAsTextFile(outPath.toString)
- sc.listenerBus.waitUntilEmpty(500)
- assert(taskBytesWritten.length == 2)
- val outFiles = fs.listStatus(outPath).filter(_.getPath.getName != "_SUCCESS")
- taskBytesWritten.zip(outFiles).foreach { case (bytes, fileStatus) =>
- assert(bytes >= fileStatus.getLen)
- }
- } finally {
- fs.delete(outPath, true)
+ val taskBytesWritten = new ArrayBuffer[Long]()
+ sc.addSparkListener(new SparkListener() {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ taskBytesWritten += taskEnd.taskMetrics.outputMetrics.bytesWritten
+ }
+ })
+
+ val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
+
+ try {
+ rdd.saveAsTextFile(outPath.toString)
+ sc.listenerBus.waitUntilEmpty(500)
+ assert(taskBytesWritten.length == 2)
+ val outFiles = fs.listStatus(outPath).filter(_.getPath.getName != "_SUCCESS")
+ taskBytesWritten.zip(outFiles).foreach { case (bytes, fileStatus) =>
+ assert(bytes >= fileStatus.getLen)
}
+ } finally {
+ fs.delete(outPath, true)
}
}