diff options
author | Reynold Xin <reynoldx@gmail.com> | 2013-07-30 18:54:35 -0700 |
---|---|---|
committer | Reynold Xin <reynoldx@gmail.com> | 2013-07-31 10:32:13 -0700 |
commit | c61843a69fd50db66b01e9ef0fb2870baf51d351 (patch) | |
tree | fae8845457e3d44cea796cb544b07135bfa7d345 /streaming/src/main | |
parent | 98024eadc3150a9a509132117875b8d0b18b1d50 (diff) | |
download | spark-c61843a69fd50db66b01e9ef0fb2870baf51d351.tar.gz spark-c61843a69fd50db66b01e9ef0fb2870baf51d351.tar.bz2 spark-c61843a69fd50db66b01e9ef0fb2870baf51d351.zip |
Changed other LZF uses to use the compression codec interface.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/Checkpoint.scala | 26 |
1 files changed, 17 insertions, 9 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index 1e4c1e3742..070d930b5e 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -17,16 +17,17 @@ package spark.streaming -import spark.{Logging, Utils} - -import org.apache.hadoop.fs.{FileUtil, Path} -import org.apache.hadoop.conf.Configuration - import java.io._ -import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import java.util.concurrent.Executors import java.util.concurrent.RejectedExecutionException +import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration + +import spark.Logging +import spark.io.CompressionCodec + + private[streaming] class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) extends Logging with Serializable { @@ -49,6 +50,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) } } + /** * Convenience class to speed up the writing of graph checkpoint to file */ @@ -66,6 +68,8 @@ class CheckpointWriter(checkpointDir: String) extends Logging { val maxAttempts = 3 val executor = Executors.newFixedThreadPool(1) + private val compressionCodec = CompressionCodec.createCodec() + // Removed code which validates whether there is only one CheckpointWriter per path 'file' since // I did not notice any errors - reintroduce it ? @@ -103,7 +107,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging { def write(checkpoint: Checkpoint) { val bos = new ByteArrayOutputStream() - val zos = new LZFOutputStream(bos) + val zos = compressionCodec.compressedOutputStream(bos) val oos = new ObjectOutputStream(zos) oos.writeObject(checkpoint) oos.close() @@ -137,6 +141,8 @@ object CheckpointReader extends Logging { val fs = new Path(path).getFileSystem(new Configuration()) val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk")) + val compressionCodec = CompressionCodec.createCodec() + attempts.foreach(file => { if (fs.exists(file)) { logInfo("Attempting to load checkpoint from file '" + file + "'") @@ -147,7 +153,7 @@ object CheckpointReader extends Logging { // of ObjectInputStream is used to explicitly use the current thread's default class // loader to find and load classes. This is a well know Java issue and has popped up // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627) - val zis = new LZFInputStream(fis) + val zis = compressionCodec.compressedInputStream(fis) val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader) val cp = ois.readObject.asInstanceOf[Checkpoint] ois.close() @@ -170,7 +176,9 @@ object CheckpointReader extends Logging { } private[streaming] -class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) extends ObjectInputStream(inputStream_) { +class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) + extends ObjectInputStream(inputStream_) { + override def resolveClass(desc: ObjectStreamClass): Class[_] = { try { return loader.loadClass(desc.getName()) |