Diffstat (limited to 'core/src/main/scala/org/apache/spark/rdd')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala                 |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala             | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala       |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala                | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala (renamed from core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala) |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala             |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala      |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala         |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala    | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                      | 14
11 files changed, 38 insertions, 51 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 8358244987..63d1d1767a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -35,9 +35,9 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo
override def getPartitions: Array[Partition] = {
assertValid()
- (0 until blockIds.length).map(i => {
+ (0 until blockIds.length).map { i =>
new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
- }).toArray
+ }.toArray
}
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
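Note: the BlockRDD change above is purely stylistic; a closure wrapped in parentheses plus braces becomes a plain brace block. A REPL-style sketch of the two equivalent forms, with made-up block ids:

val blockIds = Array("rdd_0_0", "rdd_0_1", "rdd_0_2")

// Before: parentheses wrapping a brace block.
val before = (0 until blockIds.length).map(i => {
  (blockIds(i), i)
}).toArray

// After: the multi-line closure is passed as a brace block directly.
val after = (0 until blockIds.length).map { i =>
  (blockIds(i), i)
}.toArray

before.sameElements(after)   // true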
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index e5ebc63082..7bc1eb0436 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -29,10 +29,12 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.util.collection.{CompactBuffer, ExternalAppendOnlyMap}
import org.apache.spark.util.Utils
-/** The references to rdd and splitIndex are transient because redundant information is stored
- * in the CoGroupedRDD object. Because CoGroupedRDD is serialized separately from
- * CoGroupPartition, if rdd and splitIndex aren't transient, they'll be included twice in the
- * task closure. */
+/**
+ * The references to rdd and splitIndex are transient because redundant information is stored
+ * in the CoGroupedRDD object. Because CoGroupedRDD is serialized separately from
+ * CoGroupPartition, if rdd and splitIndex aren't transient, they'll be included twice in the
+ * task closure.
+ */
private[spark] case class NarrowCoGroupSplitDep(
@transient rdd: RDD[_],
@transient splitIndex: Int,
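Note: the reworded Scaladoc above rests on a general Java-serialization fact: any non-transient field is written out with the object that holds it, so a partition's back-reference to data the enclosing RDD already ships would be serialized twice per task. A tiny standalone sketch of the effect, using made-up classes (not the Spark ones):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

class Heavy(val payload: Array[Byte] = Array.fill[Byte](1 << 20)(0)) extends Serializable
case class Slim(@transient owner: Heavy, index: Int)   // owner is skipped when serializing
case class Fat(owner: Heavy, index: Int)               // owner's megabyte payload comes along

def serializedSize(o: AnyRef): Int = {
  val buf = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buf)
  out.writeObject(o)
  out.close()
  buf.size()
}

serializedSize(Slim(new Heavy(), 0))   // a few hundred bytes
serializedSize(Fat(new Heavy(), 0))    // over a megabyte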
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 5e9230e733..368916a39e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -166,8 +166,8 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
val counters = new Array[Long](buckets.length - 1)
while (iter.hasNext) {
bucketFunction(iter.next()) match {
- case Some(x: Int) => {counters(x) += 1}
- case _ => {}
+ case Some(x: Int) => counters(x) += 1
+ case _ => // No-Op
}
}
Iterator(counters)
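Note: the counters assembled in this hunk back the public histogram API on RDD[Double]. A spark-shell style usage sketch (sc is the shell's SparkContext; the values and buckets are made up):

val data = sc.parallelize(Seq(0.5, 1.5, 2.5, 3.5, 9.0))

// Explicit bucket boundaries: [0, 2), [2, 4), [4, 10]
data.histogram(Array(0.0, 2.0, 4.0, 10.0))   // Array(2, 2, 1)

// Or ask for three evenly spaced buckets and get the boundaries back as well.
val (buckets, counts) = data.histogram(3)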
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 08db96edd6..35d190b464 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,15 +213,13 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()
- // TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD
-
val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
val existingBytesRead = inputMetrics.bytesRead
// Sets the thread local variable for the file's name
split.inputSplit.value match {
- case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
- case _ => SqlNewHadoopRDDState.unsetInputFileName()
+ case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
+ case _ => InputFileNameHolder.unsetInputFileName()
}
// Find a function that will return the FileSystem bytes read by this thread. Do this before
@@ -261,7 +259,7 @@ class HadoopRDD[K, V](
finished = true
}
if (!finished) {
- inputMetrics.incRecordsReadInternal(1)
+ inputMetrics.incRecordsRead(1)
}
if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
updateBytesRead()
@@ -271,7 +269,7 @@ class HadoopRDD[K, V](
override def close() {
if (reader != null) {
- SqlNewHadoopRDDState.unsetInputFileName()
+ InputFileNameHolder.unsetInputFileName()
// Close the reader and release it. Note: it's very important that we don't close the
// reader more than once, since that exposes us to MAPREDUCE-5918 when running against
// Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
@@ -293,7 +291,7 @@ class HadoopRDD[K, V](
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
- inputMetrics.incBytesReadInternal(split.inputSplit.value.getLength)
+ inputMetrics.incBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
@@ -424,7 +422,7 @@ private[spark] object HadoopRDD extends Logging {
private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Seq[String] = {
val out = ListBuffer[String]()
- infos.foreach { loc => {
+ infos.foreach { loc =>
val locationStr = HadoopRDD.SPLIT_INFO_REFLECTIONS.get.
getLocation.invoke(loc).asInstanceOf[String]
if (locationStr != "localhost") {
@@ -436,7 +434,7 @@ private[spark] object HadoopRDD extends Logging {
out += new HostTaskLocation(locationStr).toString
}
}
- }}
+ }
out.seq
}
}
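Note: besides the InputFileNameHolder and metrics-method renames, the hunk keeps HadoopRDD's existing habit of refreshing the bytes-read counter only every UPDATE_INPUT_METRICS_INTERVAL_RECORDS records. A generic sketch of that amortization pattern (the names here are made up, not Spark APIs):

// Poll an expensive counter only every N records, plus once at the end.
val updateInterval = 1000
def readAll(records: Iterator[Array[Byte]], updateBytesRead: () => Unit): Long = {
  var recordsRead = 0L
  while (records.hasNext) {
    records.next()
    recordsRead += 1
    if (recordsRead % updateInterval == 0) {
      updateBytesRead()   // e.g. ask the filesystem for thread-local byte counts
    }
  }
  updateBytesRead()       // final update so the tail of the split is not lost
  recordsRead
}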
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
index 3f15fff793..108e9d2558 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
@@ -20,10 +20,10 @@ package org.apache.spark.rdd
import org.apache.spark.unsafe.types.UTF8String
/**
- * State for SqlNewHadoopRDD objects. This is split this way because of the package splits.
- * TODO: Move/Combine this with org.apache.spark.sql.datasources.SqlNewHadoopRDD
+ * This holds file names of the current Spark task. This is used in HadoopRDD,
+ * FileScanRDD and InputFileName function in Spark SQL.
*/
-private[spark] object SqlNewHadoopRDDState {
+private[spark] object InputFileNameHolder {
/**
* The thread variable for the name of the current file being read. This is used by
* the InputFileName function in Spark SQL.
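Note: the renamed object is, in essence, a thread-local holder for the current input file name. A minimal sketch of that shape, assuming the thread-local UTF8String design the surrounding comments imply (the actual InputFileNameHolder source may differ in detail and is private[spark]):

import org.apache.spark.unsafe.types.UTF8String

object InputFileNameHolderSketch {
  private val inputFileName = new InheritableThreadLocal[UTF8String] {
    override def initialValue(): UTF8String = UTF8String.fromString("")
  }

  def getInputFileName(): UTF8String = inputFileName.get()

  def setInputFileName(file: String): Unit =
    inputFileName.set(UTF8String.fromString(file))

  def unsetInputFileName(): Unit =
    inputFileName.set(UTF8String.fromString(""))
}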
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index fb9606ae38..3ccd616cbf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -189,7 +189,7 @@ class NewHadoopRDD[K, V](
}
havePair = false
if (!finished) {
- inputMetrics.incRecordsReadInternal(1)
+ inputMetrics.incRecordsRead(1)
}
if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
updateBytesRead()
@@ -220,7 +220,7 @@ class NewHadoopRDD[K, V](
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
- inputMetrics.incBytesReadInternal(split.serializableHadoopSplit.value.getLength)
+ inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index 363004e587..a5992022d0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -86,12 +86,11 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
def inRange(k: K): Boolean = ordering.gteq(k, lower) && ordering.lteq(k, upper)
val rddToFilter: RDD[P] = self.partitioner match {
- case Some(rp: RangePartitioner[K, V]) => {
+ case Some(rp: RangePartitioner[K, V]) =>
val partitionIndicies = (rp.getPartition(lower), rp.getPartition(upper)) match {
case (l, u) => Math.min(l, u) to Math.max(l, u)
}
PartitionPruningRDD.create(self, partitionIndicies.contains)
- }
case _ =>
self
}
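Note: the match being flattened here is the fast path of filterByRange: when the RDD is already range-partitioned (for example after sortByKey()), only the partitions that can contain keys in [lower, upper] are kept. A spark-shell style usage sketch with made-up data:

val sorted = sc.parallelize(Seq(1 -> "a", 5 -> "b", 9 -> "c", 42 -> "d"), 4).sortByKey()

// sortByKey() installed a RangePartitioner, so this takes the pruning branch
// above and skips partitions that cannot hold keys in [2, 10].
sorted.filterByRange(2, 10).collect()   // Array((5,b), (9,c))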
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 296179b75b..085829af6e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1111,9 +1111,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
recordsWritten += 1
}
- } {
- writer.close(hadoopContext)
- }
+ }(finallyBlock = writer.close(hadoopContext))
committer.commitTask(hadoopContext)
outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
om.setBytesWritten(callback())
@@ -1200,9 +1198,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
recordsWritten += 1
}
- } {
- writer.close()
- }
+ }(finallyBlock = writer.close())
writer.commit()
outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
om.setBytesWritten(callback())
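Note: both call sites above replace a bare second argument list with a named argument, making it obvious that the block closes the writer rather than continuing the write loop. A sketch of a helper with the same (block)(finallyBlock) shape, simplifying the utility Spark uses (the real one also guards against exceptions thrown from the finally block):

def tryWithFinally[T](block: => T)(finallyBlock: => Unit): T =
  try block finally finallyBlock

// Without the name, the second block reads like part of the first:
//   tryWithFinally { writeAllRecords() } { writer.close() }
// Naming the parameter makes the call site self-describing:
//   tryWithFinally { writeAllRecords() }(finallyBlock = writer.close())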
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 582fa93afe..bb84e4af15 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -121,14 +121,14 @@ private object ParallelCollectionRDD {
// Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
- (0 until numSlices).iterator.map(i => {
+ (0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
- })
+ }
}
seq match {
- case r: Range => {
+ case r: Range =>
positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
// If the range is inclusive, use inclusive range for the last slice
if (r.isInclusive && index == numSlices - 1) {
@@ -138,8 +138,7 @@ private object ParallelCollectionRDD {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}
}).toSeq.asInstanceOf[Seq[Seq[T]]]
- }
- case nr: NumericRange[_] => {
+ case nr: NumericRange[_] =>
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
@@ -149,14 +148,12 @@ private object ParallelCollectionRDD {
r = r.drop(sliceSize)
}
slices
- }
- case _ => {
+ case _ =>
val array = seq.toArray // To prevent O(n^2) operations for List etc
positions(array.length, numSlices).map({
case (start, end) =>
array.slice(start, end).toSeq
}).toSeq
- }
}
}
}
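Note: the positions helper rewritten above is what keeps every collection type slicing at identical index boundaries, so that RDD.zip lines up element-for-element. A quick worked example using the same arithmetic:

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
  (0 until numSlices).iterator.map { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i + 1) * length) / numSlices).toInt
    (start, end)
  }

// Ten elements over three slices; the leftover element lands in the last slice.
positions(10, 3).toList   // List((0,3), (3,6), (6,10))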
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 9e3880714a..0abba15bec 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -68,9 +68,9 @@ class PartitionerAwareUnionRDD[T: ClassTag](
override def getPartitions: Array[Partition] = {
val numPartitions = partitioner.get.numPartitions
- (0 until numPartitions).map(index => {
+ (0 until numPartitions).map { index =>
new PartitionerAwareUnionRDDPartition(rdds, index)
- }).toArray
+ }.toArray
}
// Get the location where most of the partitions of parent RDDs are located
@@ -78,11 +78,10 @@ class PartitionerAwareUnionRDD[T: ClassTag](
logDebug("Finding preferred location for " + this + ", partition " + s.index)
val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
val locations = rdds.zip(parentPartitions).flatMap {
- case (rdd, part) => {
+ case (rdd, part) =>
val parentLocations = currPrefLocs(rdd, part)
logDebug("Location of " + rdd + " partition " + part.index + " = " + parentLocations)
parentLocations
- }
}
val location = if (locations.isEmpty) {
None
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f96551c793..36ff3bcaae 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -255,8 +255,8 @@ abstract class RDD[T: ClassTag](
}
/**
- * Returns the number of partitions of this RDD.
- */
+ * Returns the number of partitions of this RDD.
+ */
@Since("1.6.0")
final def getNumPartitions: Int = partitions.length
@@ -333,10 +333,10 @@ abstract class RDD[T: ClassTag](
case Left(blockResult) =>
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
- existingMetrics.incBytesReadInternal(blockResult.bytes)
+ existingMetrics.incBytesRead(blockResult.bytes)
new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
override def next(): T = {
- existingMetrics.incRecordsReadInternal(1)
+ existingMetrics.incRecordsRead(1)
delegate.next()
}
}
@@ -568,11 +568,7 @@ abstract class RDD[T: ClassTag](
* times (use `.distinct()` to eliminate them).
*/
def union(other: RDD[T]): RDD[T] = withScope {
- if (partitioner.isDefined && other.partitioner == partitioner) {
- new PartitionerAwareUnionRDD(sc, Array(this, other))
- } else {
- new UnionRDD(sc, Array(this, other))
- }
+ sc.union(this, other)
}
/**
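Note: the branch deleted from RDD.union is not lost; sc.union makes the same choice between PartitionerAwareUnionRDD and UnionRDD, so delegating simply removes duplicated logic. A sketch mirroring that decision (PartitionerAwareUnionRDD is private[spark], so this only compiles inside the org.apache.spark package, and the real SparkContext.union may differ in detail):

import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD}

object UnionSketch {
  def union[T: ClassTag](sc: SparkContext, rdds: Seq[RDD[T]]): RDD[T] = {
    val partitioners = rdds.flatMap(_.partitioner).toSet
    if (rdds.nonEmpty && rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
      new PartitionerAwareUnionRDD(sc, rdds)   // all inputs share one partitioner, keep it
    } else {
      new UnionRDD(sc, rdds)
    }
  }
}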