 core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala | 116
 core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala  |   2
 core/src/test/scala/org/apache/spark/DistributedSuite.scala          |  17
 3 files changed, 132 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 8041163e3d..4b73a0352f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -137,7 +137,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
*/
def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
- /**Get an RDD for a Hadoop SequenceFile with given key and value types. */
+ /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
@@ -148,7 +148,19 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
}
- /**Get an RDD for a Hadoop SequenceFile. */
+ /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
+ def sequenceFile[K, V](path: String,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ minSplits: Int,
+ cloneRecords: Boolean
+ ): JavaPairRDD[K, V] = {
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+ new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits, cloneRecords))
+ }
+
+ /** Get an RDD for a Hadoop SequenceFile. */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
JavaPairRDD[K, V] = {
implicit val kcm: ClassTag[K] = ClassTag(keyClass)
@@ -156,6 +168,15 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
}
+ /** Get an RDD for a Hadoop SequenceFile. */
+ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V],
+ cloneRecords: Boolean):
+ JavaPairRDD[K, V] = {
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+ new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, cloneRecords))
+ }
+
/**
* Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
* BytesWritable values that contain a serialized partition. This is still an experimental storage
@@ -197,6 +218,37 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
}
+
+ /**
+ * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
+ * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
+ * using the older MapReduce API (`org.apache.hadoop.mapred`).
+ *
+ * @param conf JobConf for setting up the dataset
+ * @param inputFormatClass Class of the [[InputFormat]]
+ * @param keyClass Class of the keys
+ * @param valueClass Class of the values
+ * @param minSplits Minimum number of Hadoop Splits to generate.
+ * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
+ * Most RecordReader implementations reuse wrapper objects across multiple
+ *                     records, which can cause problems in RDD collect or aggregation operations.
+ * By default the records are cloned in Spark. However, application
+ * programmers can explicitly disable the cloning for better performance.
+ */
+ def hadoopRDD[K, V, F <: InputFormat[K, V]](
+ conf: JobConf,
+ inputFormatClass: Class[F],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ minSplits: Int,
+ cloneRecords: Boolean
+ ): JavaPairRDD[K, V] = {
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+ new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits,
+ cloneRecords))
+ }
+
/**
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and any
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
@@ -231,6 +283,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
path: String,
inputFormatClass: Class[F],
keyClass: Class[K],
+ valueClass: Class[V],
+ minSplits: Int,
+ cloneRecords: Boolean
+ ): JavaPairRDD[K, V] = {
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+ new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass,
+ minSplits, cloneRecords))
+ }
+
+ /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+ def hadoopFile[K, V, F <: InputFormat[K, V]](
+ path: String,
+ inputFormatClass: Class[F],
+ keyClass: Class[K],
valueClass: Class[V]
): JavaPairRDD[K, V] = {
implicit val kcm: ClassTag[K] = ClassTag(keyClass)
@@ -239,6 +306,20 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
inputFormatClass, keyClass, valueClass))
}
+ /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+ def hadoopFile[K, V, F <: InputFormat[K, V]](
+ path: String,
+ inputFormatClass: Class[F],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ cloneRecords: Boolean
+ ): JavaPairRDD[K, V] = {
+ implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+ implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+ new JavaPairRDD(sc.hadoopFile(path,
+ inputFormatClass, keyClass, valueClass, cloneRecords = cloneRecords))
+ }
+
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
@@ -258,6 +339,22 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
+ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+ path: String,
+ fClass: Class[F],
+ kClass: Class[K],
+ vClass: Class[V],
+ conf: Configuration,
+ cloneRecords: Boolean): JavaPairRDD[K, V] = {
+ implicit val kcm: ClassTag[K] = ClassTag(kClass)
+ implicit val vcm: ClassTag[V] = ClassTag(vClass)
+ new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf, cloneRecords))
+ }
+
+ /**
+ * Get an RDD for a given Hadoop dataset from a Configuration with an arbitrary new API
+ * InputFormat and extra configuration options to pass to the input format.
+ */
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration,
fClass: Class[F],
@@ -268,6 +365,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
}
+ /**
+ * Get an RDD for a given Hadoop dataset from a Configuration with an arbitrary new API
+ * InputFormat and extra configuration options to pass to the input format.
+ */
+ def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+ conf: Configuration,
+ fClass: Class[F],
+ kClass: Class[K],
+ vClass: Class[V],
+ cloneRecords: Boolean): JavaPairRDD[K, V] = {
+ implicit val kcm: ClassTag[K] = ClassTag(kClass)
+ implicit val vcm: ClassTag[V] = ClassTag(vClass)
+ new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass, cloneRecords))
+ }
+
/** Build the union of two or more RDDs. */
override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
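
The cloneRecords overloads added in this file all follow one pattern: accept the flag and thread it through to the corresponding SparkContext method. Below is a minimal usage sketch in Scala; the input path and Writable types are hypothetical, and per the @param doc above, cloning is the default behavior, so the flag mainly exists to opt out.

// Usage sketch for the new cloneRecords overloads. The path /tmp/pairs.seq
// and the IntWritable/Text types are hypothetical. Hadoop RecordReaders
// reuse Writable instances, so with cloning disabled, collect() can return
// many references to the same mutated object.
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.api.java.JavaSparkContext

object CloneRecordsSketch {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext("local", "cloneRecords-sketch")

    // Safe (default-equivalent): each record is cloned, so retaining
    // records across iterations, e.g. via collect(), is safe.
    val cloned = jsc.sequenceFile("/tmp/pairs.seq",
      classOf[IntWritable], classOf[Text], 2, true)   // minSplits, cloneRecords

    // Faster: skip the per-record copy when records are consumed
    // immediately and never retained.
    val reused = jsc.sequenceFile("/tmp/pairs.seq",
      classOf[IntWritable], classOf[Text], 2, false)

    println(cloned.count() == reused.count())  // counting is unaffected either way
    jsc.stop()
  }
}
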
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index fc0ee07089..5ad00a1ed1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -629,7 +629,7 @@ private[spark] class TaskSetManager(
}
// Also re-enqueue any tasks that were running on the node
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
- handleFailedTask(tid, TaskState.KILLED, None)
+ handleFailedTask(tid, TaskState.FAILED, None)
}
}
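
This one-line change is what the new DistributedSuite test below exercises. When an executor is lost, its running tasks are re-enqueued through handleFailedTask; judging from that test, tasks re-enqueued as KILLED were not counted toward the per-task failure limit, so a task that crashes its JVM could be resubmitted indefinitely, whereas FAILED makes each crash count and the job aborts once the limit is hit ("failed 4 times"). A toy Scala model of that counting semantics, not Spark's actual scheduler code:

// Illustrative model only (assumed, simplified semantics): the state a
// re-enqueued task is reported with decides whether its failure counts.
object TaskState extends Enumeration {
  val RUNNING, FINISHED, FAILED, KILLED = Value
}

class RetryModel(maxTaskFailures: Int = 4) {
  private var numFailures = 0

  // Returns true when the job should abort instead of retrying the task.
  def handleFailedTask(state: TaskState.Value): Boolean = {
    if (state == TaskState.FAILED) numFailures += 1  // KILLED would not count
    numFailures >= maxTaskFailures
  }
}

// With KILLED, handleFailedTask never returns true and the task retries
// forever; with FAILED, the fourth crash aborts, matching the assertion
// thrown.getMessage.contains("failed 4 times") in the test below.
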
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index d9cb7fead5..8de7a328d1 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -125,6 +125,23 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(thrown.getMessage.contains("failed 4 times"))
}
+ test("repeatedly failing task that crashes JVM") {
+ // Ensures that if a task fails in a way that crashes the JVM, the job eventually fails rather
+ // than hanging due to retrying the failed task infinitely many times (eventually the
+ // standalone scheduler will remove the application, causing the job to hang waiting to
+ // reconnect to the master).
+ sc = new SparkContext(clusterUrl, "test")
+ failAfter(Span(100000, Millis)) {
+ val thrown = intercept[SparkException] {
+ // One of the tasks always fails.
+ sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) }
+ }
+ assert(thrown.getClass === classOf[SparkException])
+ System.out.println(thrown.getMessage)
+ assert(thrown.getMessage.contains("failed 4 times"))
+ }
+ }
+
test("caching") {
sc = new SparkContext(clusterUrl, "test")
val data = sc.parallelize(1 to 1000, 10).cache()