From 2fb849502f57ec29e9913907872183af06a57b3e Mon Sep 17 00:00:00 2001
From: Joshua Hartman
Date: Sun, 5 Dec 2010 21:20:15 -0800
Subject: Replacing the native lzf compression code with the Ning open-source
 compress-lzf library. (Apache 2.0 license)

---
 Makefile                                          |  18 +--
 run                                               |   1 +
 src/java/spark/compress/lzf/LZF.java              |  27 ----
 src/java/spark/compress/lzf/LZFInputStream.java   | 180 --------------------
 src/java/spark/compress/lzf/LZFOutputStream.java  |  85 ----------
 src/native/Makefile                               |  30 ----
 src/native/spark_compress_lzf_LZF.c               |  90 -----------
 src/scala/spark/Broadcast.scala                   |   2 +-
 .../compress-lzf-0.6.0/compress-lzf-0.6.0.jar     | Bin 0 -> 14497 bytes
 9 files changed, 8 insertions(+), 425 deletions(-)
 delete mode 100644 src/java/spark/compress/lzf/LZF.java
 delete mode 100644 src/java/spark/compress/lzf/LZFInputStream.java
 delete mode 100644 src/java/spark/compress/lzf/LZFOutputStream.java
 delete mode 100644 src/native/Makefile
 delete mode 100644 src/native/spark_compress_lzf_LZF.c
 create mode 100644 third_party/compress-lzf-0.6.0/compress-lzf-0.6.0.jar

diff --git a/Makefile b/Makefile
index 15ab516d1f..762d752fda 100644
--- a/Makefile
+++ b/Makefile
@@ -15,12 +15,13 @@ JARS += third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
 JARS += third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
 JARS += third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
 JARS += third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
+JARS += third_party/compress-lzf-0.6.0/compress-lzf-0.6.0.jar
+
 CLASSPATH = $(subst $(SPACE),:,$(JARS))
 
 SCALA_SOURCES = src/examples/*.scala src/scala/spark/*.scala src/scala/spark/repl/*.scala
 SCALA_SOURCES += src/test/spark/*.scala src/test/spark/repl/*.scala
-JAVA_SOURCES = $(wildcard src/java/spark/compress/lzf/*.java)
 
 ifeq ($(USE_FSC),1)
   COMPILER_NAME = fsc
@@ -36,25 +37,19 @@ endif
 
 CONF_FILES = conf/spark-env.sh conf/log4j.properties conf/java-opts
 
-all: scala java conf-files
+all: scala conf-files
 
 build/classes:
 	mkdir -p build/classes
 
-scala: build/classes java
+scala: build/classes
 	$(COMPILER) -d build/classes -classpath build/classes:$(CLASSPATH) $(SCALA_SOURCES)
 
-java: $(JAVA_SOURCES) build/classes
-	javac -d build/classes $(JAVA_SOURCES)
-
-native: java
-	$(MAKE) -C src/native
-
 jar: build/spark.jar build/spark-dep.jar
 
 dep-jar: build/spark-dep.jar
 
-build/spark.jar: scala java
+build/spark.jar: scala
 	jar cf build/spark.jar -C build/classes spark
 
 build/spark-dep.jar:
@@ -73,7 +68,6 @@ test: all
 default: all
 
 clean:
-	$(MAKE) -C src/native clean
 	rm -rf build
 
-.phony: default all clean scala java native jar dep-jar conf-files
+.phony: default all clean scala jar dep-jar conf-files
diff --git a/run b/run
index d6f7d920c5..5c8943c91b 100755
--- a/run
+++ b/run
@@ -48,6 +48,7 @@ CLASSPATH+=:$FWDIR/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
 CLASSPATH+=:$FWDIR/third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
 CLASSPATH+=:$FWDIR/third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
 CLASSPATH+=:$FWDIR/third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
+CLASSPATH+=:$FWDIR/third_party/compress-lzf-0.6.0/compress-lzf-0.6.0.jar
 for jar in $FWDIR/third_party/hadoop-0.20.0/lib/*.jar; do
   CLASSPATH+=:$jar
 done
diff --git a/src/java/spark/compress/lzf/LZF.java b/src/java/spark/compress/lzf/LZF.java
deleted file mode 100644
index 294a0494ec..0000000000
--- a/src/java/spark/compress/lzf/LZF.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package spark.compress.lzf;
-
-public class LZF {
-  private static boolean loaded;
-
-  static {
-    try {
-      System.loadLibrary("spark_native");
-      loaded = true;
-    } catch(Throwable t) {
System.out.println("Failed to load native LZF library: " + t.toString()); - loaded = false; - } - } - - public static boolean isLoaded() { - return loaded; - } - - public static native int compress( - byte[] in, int inOff, int inLen, - byte[] out, int outOff, int outLen); - - public static native int decompress( - byte[] in, int inOff, int inLen, - byte[] out, int outOff, int outLen); -} diff --git a/src/java/spark/compress/lzf/LZFInputStream.java b/src/java/spark/compress/lzf/LZFInputStream.java deleted file mode 100644 index 16bc687489..0000000000 --- a/src/java/spark/compress/lzf/LZFInputStream.java +++ /dev/null @@ -1,180 +0,0 @@ -package spark.compress.lzf; - -import java.io.EOFException; -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; - -public class LZFInputStream extends FilterInputStream { - private static final int MAX_BLOCKSIZE = 1024 * 64 - 1; - private static final int MAX_HDR_SIZE = 7; - - private byte[] inBuf; // Holds data to decompress (including header) - private byte[] outBuf; // Holds decompressed data to output - private int outPos; // Current position in outBuf - private int outSize; // Total amount of data in outBuf - - private boolean closed; - private boolean reachedEof; - - private byte[] singleByte = new byte[1]; - - public LZFInputStream(InputStream in) { - super(in); - if (in == null) - throw new NullPointerException(); - inBuf = new byte[MAX_BLOCKSIZE + MAX_HDR_SIZE]; - outBuf = new byte[MAX_BLOCKSIZE + MAX_HDR_SIZE]; - outPos = 0; - outSize = 0; - } - - private void ensureOpen() throws IOException { - if (closed) throw new IOException("Stream closed"); - } - - @Override - public int read() throws IOException { - ensureOpen(); - int count = read(singleByte, 0, 1); - return (count == -1 ? -1 : singleByte[0] & 0xFF); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - ensureOpen(); - if ((off | len | (off + len) | (b.length - (off + len))) < 0) - throw new IndexOutOfBoundsException(); - - int totalRead = 0; - - // Start with the current block in outBuf, and read and decompress any - // further blocks necessary. Instead of trying to decompress directly to b - // when b is large, we always use outBuf as an intermediate holding space - // in case GetPrimitiveArrayCritical decides to copy arrays instead of - // pinning them, which would cause b to be copied repeatedly into C-land. - while (len > 0) { - if (outPos == outSize) { - readNextBlock(); - if (reachedEof) - return totalRead == 0 ? -1 : totalRead; - } - int amtToCopy = Math.min(outSize - outPos, len); - System.arraycopy(outBuf, outPos, b, off, amtToCopy); - off += amtToCopy; - len -= amtToCopy; - outPos += amtToCopy; - totalRead += amtToCopy; - } - - return totalRead; - } - - // Read len bytes from this.in to a buffer, stopping only if EOF is reached - private int readFully(byte[] b, int off, int len) throws IOException { - int totalRead = 0; - while (len > 0) { - int amt = in.read(b, off, len); - if (amt == -1) - break; - off += amt; - len -= amt; - totalRead += amt; - } - return totalRead; - } - - // Read the next block from the underlying InputStream into outBuf, - // setting outPos and outSize, or set reachedEof if the stream ends. 
-  private void readNextBlock() throws IOException {
-    // Read first 5 bytes of header
-    int count = readFully(inBuf, 0, 5);
-    if (count == 0) {
-      reachedEof = true;
-      return;
-    } else if (count < 5) {
-      throw new EOFException("Truncated LZF block header");
-    }
-
-    // Check magic bytes
-    if (inBuf[0] != 'Z' || inBuf[1] != 'V')
-      throw new IOException("Wrong magic bytes in LZF block header");
-
-    // Read the block
-    if (inBuf[2] == 0) {
-      // Uncompressed block - read directly to outBuf
-      int size = ((inBuf[3] & 0xFF) << 8) | (inBuf[4] & 0xFF);
-      if (readFully(outBuf, 0, size) != size)
-        throw new EOFException("EOF inside LZF block");
-      outPos = 0;
-      outSize = size;
-    } else if (inBuf[2] == 1) {
-      // Compressed block - read to inBuf and decompress
-      if (readFully(inBuf, 5, 2) != 2)
-        throw new EOFException("Truncated LZF block header");
-      int csize = ((inBuf[3] & 0xFF) << 8) | (inBuf[4] & 0xFF);
-      int usize = ((inBuf[5] & 0xFF) << 8) | (inBuf[6] & 0xFF);
-      if (readFully(inBuf, 7, csize) != csize)
-        throw new EOFException("Truncated LZF block");
-      if (LZF.decompress(inBuf, 7, csize, outBuf, 0, usize) != usize)
-        throw new IOException("Corrupt LZF data stream");
-      outPos = 0;
-      outSize = usize;
-    } else {
-      throw new IOException("Unknown block type in LZF block header");
-    }
-  }
-
-  /**
-   * Returns 0 after EOF has been reached, otherwise always return 1.
-   *
-   * Programs should not count on this method to return the actual number
-   * of bytes that could be read without blocking.
-   */
-  @Override
-  public int available() throws IOException {
-    ensureOpen();
-    return reachedEof ? 0 : 1;
-  }
-
-  // TODO: Skip complete chunks without decompressing them?
-  @Override
-  public long skip(long n) throws IOException {
-    ensureOpen();
-    if (n < 0)
-      throw new IllegalArgumentException("negative skip length");
-    byte[] buf = new byte[512];
-    long skipped = 0;
-    while (skipped < n) {
-      int len = (int) Math.min(n - skipped, buf.length);
-      len = read(buf, 0, len);
-      if (len == -1) {
-        reachedEof = true;
-        break;
-      }
-      skipped += len;
-    }
-    return skipped;
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (!closed) {
-      in.close();
-      closed = true;
-    }
-  }
-
-  @Override
-  public boolean markSupported() {
-    return false;
-  }
-
-  @Override
-  public void mark(int readLimit) {}
-
-  @Override
-  public void reset() throws IOException {
-    throw new IOException("mark/reset not supported");
-  }
-}
diff --git a/src/java/spark/compress/lzf/LZFOutputStream.java b/src/java/spark/compress/lzf/LZFOutputStream.java
deleted file mode 100644
index 5f65e95d2a..0000000000
--- a/src/java/spark/compress/lzf/LZFOutputStream.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package spark.compress.lzf;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class LZFOutputStream extends FilterOutputStream {
-  private static final int BLOCKSIZE = 1024 * 64 - 1;
-  private static final int MAX_HDR_SIZE = 7;
-
-  private byte[] inBuf;   // Holds input data to be compressed
-  private byte[] outBuf;  // Holds compressed data to be written
-  private int inPos;      // Current position in inBuf
-
-  public LZFOutputStream(OutputStream out) {
-    super(out);
-    inBuf = new byte[BLOCKSIZE + MAX_HDR_SIZE];
-    outBuf = new byte[BLOCKSIZE + MAX_HDR_SIZE];
-    inPos = MAX_HDR_SIZE;
-  }
-
-  @Override
-  public void write(int b) throws IOException {
-    inBuf[inPos++] = (byte) b;
-    if (inPos == inBuf.length)
-      compressAndSendBlock();
-  }
-
-  @Override
-  public void write(byte[] b, int off, int len) throws IOException {
-    if ((off | len | (off + len) | (b.length - (off + len))) < 0)
-      throw new IndexOutOfBoundsException();
-
-    // If we're given a large array, copy it piece by piece into inBuf and
-    // write one BLOCKSIZE at a time. This is done to prevent the JNI code
-    // from copying the whole array repeatedly if GetPrimitiveArrayCritical
-    // decides to copy instead of pinning.
-    while (inPos + len >= inBuf.length) {
-      int amtToCopy = inBuf.length - inPos;
-      System.arraycopy(b, off, inBuf, inPos, amtToCopy);
-      inPos += amtToCopy;
-      compressAndSendBlock();
-      off += amtToCopy;
-      len -= amtToCopy;
-    }
-
-    // Copy the remaining (incomplete) block into inBuf
-    System.arraycopy(b, off, inBuf, inPos, len);
-    inPos += len;
-  }
-
-  @Override
-  public void flush() throws IOException {
-    if (inPos > MAX_HDR_SIZE)
-      compressAndSendBlock();
-    out.flush();
-  }
-
-  // Send the data in inBuf, and reset inPos to start writing a new block.
-  private void compressAndSendBlock() throws IOException {
-    int us = inPos - MAX_HDR_SIZE;
-    int maxcs = us > 4 ? us - 4 : us;
-    int cs = LZF.compress(inBuf, MAX_HDR_SIZE, us, outBuf, MAX_HDR_SIZE, maxcs);
-    if (cs != 0) {
-      // Compression made the data smaller; use type 1 header
-      outBuf[0] = 'Z';
-      outBuf[1] = 'V';
-      outBuf[2] = 1;
-      outBuf[3] = (byte) (cs >> 8);
-      outBuf[4] = (byte) (cs & 0xFF);
-      outBuf[5] = (byte) (us >> 8);
-      outBuf[6] = (byte) (us & 0xFF);
-      out.write(outBuf, 0, 7 + cs);
-    } else {
-      // Compression didn't help; use type 0 header and uncompressed data
-      inBuf[2] = 'Z';
-      inBuf[3] = 'V';
-      inBuf[4] = 0;
-      inBuf[5] = (byte) (us >> 8);
-      inBuf[6] = (byte) (us & 0xFF);
-      out.write(inBuf, 2, 5 + us);
-    }
-    inPos = MAX_HDR_SIZE;
-  }
-}
diff --git a/src/native/Makefile b/src/native/Makefile
deleted file mode 100644
index 6236e26f3d..0000000000
--- a/src/native/Makefile
+++ /dev/null
@@ -1,30 +0,0 @@
-CC = gcc
-#JAVA_HOME = /usr/lib/jvm/java-6-sun
-OS_NAME = linux
-
-CFLAGS = -fPIC -O3 -funroll-all-loops
-
-SPARK = ../..
-
-LZF = $(SPARK)/third_party/liblzf-3.5
-
-LIB = libspark_native.so
-
-all: $(LIB)
-
-spark_compress_lzf_LZF.h: $(SPARK)/build/classes/spark/compress/lzf/LZF.class
-ifeq ($(JAVA_HOME),)
-	$(error JAVA_HOME is not set)
-else
-	$(JAVA_HOME)/bin/javah -classpath $(SPARK)/build/classes spark.compress.lzf.LZF
-endif
-
-$(LIB): spark_compress_lzf_LZF.h spark_compress_lzf_LZF.c
-	$(CC) $(CFLAGS) -shared -o $@ spark_compress_lzf_LZF.c \
-	  -I $(JAVA_HOME)/include -I $(JAVA_HOME)/include/$(OS_NAME) \
-	  -I $(LZF) $(LZF)/lzf_c.c $(LZF)/lzf_d.c
-
-clean:
-	rm -f spark_compress_lzf_LZF.h $(LIB)
-
-.PHONY: all clean
diff --git a/src/native/spark_compress_lzf_LZF.c b/src/native/spark_compress_lzf_LZF.c
deleted file mode 100644
index c2a59def3e..0000000000
--- a/src/native/spark_compress_lzf_LZF.c
+++ /dev/null
@@ -1,90 +0,0 @@
-#include "spark_compress_lzf_LZF.h"
-#include <lzf.h>
-
-
-/* Helper function to throw an exception */
-static void throwException(JNIEnv *env, const char* className) {
-  jclass cls = (*env)->FindClass(env, className);
-  if (cls != 0) /* If cls is null, an exception was already thrown */
-    (*env)->ThrowNew(env, cls, "");
-}
-
-
-/*
- * Since LZF.compress() and LZF.decompress() have the same signatures
- * and differ only in which lzf_ function they call, implement both in a
- * single function and pass it a pointer to the correct lzf_ function.
- */
-static jint callCompressionFunction
-  (unsigned int (*func)(const void *const, unsigned int, void *, unsigned int),
-   JNIEnv *env, jclass cls, jbyteArray inArray, jint inOff, jint inLen,
-   jbyteArray outArray, jint outOff, jint outLen)
-{
-  jint inCap;
-  jint outCap;
-  jbyte *inData = 0;
-  jbyte *outData = 0;
-  jint ret;
-  jint s;
-
-  if (!inArray || !outArray) {
-    throwException(env, "java/lang/NullPointerException");
-    goto cleanup;
-  }
-
-  inCap = (*env)->GetArrayLength(env, inArray);
-  outCap = (*env)->GetArrayLength(env, outArray);
-
-  // Check if any of the offset/length pairs is invalid; we do this by OR'ing
-  // things we don't want to be negative and seeing if the result is negative
-  s = inOff | inLen | (inOff + inLen) | (inCap - (inOff + inLen)) |
-      outOff | outLen | (outOff + outLen) | (outCap - (outOff + outLen));
-  if (s < 0) {
-    throwException(env, "java/lang/IndexOutOfBoundsException");
-    goto cleanup;
-  }
-
-  inData = (*env)->GetPrimitiveArrayCritical(env, inArray, 0);
-  outData = (*env)->GetPrimitiveArrayCritical(env, outArray, 0);
-
-  if (!inData || !outData) {
-    // Out of memory - JVM will throw OutOfMemoryError
-    goto cleanup;
-  }
-
-  ret = func(inData + inOff, inLen, outData + outOff, outLen);
-
-cleanup:
-  if (inData)
-    (*env)->ReleasePrimitiveArrayCritical(env, inArray, inData, 0);
-  if (outData)
-    (*env)->ReleasePrimitiveArrayCritical(env, outArray, outData, 0);
-
-  return ret;
-}
-
-/*
- * Class:     spark_compress_lzf_LZF
- * Method:    compress
- * Signature: ([B[B)I
- */
-JNIEXPORT jint JNICALL Java_spark_compress_lzf_LZF_compress
-  (JNIEnv *env, jclass cls, jbyteArray inArray, jint inOff, jint inLen,
-   jbyteArray outArray, jint outOff, jint outLen)
-{
-  return callCompressionFunction(lzf_compress, env, cls,
-      inArray, inOff, inLen, outArray, outOff, outLen);
-}
-
-/*
- * Class:     spark_compress_lzf_LZF
- * Method:    decompress
- * Signature: ([B[B)I
- */
-JNIEXPORT jint JNICALL Java_spark_compress_lzf_LZF_decompress
-  (JNIEnv *env, jclass cls, jbyteArray inArray, jint inOff, jint inLen,
-   jbyteArray outArray, jint outOff, jint outLen)
-{
-  return callCompressionFunction(lzf_decompress, env, cls,
-      inArray, inOff, inLen, outArray, outOff, outLen);
-}
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 5089dca82e..2fdd960bdc 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -14,7 +14,7 @@ import scala.collection.mutable.Map
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
 
-import spark.compress.lzf.{LZFInputStream, LZFOutputStream}
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 
 @serializable
 trait BroadcastRecipe {
diff --git a/third_party/compress-lzf-0.6.0/compress-lzf-0.6.0.jar b/third_party/compress-lzf-0.6.0/compress-lzf-0.6.0.jar
new file mode 100644
index 0000000000..6cb5c4c92b
Binary files /dev/null and b/third_party/compress-lzf-0.6.0/compress-lzf-0.6.0.jar differ
-- 
cgit v1.2.3
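
For readers tracing what the deleted streams put on the wire: the headers built
in compressAndSendBlock() and parsed in readNextBlock() above are the standard
LZF block framing, the magic bytes 'Z' 'V', a type byte (0 = uncompressed,
1 = compressed), then big-endian 16-bit sizes. A minimal Scala sketch of just
that header layout follows; the LzfHeader object and its method names are
illustrative, not part of this patch or of compress-lzf:

object LzfHeader {
  // Type 0 header ("ZV" + 0 + usize): the 5-byte header the deleted
  // LZFOutputStream wrote when compression did not shrink the block.
  def uncompressed(usize: Int): Array[Byte] =
    Array[Byte]('Z'.toByte, 'V'.toByte, 0.toByte,
                (usize >> 8).toByte, (usize & 0xFF).toByte)

  // Type 1 header ("ZV" + 1 + csize + usize): the 7-byte header used
  // when the lzf_compress call actually saved space.
  def compressed(csize: Int, usize: Int): Array[Byte] =
    Array[Byte]('Z'.toByte, 'V'.toByte, 1.toByte,
                (csize >> 8).toByte, (csize & 0xFF).toByte,
                (usize >> 8).toByte, (usize & 0xFF).toByte)
}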
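
Within this codebase the Ning classes are a drop-in replacement: they wrap
plain java.io streams, which is why Broadcast.scala needs only the one-line
import change above. Below is a minimal round-trip sketch of that usage,
assuming the LZFOutputStream(OutputStream) and LZFInputStream(InputStream)
constructors the swap implies; the object name and payload are illustrative:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}

object LzfRoundTrip {
  def main(args: Array[String]) {
    val original = "broadcast payload".getBytes("UTF-8")

    // Compress: wrap an ordinary OutputStream, as Broadcast.scala wraps
    // its file and socket streams.
    val sink = new ByteArrayOutputStream
    val lzfOut = new LZFOutputStream(sink)
    lzfOut.write(original)
    lzfOut.close()

    // Decompress: wrap the matching InputStream and drain it.
    val lzfIn = new LZFInputStream(new ByteArrayInputStream(sink.toByteArray))
    val restored = new ByteArrayOutputStream
    val buf = new Array[Byte](4096)
    var n = lzfIn.read(buf)
    while (n != -1) {
      restored.write(buf, 0, n)
      n = lzfIn.read(buf)
    }
    lzfIn.close()

    assert(java.util.Arrays.equals(restored.toByteArray, original))
  }
}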