aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-11 14:01:36 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-11 14:01:36 -0800
commit5e9ce83d682d6198cda4631faf11cb53fcccf07f (patch)
treef88ed888af44976a4bf3479fc2cf6f5224fc4b6b /core
parent6169fe14a140146602fb07cfcd13eee6efad98f9 (diff)
downloadspark-5e9ce83d682d6198cda4631faf11cb53fcccf07f.tar.gz
spark-5e9ce83d682d6198cda4631faf11cb53fcccf07f.tar.bz2
spark-5e9ce83d682d6198cda4631faf11cb53fcccf07f.zip
Fixed multiple file stream and checkpointing bugs.
- Made file stream more robust to transient failures. - Changed Spark.setCheckpointDir API to not have the second 'useExisting' parameter. Spark will always create a unique directory for checkpointing underneath the directory provide to the funtion. - Fixed bug wrt local relative paths as checkpoint directory. - Made DStream and RDD checkpointing use SparkContext.hadoopConfiguration, so that more HDFS compatible filesystems are supported for checkpointing.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala25
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala27
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala14
-rw-r--r--core/src/test/scala/org/apache/spark/JavaAPISuite.java4
5 files changed, 42 insertions, 43 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 66006bf212..1811bfa1e5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark
import java.io._
import java.net.URI
-import java.util.Properties
+import java.util.{UUID, Properties}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.Map
@@ -857,22 +857,15 @@ class SparkContext(
/**
* Set the directory under which RDDs are going to be checkpointed. The directory must
- * be a HDFS path if running on a cluster. If the directory does not exist, it will
- * be created. If the directory exists and useExisting is set to true, then the
- * exisiting directory will be used. Otherwise an exception will be thrown to
- * prevent accidental overriding of checkpoint files in the existing directory.
+ * be a HDFS path if running on a cluster.
*/
- def setCheckpointDir(dir: String, useExisting: Boolean = false) {
- val path = new Path(dir)
- val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
- if (!useExisting) {
- if (fs.exists(path)) {
- throw new Exception("Checkpoint directory '" + path + "' already exists.")
- } else {
- fs.mkdirs(path)
- }
- }
- checkpointDir = Some(dir)
+ def setCheckpointDir(directory: String) {
+ checkpointDir = Option(directory).map(dir => {
+ val path = new Path(dir, UUID.randomUUID().toString)
+ val fs = path.getFileSystem(hadoopConfiguration)
+ fs.mkdirs(path)
+ fs.getFileStatus(path).getPath().toString
+ })
}
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 8869e072bf..c63db4970b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -385,20 +385,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/**
* Set the directory under which RDDs are going to be checkpointed. The directory must
- * be a HDFS path if running on a cluster. If the directory does not exist, it will
- * be created. If the directory exists and useExisting is set to true, then the
- * exisiting directory will be used. Otherwise an exception will be thrown to
- * prevent accidental overriding of checkpoint files in the existing directory.
- */
- def setCheckpointDir(dir: String, useExisting: Boolean) {
- sc.setCheckpointDir(dir, useExisting)
- }
-
- /**
- * Set the directory under which RDDs are going to be checkpointed. The directory must
- * be a HDFS path if running on a cluster. If the directory does not exist, it will
- * be created. If the directory exists, an exception will be thrown to prevent accidental
- * overriding of checkpoint files.
+ * be a HDFS path if running on a cluster.
*/
def setCheckpointDir(dir: String) {
sc.setCheckpointDir(dir)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index d3033ea4a6..ef4057e2a2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.util.ReflectionUtils
import org.apache.hadoop.fs.Path
import java.io.{File, IOException, EOFException}
import java.text.NumberFormat
+import org.apache.spark.broadcast.Broadcast
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@@ -36,6 +37,8 @@ private[spark]
class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String)
extends RDD[T](sc, Nil) {
+ val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
+
@transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
override def getPartitions: Array[Partition] = {
@@ -67,7 +70,7 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
- CheckpointRDD.readFromFile(file, context)
+ CheckpointRDD.readFromFile(file, broadcastedConf, context)
}
override def checkpoint() {
@@ -81,10 +84,14 @@ private[spark] object CheckpointRDD extends Logging {
"part-%05d".format(splitId)
}
- def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
+ def writeToFile[T](
+ path: String,
+ broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ blockSize: Int = -1
+ )(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
val outputDir = new Path(path)
- val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration())
+ val fs = outputDir.getFileSystem(broadcastedConf.value.value)
val finalOutputName = splitIdToFile(ctx.partitionId)
val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -121,9 +128,13 @@ private[spark] object CheckpointRDD extends Logging {
}
}
- def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
+ def readFromFile[T](
+ path: Path,
+ broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ context: TaskContext
+ ): Iterator[T] = {
val env = SparkEnv.get
- val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
+ val fs = path.getFileSystem(broadcastedConf.value.value)
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
val serializer = env.serializer.newInstance()
@@ -146,8 +157,10 @@ private[spark] object CheckpointRDD extends Logging {
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
- val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
- sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
+ val conf = SparkHadoopUtil.get.newConfiguration()
+ val fs = path.getFileSystem(conf)
+ val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
+ sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 6009a41570..3160ab95c4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Partition, SparkException, Logging}
+import org.apache.spark.{SerializableWritable, Partition, SparkException, Logging}
import org.apache.spark.scheduler.{ResultTask, ShuffleMapTask}
/**
@@ -83,14 +83,20 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
// Create the output path for the checkpoint
val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
- val fs = path.getFileSystem(new Configuration())
+ val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
if (!fs.mkdirs(path)) {
throw new SparkException("Failed to create checkpoint path " + path)
}
// Save to file, and reload it as an RDD
- rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
+ val broadcastedConf = rdd.context.broadcast(new SerializableWritable(rdd.context.hadoopConfiguration))
+ rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
+ if (newRDD.partitions.size != rdd.partitions.size) {
+ throw new Exception(
+ "Checkpoint RDD " + newRDD + "("+ newRDD.partitions.size + ") has different " +
+ "number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
+ }
// Change the dependencies and partitions of the RDD
RDDCheckpointData.synchronized {
@@ -99,8 +105,8 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
cpState = Checkpointed
RDDCheckpointData.clearTaskCaches()
- logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
}
+ logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
}
// Get preferred location of a split after checkpointing
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 4234f6eac7..ee5d8c9f13 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -851,7 +851,7 @@ public class JavaAPISuite implements Serializable {
public void checkpointAndComputation() {
File tempDir = Files.createTempDir();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
+ sc.setCheckpointDir(tempDir.getAbsolutePath());
Assert.assertEquals(false, rdd.isCheckpointed());
rdd.checkpoint();
rdd.count(); // Forces the DAG to cause a checkpoint
@@ -863,7 +863,7 @@ public class JavaAPISuite implements Serializable {
public void checkpointAndRestore() {
File tempDir = Files.createTempDir();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
+ sc.setCheckpointDir(tempDir.getAbsolutePath());
Assert.assertEquals(false, rdd.isCheckpointed());
rdd.checkpoint();
rdd.count(); // Forces the DAG to cause a checkpoint