aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorReynold Xin <reynoldx@gmail.com>2013-07-30 18:54:35 -0700
committerReynold Xin <reynoldx@gmail.com>2013-07-31 10:32:13 -0700
commitc61843a69fd50db66b01e9ef0fb2870baf51d351 (patch)
treefae8845457e3d44cea796cb544b07135bfa7d345 /streaming
parent98024eadc3150a9a509132117875b8d0b18b1d50 (diff)
downloadspark-c61843a69fd50db66b01e9ef0fb2870baf51d351.tar.gz
spark-c61843a69fd50db66b01e9ef0fb2870baf51d351.tar.bz2
spark-c61843a69fd50db66b01e9ef0fb2870baf51d351.zip
Changed other LZF uses to use the compression codec interface.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/Checkpoint.scala26
1 files changed, 17 insertions, 9 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 1e4c1e3742..070d930b5e 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -17,16 +17,17 @@
package spark.streaming
-import spark.{Logging, Utils}
-
-import org.apache.hadoop.fs.{FileUtil, Path}
-import org.apache.hadoop.conf.Configuration
-
import java.io._
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+
+import spark.Logging
+import spark.io.CompressionCodec
+
+
private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
@@ -49,6 +50,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
}
}
+
/**
* Convenience class to speed up the writing of graph checkpoint to file
*/
@@ -66,6 +68,8 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
val maxAttempts = 3
val executor = Executors.newFixedThreadPool(1)
+ private val compressionCodec = CompressionCodec.createCodec()
+
// Removed code which validates whether there is only one CheckpointWriter per path 'file' since
// I did not notice any errors - reintroduce it ?
@@ -103,7 +107,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
def write(checkpoint: Checkpoint) {
val bos = new ByteArrayOutputStream()
- val zos = new LZFOutputStream(bos)
+ val zos = compressionCodec.compressedOutputStream(bos)
val oos = new ObjectOutputStream(zos)
oos.writeObject(checkpoint)
oos.close()
@@ -137,6 +141,8 @@ object CheckpointReader extends Logging {
val fs = new Path(path).getFileSystem(new Configuration())
val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
+ val compressionCodec = CompressionCodec.createCodec()
+
attempts.foreach(file => {
if (fs.exists(file)) {
logInfo("Attempting to load checkpoint from file '" + file + "'")
@@ -147,7 +153,7 @@ object CheckpointReader extends Logging {
// of ObjectInputStream is used to explicitly use the current thread's default class
// loader to find and load classes. This is a well know Java issue and has popped up
// in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
- val zis = new LZFInputStream(fis)
+ val zis = compressionCodec.compressedInputStream(fis)
val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader)
val cp = ois.readObject.asInstanceOf[Checkpoint]
ois.close()
@@ -170,7 +176,9 @@ object CheckpointReader extends Logging {
}
private[streaming]
-class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) extends ObjectInputStream(inputStream_) {
+class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader)
+ extends ObjectInputStream(inputStream_) {
+
override def resolveClass(desc: ObjectStreamClass): Class[_] = {
try {
return loader.loadClass(desc.getName())