author     Aaron Davidson <aaron@databricks.com>  2014-07-31 11:35:38 -0700
committer  Matei Zaharia <matei@databricks.com>   2014-07-31 11:35:38 -0700
commit     f1933123525e7c806f5fc0b0a46a78a7546f8b61 (patch)
tree       dbbc0d4d3c20cbb3b27707fee9dfb1b27dbb38ab
parent     72cfb13987bab07461266905930f84619b3a0068 (diff)
SPARK-2028: Expose mapPartitionsWithInputSplit in HadoopRDD
This allows users to gain access to the InputSplit which backs each partition. An alternative solution would have been to have a .withInputSplit() method which returns a new RDD[(InputSplit, (K, V))], but this is confusing because you could not cache this RDD or shuffle it, as InputSplit is not inherently serializable.

Author: Aaron Davidson <aaron@databricks.com>

Closes #973 from aarondav/hadoop and squashes the following commits:

9c9112b [Aaron Davidson] Add JavaAPISuite test
9942cd7 [Aaron Davidson] Add Java API
1284a3a [Aaron Davidson] SPARK-2028: Expose mapPartitionsWithInputSplit in HadoopRDD
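To show the use case this enables, here is a minimal Scala sketch against the old mapred API, modeled on the FileSuite test included further down in this patch. It is not part of the commit itself; the input path and the local SparkContext are placeholders, and the cast mirrors what the test does.

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.HadoopRDD

    val sc = new SparkContext("local", "input-split-example")

    // hadoopFile returns an RDD[(K, V)] that is backed by a HadoopRDD, so the
    // cast below exposes the new mapPartitionsWithInputSplit method.
    // "/tmp/some-input" is a placeholder path.
    val rdd = sc.hadoopFile("/tmp/some-input", classOf[TextInputFormat],
        classOf[LongWritable], classOf[Text])
      .asInstanceOf[HadoopRDD[LongWritable, Text]]

    // Tag every line with the file (InputSplit) it was read from.
    val linesWithFile = rdd.mapPartitionsWithInputSplit { (split, iter) =>
      val file = split.asInstanceOf[FileSplit].getPath.toUri.getPath
      // Text objects are reused by the record reader, so materialize with toString.
      iter.map { case (_, line) => (file, line.toString) }
    }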
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala    |  43
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala |  43
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala |  21
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala             |  32
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala          |  34
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java                |  26
-rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala                 |  34
7 files changed, 222 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala
new file mode 100644
index 0000000000..0ae0b4ec04
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.mapred.InputSplit
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.java.JavaSparkContext._
+import org.apache.spark.api.java.function.{Function2 => JFunction2}
+import org.apache.spark.rdd.HadoopRDD
+
+@DeveloperApi
+class JavaHadoopRDD[K, V](rdd: HadoopRDD[K, V])
+ (implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
+ extends JavaPairRDD[K, V](rdd) {
+
+ /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+ @DeveloperApi
+ def mapPartitionsWithInputSplit[R](
+ f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
+ preservesPartitioning: Boolean = false): JavaRDD[R] = {
+ new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
+ preservesPartitioning)(fakeClassTag))(fakeClassTag)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala
new file mode 100644
index 0000000000..ec4f3964d7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.mapreduce.InputSplit
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.java.JavaSparkContext._
+import org.apache.spark.api.java.function.{Function2 => JFunction2}
+import org.apache.spark.rdd.NewHadoopRDD
+
+@DeveloperApi
+class JavaNewHadoopRDD[K, V](rdd: NewHadoopRDD[K, V])
+ (implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
+ extends JavaPairRDD[K, V](rdd) {
+
+ /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+ @DeveloperApi
+ def mapPartitionsWithInputSplit[R](
+ f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
+ preservesPartitioning: Boolean = false): JavaRDD[R] = {
+ new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
+ preservesPartitioning)(fakeClassTag))(fakeClassTag)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 8a5f8088a0..d9d1c5955c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -34,7 +34,7 @@ import org.apache.spark._
import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
/**
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
@@ -294,7 +294,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
- new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions))
+ val rdd = sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions)
+ new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
}
/**
@@ -314,7 +315,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
- new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
+ val rdd = sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass)
+ new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat.
@@ -333,7 +335,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
- new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions))
+ val rdd = sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
+ new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
@@ -351,8 +354,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
- new JavaPairRDD(sc.hadoopFile(path,
- inputFormatClass, keyClass, valueClass))
+ val rdd = sc.hadoopFile(path, inputFormatClass, keyClass, valueClass)
+ new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
}
/**
@@ -372,7 +375,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
conf: Configuration): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(kClass)
implicit val ctagV: ClassTag[V] = ClassTag(vClass)
- new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
+ val rdd = sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf)
+ new JavaNewHadoopRDD(rdd.asInstanceOf[NewHadoopRDD[K, V]])
}
/**
@@ -391,7 +395,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
vClass: Class[V]): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(kClass)
implicit val ctagV: ClassTag[V] = ClassTag(vClass)
- new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
+ val rdd = sc.newAPIHadoopRDD(conf, fClass, kClass, vClass)
+ new JavaNewHadoopRDD(rdd.asInstanceOf[NewHadoopRDD[K, V]])
}
/** Build the union of two or more RDDs. */
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index e521612ffc..8d92ea01d9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -20,7 +20,9 @@ package org.apache.spark.rdd
import java.text.SimpleDateFormat
import java.util.Date
import java.io.EOFException
+
import scala.collection.immutable.Map
+import scala.reflect.ClassTag
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapred.FileSplit
@@ -39,6 +41,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.util.NextIterator
/**
@@ -232,6 +235,14 @@ class HadoopRDD[K, V](
new InterruptibleIterator[(K, V)](context, iter)
}
+ /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+ @DeveloperApi
+ def mapPartitionsWithInputSplit[U: ClassTag](
+ f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
+ preservesPartitioning: Boolean = false): RDD[U] = {
+ new HadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
+ }
+
override def getPreferredLocations(split: Partition): Seq[String] = {
// TODO: Filtering out "localhost" in case of file:// URLs
val hadoopSplit = split.asInstanceOf[HadoopPartition]
@@ -272,4 +283,25 @@ private[spark] object HadoopRDD {
conf.setInt("mapred.task.partition", splitId)
conf.set("mapred.job.id", jobID.toString)
}
+
+ /**
+ * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
+ * the given function rather than the index of the partition.
+ */
+ private[spark] class HadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
+ prev: RDD[T],
+ f: (InputSplit, Iterator[T]) => Iterator[U],
+ preservesPartitioning: Boolean = false)
+ extends RDD[U](prev) {
+
+ override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+
+ override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+ override def compute(split: Partition, context: TaskContext) = {
+ val partition = split.asInstanceOf[HadoopPartition]
+ val inputSplit = partition.inputSplit.value
+ f(inputSplit, firstParent[T].iterator(split, context))
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index f2b3a64bf1..7dfec9a18e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -20,6 +20,8 @@ package org.apache.spark.rdd
import java.text.SimpleDateFormat
import java.util.Date
+import scala.reflect.ClassTag
+
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
@@ -32,6 +34,7 @@ import org.apache.spark.Partition
import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
private[spark] class NewHadoopPartition(
rddId: Int,
@@ -157,6 +160,14 @@ class NewHadoopRDD[K, V](
new InterruptibleIterator(context, iter)
}
+ /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+ @DeveloperApi
+ def mapPartitionsWithInputSplit[U: ClassTag](
+ f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
+ preservesPartitioning: Boolean = false): RDD[U] = {
+ new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
+ }
+
override def getPreferredLocations(split: Partition): Seq[String] = {
val theSplit = split.asInstanceOf[NewHadoopPartition]
theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
@@ -165,6 +176,29 @@ class NewHadoopRDD[K, V](
def getConf: Configuration = confBroadcast.value.value
}
+private[spark] object NewHadoopRDD {
+ /**
+ * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
+ * the given function rather than the index of the partition.
+ */
+ private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
+ prev: RDD[T],
+ f: (InputSplit, Iterator[T]) => Iterator[U],
+ preservesPartitioning: Boolean = false)
+ extends RDD[U](prev) {
+
+ override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+
+ override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+ override def compute(split: Partition, context: TaskContext) = {
+ val partition = split.asInstanceOf[NewHadoopPartition]
+ val inputSplit = partition.serializableHadoopSplit.value
+ f(inputSplit, firstParent[T].iterator(split, context))
+ }
+ }
+}
+
private[spark] class WholeTextFileRDD(
sc : SparkContext,
inputFormatClass: Class[_ <: WholeTextFileInputFormat],
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index fab64a54e2..56150caa5d 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -25,19 +25,23 @@ import scala.Tuple2;
import scala.Tuple3;
import scala.Tuple4;
-
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.base.Optional;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.junit.After;
import org.junit.Assert;
@@ -45,6 +49,7 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaHadoopRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -1262,4 +1267,23 @@ public class JavaAPISuite implements Serializable {
SomeCustomClass[] collected = (SomeCustomClass[]) rdd.rdd().retag(SomeCustomClass.class).collect();
Assert.assertEquals(data.size(), collected.length);
}
+
+ public void getHadoopInputSplits() {
+ String outDir = new File(tempDir, "output").getAbsolutePath();
+ sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2).saveAsTextFile(outDir);
+
+ JavaHadoopRDD<LongWritable, Text> hadoopRDD = (JavaHadoopRDD<LongWritable, Text>)
+ sc.hadoopFile(outDir, TextInputFormat.class, LongWritable.class, Text.class);
+ List<String> inputPaths = hadoopRDD.mapPartitionsWithInputSplit(
+ new Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<String>>() {
+ @Override
+ public Iterator<String> call(InputSplit split, Iterator<Tuple2<LongWritable, Text>> it)
+ throws Exception {
+ FileSplit fileSplit = (FileSplit) split;
+ return Lists.newArrayList(fileSplit.getPath().toUri().getPath()).iterator();
+ }
+ }, true).collect();
+ Assert.assertEquals(Sets.newHashSet(inputPaths),
+ Sets.newHashSet(outDir + "/part-00000", outDir + "/part-00001"));
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index c70e22cf09..4a53d25012 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -24,12 +24,14 @@ import scala.io.Source
import com.google.common.io.Files
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.DefaultCodec
-import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, TextOutputFormat}
-import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, FileSplit, TextInputFormat, TextOutputFormat}
import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.scalatest.FunSuite
import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.{NewHadoopRDD, HadoopRDD}
import org.apache.spark.util.Utils
class FileSuite extends FunSuite with LocalSparkContext {
@@ -318,4 +320,32 @@ class FileSuite extends FunSuite with LocalSparkContext {
randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
}
+
+ test("Get input files via old Hadoop API") {
+ sc = new SparkContext("local", "test")
+ val outDir = new File(tempDir, "output").getAbsolutePath
+ sc.makeRDD(1 to 4, 2).saveAsTextFile(outDir)
+
+ val inputPaths =
+ sc.hadoopFile(outDir, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
+ .asInstanceOf[HadoopRDD[_, _]]
+ .mapPartitionsWithInputSplit { (split, part) =>
+ Iterator(split.asInstanceOf[FileSplit].getPath.toUri.getPath)
+ }.collect()
+ assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
+ }
+
+ test("Get input files via new Hadoop API") {
+ sc = new SparkContext("local", "test")
+ val outDir = new File(tempDir, "output").getAbsolutePath
+ sc.makeRDD(1 to 4, 2).saveAsTextFile(outDir)
+
+ val inputPaths =
+ sc.newAPIHadoopFile(outDir, classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text])
+ .asInstanceOf[NewHadoopRDD[_, _]]
+ .mapPartitionsWithInputSplit { (split, part) =>
+ Iterator(split.asInstanceOf[NewFileSplit].getPath.toUri.getPath)
+ }.collect()
+ assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
+ }
}
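For completeness, the same pattern against the new mapreduce API, mirroring the "Get input files via new Hadoop API" test above. This is again only a sketch, not part of the patch; the output directory and SparkContext setup are placeholders.

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.NewHadoopRDD

    val sc = new SparkContext("local", "new-api-input-split-example")

    // newAPIHadoopFile is likewise backed by a NewHadoopRDD; the cast exposes
    // mapPartitionsWithInputSplit for the new-API InputSplit type.
    // "/tmp/some-input" is a placeholder path.
    val inputFiles = sc
      .newAPIHadoopFile("/tmp/some-input", classOf[NewTextInputFormat],
        classOf[LongWritable], classOf[Text])
      .asInstanceOf[NewHadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit { (split, iter) =>
        // Emit one path per partition, taken from the backing FileSplit.
        Iterator(split.asInstanceOf[NewFileSplit].getPath.toUri.getPath)
      }
      .collect()
      .distinct // several splits can come from the same file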