path: root/core
author      Sandy Ryza <sandy@cloudera.com>      2015-02-02 14:52:46 -0800
committer   Reynold Xin <rxin@databricks.com>    2015-02-02 14:52:46 -0800
commit      830934976e8cf9e894bd3e5758fb941cad5d2f0b (patch)
tree        436297113749017b34f9d9229fa29b0d86e18f2e /core
parent      842d00032d0b09fb1f9cfc77359b77693e70a614 (diff)
download    spark-830934976e8cf9e894bd3e5758fb941cad5d2f0b.tar.gz
            spark-830934976e8cf9e894bd3e5758fb941cad5d2f0b.tar.bz2
            spark-830934976e8cf9e894bd3e5758fb941cad5d2f0b.zip
SPARK-5500. Document that feeding hadoopFile into a shuffle operation will cause problems

Author: Sandy Ryza <sandy@cloudera.com>

Closes #4293 from sryza/sandy-spark-5500 and squashes the following commits:

e9ce742 [Sandy Ryza] Change to warning
cc46e52 [Sandy Ryza] Add instructions and extend to NewHadoopRDD
6e1932a [Sandy Ryza] Throw exception on cache
0f6c4eb [Sandy Ryza] SPARK-5500. Document that feeding hadoopFile into a shuffle operation will cause problems
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala        69
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala       12
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala    17
3 files changed, 62 insertions(+), 36 deletions(-)
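
The doc-comment changes below all repeat one piece of guidance: because Hadoop's RecordReader reuses a single Writable per record, copy records with a `map` before caching, sorting, or aggregating. A minimal sketch of that pattern, not part of this commit; the input path, app name, and master are placeholders:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.{SparkConf, SparkContext}

    object WritableCopyExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("writable-copy").setMaster("local[*]"))

        // hadoopFile yields (LongWritable, Text) pairs backed by Writables that the
        // RecordReader reuses for every record in a partition.
        val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///tmp/input.txt")

        // Copy each record into plain JVM objects before caching or shuffling.
        val copied = raw.map { case (offset, line) => (offset.get(), line.toString) }
        copied.cache()

        // Safe to aggregate or shuffle now: every element is a distinct object.
        println("distinct lines: " + copied.map(_._2).distinct().count())

        sc.stop()
      }
    }

Mapping to (Long, String) materializes a fresh pair per record, so the cache and the distinct() shuffle see distinct objects rather than many references to one reused Writable.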
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3c61c10820..228076f01c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -687,9 +687,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @param minPartitions Minimum number of Hadoop Splits to generate.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
- * record, directly caching the returned RDD will create many references to the same object.
- * If you plan to directly cache Hadoop writable objects, you should first copy them using
- * a `map` function.
+ * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+ * operation will create many references to the same object.
+ * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+ * copy them using a `map` function.
*/
def hadoopRDD[K, V](
conf: JobConf,
@@ -705,12 +706,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
- *
- * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
- * record, directly caching the returned RDD will create many references to the same object.
- * If you plan to directly cache Hadoop writable objects, you should first copy them using
- * a `map` function.
- * */
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+ * operation will create many references to the same object.
+ * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+ * copy them using a `map` function.
+ */
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -741,9 +743,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* }}}
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
- * record, directly caching the returned RDD will create many references to the same object.
- * If you plan to directly cache Hadoop writable objects, you should first copy them using
- * a `map` function.
+ * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+ * operation will create many references to the same object.
+ * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+ * copy them using a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]]
(path: String, minPartitions: Int)
@@ -764,9 +767,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* }}}
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
- * record, directly caching the returned RDD will create many references to the same object.
- * If you plan to directly cache Hadoop writable objects, you should first copy them using
- * a `map` function.
+ * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+ * operation will create many references to the same object.
+ * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+ * copy them using a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
@@ -788,9 +792,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* and extra configuration options to pass to the input format.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
- * record, directly caching the returned RDD will create many references to the same object.
- * If you plan to directly cache Hadoop writable objects, you should first copy them using
- * a `map` function.
+ * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+ * operation will create many references to the same object.
+ * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+ * copy them using a `map` function.
*/
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
@@ -810,9 +815,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* and extra configuration options to pass to the input format.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
- * record, directly caching the returned RDD will create many references to the same object.
- * If you plan to directly cache Hadoop writable objects, you should first copy them using
- * a `map` function.
+ * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+ * operation will create many references to the same object.
+ * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+ * copy them using a `map` function.
*/
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
@@ -826,9 +832,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
- * record, directly caching the returned RDD will create many references to the same object.
- * If you plan to directly cache Hadoop writable objects, you should first copy them using
- * a `map` function.
+ * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+ * operation will create many references to the same object.
+ * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+ * copy them using a `map` function.
*/
def sequenceFile[K, V](path: String,
keyClass: Class[K],
@@ -843,9 +850,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
- * record, directly caching the returned RDD will create many references to the same object.
- * If you plan to directly cache Hadoop writable objects, you should first copy them using
- * a `map` function.
+ * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+ * operation will create many references to the same object.
+ * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+ * copy them using a `map` function.
* */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = {
assertNotStopped()
@@ -869,9 +877,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* allow it to figure out the Writable class to use in the subclass case.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
- * record, directly caching the returned RDD will create many references to the same object.
- * If you plan to directly cache Hadoop writable objects, you should first copy them using
- * a `map` function.
+ * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+ * operation will create many references to the same object.
+ * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+ * copy them using a `map` function.
*/
def sequenceFile[K, V]
(path: String, minPartitions: Int = defaultMinPartitions)
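
The same note is added to the new-API entry points (newAPIHadoopFile, newAPIHadoopRDD). A hedged sketch of the equivalent copy-before-cache pattern with the new Hadoop API, assuming an existing SparkContext `sc` and a placeholder path:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // `sc` is assumed to be an existing SparkContext; the path is a placeholder.
    val newApi = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///tmp/logs")

    // Same caveat as the old API: copy the reused Writables before caching,
    // sorting, or aggregating.
    val safe = newApi.map { case (offset, line) => (offset.get(), line.toString) }
    safe.cache()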
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index c3e3931042..89adddcf0a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -42,10 +42,11 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.executor.DataReadMethod
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.util.{NextIterator, Utils}
import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
+import org.apache.spark.storage.StorageLevel
/**
* A Spark split class that wraps around a Hadoop InputSplit.
@@ -308,6 +309,15 @@ class HadoopRDD[K, V](
// Do nothing. Hadoop RDD should not be checkpointed.
}
+ override def persist(storageLevel: StorageLevel): this.type = {
+ if (storageLevel.deserialized) {
+ logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" +
+ " behavior because Hadoop's RecordReader reuses the same Writable object for all records." +
+ " Use a map transformation to make copies of the records.")
+ }
+ super.persist(storageLevel)
+ }
+
def getConf: Configuration = getJobConf()
}
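
For reference, the override above warns only when the chosen storage level is deserialized; serialized caching copies each record's bytes as the partition is written, so the reused Writable never escapes. A sketch of both cases, assuming an existing SparkContext `sc` and placeholder paths:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.storage.StorageLevel

    // Deserialized storage keeps references to the reused Writables -> the warning is logged.
    val deser = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///tmp/events")
    deser.persist(StorageLevel.MEMORY_ONLY)

    // Serialized storage (deserialized = false) does not trigger the warning.
    val ser = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///tmp/events")
    ser.persist(StorageLevel.MEMORY_ONLY_SER)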
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index d86f95ac3e..44b9ffd2a5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -29,16 +29,13 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.input.WholeTextFileInputFormat
-import org.apache.spark.InterruptibleIterator
-import org.apache.spark.Logging
-import org.apache.spark.Partition
-import org.apache.spark.SerializableWritable
-import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.util.Utils
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.storage.StorageLevel
private[spark] class NewHadoopPartition(
rddId: Int,
@@ -211,6 +208,16 @@ class NewHadoopRDD[K, V](
locs.getOrElse(split.getLocations.filter(_ != "localhost"))
}
+ override def persist(storageLevel: StorageLevel): this.type = {
+ if (storageLevel.deserialized) {
+ logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" +
+ " behavior because Hadoop's RecordReader reuses the same Writable object for all records." +
+ " Use a map transformation to make copies of the records.")
+ }
+ super.persist(storageLevel)
+ }
+
+
def getConf: Configuration = confBroadcast.value.value
}
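
As a closing usage note, the "sort, or aggregate" wording in the extended comments matters because aggregation buffers records before they are serialized, so the buffered entries can all point at one reused Writable. A hedged sketch of the recommended fix; the SequenceFile path and key/value types are assumptions for illustration, and `sc` is an existing SparkContext:

    import org.apache.hadoop.io.{IntWritable, Text}

    // Placeholder path; keys are Text, values are IntWritable.
    val pairs = sc.sequenceFile("hdfs:///tmp/counts.seq", classOf[Text], classOf[IntWritable])

    // Risky: aggregating the raw Writables buffers many references to the same reused objects.
    // val risky = pairs.reduceByKey((a, b) => new IntWritable(a.get + b.get))

    // Recommended: copy into immutable values with `map`, then aggregate.
    val totals = pairs
      .map { case (k, v) => (k.toString, v.get) }
      .reduceByKey(_ + _)

    totals.take(5).foreach(println)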