From d4dfab503a9222b5acf5c4bf69b91c16f298e4aa Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Tue, 24 Dec 2013 14:01:13 -0800
Subject: Fixed Python API for sc.setCheckpointDir. Also other fixes based on Reynold's comments on PR 289.

---
 core/src/main/scala/org/apache/spark/SparkContext.scala          | 4 ++--
 core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala     | 2 --
 core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala | 2 +-
 3 files changed, 3 insertions(+), 5 deletions(-)

(limited to 'core')

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c30f896cf1..cc87febf33 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -861,12 +861,12 @@ class SparkContext(
    * be a HDFS path if running on a cluster.
    */
   def setCheckpointDir(directory: String) {
-    checkpointDir = Option(directory).map(dir => {
+    checkpointDir = Option(directory).map { dir =>
       val path = new Path(dir, UUID.randomUUID().toString)
       val fs = path.getFileSystem(hadoopConfiguration)
       fs.mkdirs(path)
       fs.getFileStatus(path).getPath().toString
-    })
+    }
   }
 
   /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 80385fce57..293a7d1f68 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -18,9 +18,7 @@
 package org.apache.spark.rdd
 
 import java.io.IOException
-
 import scala.reflect.ClassTag
-import java.io.{IOException}
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 5a565d7e78..091a6fdb54 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -95,7 +95,7 @@ private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
     rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
     val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
     if (newRDD.partitions.size != rdd.partitions.size) {
-      throw new Exception(
+      throw new SparkException(
         "Checkpoint RDD " + newRDD + "("+ newRDD.partitions.size + ") has different " +
         "number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
     }
-- 
cgit v1.2.3
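
For context, a minimal usage sketch of the Scala-side checkpointing API the hunks above touch. It assumes a Spark build of this era on the classpath; the "local" master and the /tmp path are illustrative, not taken from the patch:

    import org.apache.spark.SparkContext

    object CheckpointDirExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "CheckpointDirExample")
        // As shown in the SparkContext hunk, setCheckpointDir creates a
        // UUID-named subdirectory under the given path and records its
        // fully qualified URI as the checkpoint directory.
        sc.setCheckpointDir("/tmp/spark-checkpoints")
        val rdd = sc.parallelize(1 to 100, 4).map(_ * 2)
        rdd.checkpoint()  // mark the RDD for checkpointing
        rdd.count()       // the first action triggers the checkpoint write
        println(rdd.isCheckpointed)
        sc.stop()
      }
    }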
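
The SparkContext hunk is purely stylistic: a multi-line lambda reads better as .map { dir => ... } than .map(dir => { ... }). The RDDCheckpointData hunk replaces a bare Exception with org.apache.spark.SparkException, so callers can catch Spark-specific failures. A small plain-Scala sketch of both idioms; the helper names and messages are illustrative, not from the patch:

    import org.apache.spark.SparkException

    // Brace syntax for a multi-line transformation, as in setCheckpointDir.
    def resolve(dir: String): Option[String] =
      Option(dir).map { d =>
        val normalized = d.stripSuffix("/")
        s"$normalized/${java.util.UUID.randomUUID()}"
      }

    // Surfacing a partition-count mismatch as a SparkException, as in the
    // RDDCheckpointData hunk.
    def verifyPartitionCount(expected: Int, actual: Int): Unit = {
      if (actual != expected) {
        throw new SparkException(
          s"Checkpoint RDD has $actual partitions instead of $expected")
      }
    }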