aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala5
1 files changed, 2 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 293a7d1f68..6d4f46125f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -76,7 +76,6 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
}
private[spark] object CheckpointRDD extends Logging {
-
def splitIdToFile(splitId: Int): String = {
"part-%05d".format(splitId)
}
@@ -98,7 +97,7 @@ private[spark] object CheckpointRDD extends Logging {
throw new IOException("Checkpoint failed: temporary path " +
tempOutputPath + " already exists")
}
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt
val fileOutputStream = if (blockSize < 0) {
fs.create(tempOutputPath, false, bufferSize)
@@ -132,7 +131,7 @@ private[spark] object CheckpointRDD extends Logging {
): Iterator[T] = {
val env = SparkEnv.get
val fs = path.getFileSystem(broadcastedConf.value.value)
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
val serializer = env.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)