aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-16 21:07:49 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-17 18:41:58 -0800
commitd5570c7968baba1c1fe86c68dc1c388fae23907b (patch)
tree137af37e5886b52e5e51fc51a87714e784ef3942 /core/src/test
parent8e6cbbc6c7434b53c63e19a1c9c2dca1f24de654 (diff)
downloadspark-d5570c7968baba1c1fe86c68dc1c388fae23907b.tar.gz
spark-d5570c7968baba1c1fe86c68dc1c388fae23907b.tar.bz2
spark-d5570c7968baba1c1fe86c68dc1c388fae23907b.zip
Adding checkpointing to Java API
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/spark/JavaAPISuite.java27
1 files changed, 27 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index b99e790093..0b5354774b 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -625,4 +625,31 @@ public class JavaAPISuite implements Serializable {
});
Assert.assertEquals((Float) 25.0f, floatAccum.value());
}
+
+ @Test
+ public void checkpointAndComputation() {
+ File tempDir = Files.createTempDir();
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+ sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
+ Assert.assertEquals(false, rdd.isCheckpointed());
+ rdd.checkpoint();
+ rdd.count(); // Forces the DAG to cause a checkpoint
+ Assert.assertEquals(true, rdd.isCheckpointed());
+ Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect());
+ }
+
+ @Test
+ public void checkpointAndRestore() {
+ File tempDir = Files.createTempDir();
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+ sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
+ Assert.assertEquals(false, rdd.isCheckpointed());
+ rdd.checkpoint();
+ rdd.count(); // Forces the DAG to cause a checkpoint
+ Assert.assertEquals(true, rdd.isCheckpointed());
+
+ Assert.assertTrue(rdd.getCheckpointFile().isPresent());
+ JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
+ Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
+ }
}