author     Reynold Xin <rxin@databricks.com>  2015-08-06 18:25:38 -0700
committer  Reynold Xin <rxin@databricks.com>  2015-08-06 18:25:38 -0700
commit     b87825310ac87485672868bf6a9ed01d154a3626 (patch)
tree       d635396f1d58541b2c9ac4f8bfbbbaeea52d79f3
parent     49b1504fe3733eb36a7fc6317ec19aeba5d46f97 (diff)
[SPARK-9692] Remove SqlNewHadoopRDD's generated Tuple2 and InterruptibleIterator.
A small performance optimization: we don't need to generate a Tuple2 and then immediately discard the key. We also don't need the extra wrapper from InterruptibleIterator.

Author: Reynold Xin <rxin@databricks.com>

Closes #8000 from rxin/SPARK-9692 and squashes the following commits:

1d4d0b3 [Reynold Xin] [SPARK-9692] Remove SqlNewHadoopRDD's generated Tuple2 and InterruptibleIterator.
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala              | 44
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala  |  3
2 files changed, 18 insertions(+), 29 deletions(-)
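To make the shape of the change clearer outside the diff, here is a minimal sketch of the iterator pattern after the patch, assuming a simplified RecordReaderLike trait in place of Hadoop's RecordReader and a plain function for the task-interruption check; the names are illustrative, not Spark's exact API.

// A minimal sketch, not the actual Spark code: RecordReaderLike stands in for
// Hadoop's RecordReader, and isInterrupted for TaskContext.isInterrupted.
trait RecordReaderLike[K, V] {
  def nextKeyValue(): Boolean
  def getCurrentKey: K
  def getCurrentValue: V
}

// Before (schematically): next() returned (reader.getCurrentKey, reader.getCurrentValue),
// allocating a Tuple2 per record whose key was immediately discarded, and compute()
// wrapped the whole iterator in InterruptibleIterator just to add an interruption check.

// After: the iterator yields the value directly and checks for interruption itself.
class ValueIterator[V](reader: RecordReaderLike[Void, V], isInterrupted: () => Boolean)
    extends Iterator[V] {
  private[this] var havePair = false
  private[this] var finished = false

  override def hasNext: Boolean = {
    // Interruption check folded into the iterator, replacing the InterruptibleIterator
    // wrapper (Spark throws TaskKilledException; InterruptedException stands in here).
    if (isInterrupted()) throw new InterruptedException("task killed")
    if (!finished && !havePair) {
      finished = !reader.nextKeyValue()
      havePair = !finished
    }
    !finished
  }

  override def next(): V = {
    if (!hasNext) throw new java.util.NoSuchElementException("End of stream")
    havePair = false
    reader.getCurrentValue // the Void key is never read, so no Tuple2 is allocated
  }
}

On the caller side (see the ParquetRelation hunk below), the RDD now carries values directly, so the `.values` projection that previously dropped the Void keys is no longer needed.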
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index 35e44cb59c..6a95e44c57 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -26,14 +26,12 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
-import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.{Partition => SparkPartition, _}
-import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -60,18 +58,16 @@ private[spark] class SqlNewHadoopPartition(
* and the executor side to the shared Hadoop Configuration.
*
* Note: This is RDD is basically a cloned version of [[org.apache.spark.rdd.NewHadoopRDD]] with
- * changes based on [[org.apache.spark.rdd.HadoopRDD]]. In future, this functionality will be
- * folded into core.
+ * changes based on [[org.apache.spark.rdd.HadoopRDD]].
*/
-private[spark] class SqlNewHadoopRDD[K, V](
+private[spark] class SqlNewHadoopRDD[V: ClassTag](
@transient sc : SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
@transient initDriverSideJobFuncOpt: Option[Job => Unit],
initLocalJobFuncOpt: Option[Job => Unit],
- inputFormatClass: Class[_ <: InputFormat[K, V]],
- keyClass: Class[K],
+ inputFormatClass: Class[_ <: InputFormat[Void, V]],
valueClass: Class[V])
- extends RDD[(K, V)](sc, Nil)
+ extends RDD[V](sc, Nil)
with SparkHadoopMapReduceUtil
with Logging {
@@ -120,8 +116,8 @@ private[spark] class SqlNewHadoopRDD[K, V](
override def compute(
theSplit: SparkPartition,
- context: TaskContext): InterruptibleIterator[(K, V)] = {
- val iter = new Iterator[(K, V)] {
+ context: TaskContext): Iterator[V] = {
+ val iter = new Iterator[V] {
val split = theSplit.asInstanceOf[SqlNewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = getConf(isDriverSide = false)
@@ -154,17 +150,20 @@ private[spark] class SqlNewHadoopRDD[K, V](
configurable.setConf(conf)
case _ =>
}
- private var reader = format.createRecordReader(
+ private[this] var reader = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener(context => close())
- var havePair = false
- var finished = false
- var recordsSinceMetricsUpdate = 0
+
+ private[this] var havePair = false
+ private[this] var finished = false
override def hasNext: Boolean = {
+ if (context.isInterrupted) {
+ throw new TaskKilledException
+ }
if (!finished && !havePair) {
finished = !reader.nextKeyValue
if (finished) {
@@ -178,7 +177,7 @@ private[spark] class SqlNewHadoopRDD[K, V](
!finished
}
- override def next(): (K, V) = {
+ override def next(): V = {
if (!hasNext) {
throw new java.util.NoSuchElementException("End of stream")
}
@@ -186,7 +185,7 @@ private[spark] class SqlNewHadoopRDD[K, V](
if (!finished) {
inputMetrics.incRecordsRead(1)
}
- (reader.getCurrentKey, reader.getCurrentValue)
+ reader.getCurrentValue
}
private def close() {
@@ -212,23 +211,14 @@ private[spark] class SqlNewHadoopRDD[K, V](
}
}
} catch {
- case e: Exception => {
+ case e: Exception =>
if (!Utils.inShutdown()) {
logWarning("Exception in RecordReader.close()", e)
}
- }
}
}
}
- new InterruptibleIterator(context, iter)
- }
-
- /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
- @DeveloperApi
- def mapPartitionsWithInputSplit[U: ClassTag](
- f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
- preservesPartitioning: Boolean = false): RDD[U] = {
- new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
+ iter
}
override def getPreferredLocations(hsplit: SparkPartition): Seq[String] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index b4337a48db..29c388c22e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -291,7 +291,6 @@ private[sql] class ParquetRelation(
initDriverSideJobFuncOpt = Some(setInputPaths),
initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
inputFormatClass = classOf[ParquetInputFormat[InternalRow]],
- keyClass = classOf[Void],
valueClass = classOf[InternalRow]) {
val cacheMetadata = useMetadataCache
@@ -328,7 +327,7 @@ private[sql] class ParquetRelation(
new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
}
- }.values.asInstanceOf[RDD[Row]] // type erasure hack to pass RDD[InternalRow] as RDD[Row]
+ }.asInstanceOf[RDD[Row]] // type erasure hack to pass RDD[InternalRow] as RDD[Row]
}
}