author     Matei Zaharia <matei@eecs.berkeley.edu>    2010-12-15 10:57:39 -0800
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2010-12-15 10:57:39 -0800
commit     5c222dbe284664af3547d1d9d67d4846bf36f4ed (patch)
tree       3c7a31c59aeeadbb257119a85190f0ea150af3af
parent     0a5c24ae3d526bba2cc11c0c7a1a5efa5fd9a39c (diff)
parent     34395730dbae0913674212b0d8c981db122d1643 (diff)
download   spark-5c222dbe284664af3547d1d9d67d4846bf36f4ed.tar.gz
           spark-5c222dbe284664af3547d1d9d67d4846bf36f4ed.tar.bz2
           spark-5c222dbe284664af3547d1d9d67d4846bf36f4ed.zip
Merge branch 'master' into mos-bt

Conflicts:
    src/scala/spark/Broadcast.scala
-rw-r--r--  LICENSE                                               |  27
-rw-r--r--  Makefile                                              |  18
-rwxr-xr-x  run                                                   |   1
-rw-r--r--  src/java/spark/compress/lzf/LZF.java                  |  27
-rw-r--r--  src/java/spark/compress/lzf/LZFInputStream.java       | 180
-rw-r--r--  src/java/spark/compress/lzf/LZFOutputStream.java      |  85
-rw-r--r--  src/native/Makefile                                   |  30
-rw-r--r--  src/native/spark_compress_lzf_LZF.c                   |  90
-rw-r--r--  src/scala/spark/Broadcast.scala                       |  17
-rw-r--r--  src/scala/spark/MesosScheduler.scala                  |   2
-rw-r--r--  third_party/compress-lzf-0.6.0/LICENSE                |  11
-rw-r--r--  third_party/compress-lzf-0.6.0/compress-lzf-0.6.0.jar | bin 0 -> 14497 bytes
12 files changed, 61 insertions, 427 deletions
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000000..d17afa1fc6
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) 2010, Regents of the University of California.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+ * Neither the name of the University of California, Berkeley nor the
+ names of its contributors may be used to endorse or promote
+ products derived from this software without specific prior written
+ permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/Makefile b/Makefile
index 15ab516d1f..762d752fda 100644
--- a/Makefile
+++ b/Makefile
@@ -15,12 +15,13 @@ JARS += third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
JARS += third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
JARS += third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
JARS += third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
+JARS += third_party/compress-lzf-0.6.0/compress-lzf-0.6.0.jar
+
CLASSPATH = $(subst $(SPACE),:,$(JARS))
SCALA_SOURCES = src/examples/*.scala src/scala/spark/*.scala src/scala/spark/repl/*.scala
SCALA_SOURCES += src/test/spark/*.scala src/test/spark/repl/*.scala
-JAVA_SOURCES = $(wildcard src/java/spark/compress/lzf/*.java)
ifeq ($(USE_FSC),1)
COMPILER_NAME = fsc
@@ -36,25 +37,19 @@ endif
CONF_FILES = conf/spark-env.sh conf/log4j.properties conf/java-opts
-all: scala java conf-files
+all: scala conf-files
build/classes:
mkdir -p build/classes
-scala: build/classes java
+scala: build/classes
$(COMPILER) -d build/classes -classpath build/classes:$(CLASSPATH) $(SCALA_SOURCES)
-java: $(JAVA_SOURCES) build/classes
- javac -d build/classes $(JAVA_SOURCES)
-
-native: java
- $(MAKE) -C src/native
-
jar: build/spark.jar build/spark-dep.jar
dep-jar: build/spark-dep.jar
-build/spark.jar: scala java
+build/spark.jar: scala
jar cf build/spark.jar -C build/classes spark
build/spark-dep.jar:
@@ -73,7 +68,6 @@ test: all
default: all
clean:
- $(MAKE) -C src/native clean
rm -rf build
-.phony: default all clean scala java native jar dep-jar conf-files
+.phony: default all clean scala jar dep-jar conf-files
diff --git a/run b/run
index d6f7d920c5..5c8943c91b 100755
--- a/run
+++ b/run
@@ -48,6 +48,7 @@ CLASSPATH+=:$FWDIR/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
CLASSPATH+=:$FWDIR/third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
CLASSPATH+=:$FWDIR/third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
CLASSPATH+=:$FWDIR/third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
+CLASSPATH+=:$FWDIR/third_party/compress-lzf-0.6.0/compress-lzf-0.6.0.jar
for jar in $FWDIR/third_party/hadoop-0.20.0/lib/*.jar; do
CLASSPATH+=:$jar
done
diff --git a/src/java/spark/compress/lzf/LZF.java b/src/java/spark/compress/lzf/LZF.java
deleted file mode 100644
index 294a0494ec..0000000000
--- a/src/java/spark/compress/lzf/LZF.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package spark.compress.lzf;
-
-public class LZF {
- private static boolean loaded;
-
- static {
- try {
- System.loadLibrary("spark_native");
- loaded = true;
- } catch(Throwable t) {
- System.out.println("Failed to load native LZF library: " + t.toString());
- loaded = false;
- }
- }
-
- public static boolean isLoaded() {
- return loaded;
- }
-
- public static native int compress(
- byte[] in, int inOff, int inLen,
- byte[] out, int outOff, int outLen);
-
- public static native int decompress(
- byte[] in, int inOff, int inLen,
- byte[] out, int outOff, int outLen);
-}
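
The deleted wrapper above loads libspark_native.so once in a static initializer and records whether it succeeded, so callers can fall back to uncompressed I/O when the native library is absent. A minimal sketch of that calling pattern, assuming the LZF class above is still on the classpath (the caller code itself is illustrative, not from this repository):

    // Illustrative caller of the deleted wrapper: compress only when the
    // native library loaded, mirroring lzf_compress's 0-on-failure contract.
    public class CompressIfAvailable {
        static int tryCompress(byte[] in, byte[] out) {
            if (!LZF.isLoaded())
                return 0; // native library missing: caller should store raw bytes
            return LZF.compress(in, 0, in.length, out, 0, out.length);
        }
    }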
diff --git a/src/java/spark/compress/lzf/LZFInputStream.java b/src/java/spark/compress/lzf/LZFInputStream.java
deleted file mode 100644
index 16bc687489..0000000000
--- a/src/java/spark/compress/lzf/LZFInputStream.java
+++ /dev/null
@@ -1,180 +0,0 @@
-package spark.compress.lzf;
-
-import java.io.EOFException;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class LZFInputStream extends FilterInputStream {
- private static final int MAX_BLOCKSIZE = 1024 * 64 - 1;
- private static final int MAX_HDR_SIZE = 7;
-
- private byte[] inBuf; // Holds data to decompress (including header)
- private byte[] outBuf; // Holds decompressed data to output
- private int outPos; // Current position in outBuf
- private int outSize; // Total amount of data in outBuf
-
- private boolean closed;
- private boolean reachedEof;
-
- private byte[] singleByte = new byte[1];
-
- public LZFInputStream(InputStream in) {
- super(in);
- if (in == null)
- throw new NullPointerException();
- inBuf = new byte[MAX_BLOCKSIZE + MAX_HDR_SIZE];
- outBuf = new byte[MAX_BLOCKSIZE + MAX_HDR_SIZE];
- outPos = 0;
- outSize = 0;
- }
-
- private void ensureOpen() throws IOException {
- if (closed) throw new IOException("Stream closed");
- }
-
- @Override
- public int read() throws IOException {
- ensureOpen();
- int count = read(singleByte, 0, 1);
- return (count == -1 ? -1 : singleByte[0] & 0xFF);
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- ensureOpen();
- if ((off | len | (off + len) | (b.length - (off + len))) < 0)
- throw new IndexOutOfBoundsException();
-
- int totalRead = 0;
-
- // Start with the current block in outBuf, and read and decompress any
- // further blocks necessary. Instead of trying to decompress directly to b
- // when b is large, we always use outBuf as an intermediate holding space
- // in case GetPrimitiveArrayCritical decides to copy arrays instead of
- // pinning them, which would cause b to be copied repeatedly into C-land.
- while (len > 0) {
- if (outPos == outSize) {
- readNextBlock();
- if (reachedEof)
- return totalRead == 0 ? -1 : totalRead;
- }
- int amtToCopy = Math.min(outSize - outPos, len);
- System.arraycopy(outBuf, outPos, b, off, amtToCopy);
- off += amtToCopy;
- len -= amtToCopy;
- outPos += amtToCopy;
- totalRead += amtToCopy;
- }
-
- return totalRead;
- }
-
- // Read len bytes from this.in to a buffer, stopping only if EOF is reached
- private int readFully(byte[] b, int off, int len) throws IOException {
- int totalRead = 0;
- while (len > 0) {
- int amt = in.read(b, off, len);
- if (amt == -1)
- break;
- off += amt;
- len -= amt;
- totalRead += amt;
- }
- return totalRead;
- }
-
- // Read the next block from the underlying InputStream into outBuf,
- // setting outPos and outSize, or set reachedEof if the stream ends.
- private void readNextBlock() throws IOException {
- // Read first 5 bytes of header
- int count = readFully(inBuf, 0, 5);
- if (count == 0) {
- reachedEof = true;
- return;
- } else if (count < 5) {
- throw new EOFException("Truncated LZF block header");
- }
-
- // Check magic bytes
- if (inBuf[0] != 'Z' || inBuf[1] != 'V')
- throw new IOException("Wrong magic bytes in LZF block header");
-
- // Read the block
- if (inBuf[2] == 0) {
- // Uncompressed block - read directly to outBuf
- int size = ((inBuf[3] & 0xFF) << 8) | (inBuf[4] & 0xFF);
- if (readFully(outBuf, 0, size) != size)
- throw new EOFException("EOF inside LZF block");
- outPos = 0;
- outSize = size;
- } else if (inBuf[2] == 1) {
- // Compressed block - read to inBuf and decompress
- if (readFully(inBuf, 5, 2) != 2)
- throw new EOFException("Truncated LZF block header");
- int csize = ((inBuf[3] & 0xFF) << 8) | (inBuf[4] & 0xFF);
- int usize = ((inBuf[5] & 0xFF) << 8) | (inBuf[6] & 0xFF);
- if (readFully(inBuf, 7, csize) != csize)
- throw new EOFException("Truncated LZF block");
- if (LZF.decompress(inBuf, 7, csize, outBuf, 0, usize) != usize)
- throw new IOException("Corrupt LZF data stream");
- outPos = 0;
- outSize = usize;
- } else {
- throw new IOException("Unknown block type in LZF block header");
- }
- }
-
- /**
- * Returns 0 after EOF has been reached, otherwise always return 1.
- *
- * Programs should not count on this method to return the actual number
- * of bytes that could be read without blocking.
- */
- @Override
- public int available() throws IOException {
- ensureOpen();
- return reachedEof ? 0 : 1;
- }
-
- // TODO: Skip complete chunks without decompressing them?
- @Override
- public long skip(long n) throws IOException {
- ensureOpen();
- if (n < 0)
- throw new IllegalArgumentException("negative skip length");
- byte[] buf = new byte[512];
- long skipped = 0;
- while (skipped < n) {
- int len = (int) Math.min(n - skipped, buf.length);
- len = read(buf, 0, len);
- if (len == -1) {
- reachedEof = true;
- break;
- }
- skipped += len;
- }
- return skipped;
- }
-
- @Override
- public void close() throws IOException {
- if (!closed) {
- in.close();
- closed = true;
- }
- }
-
- @Override
- public boolean markSupported() {
- return false;
- }
-
- @Override
- public void mark(int readLimit) {}
-
- @Override
- public void reset() throws IOException {
- throw new IOException("mark/reset not supported");
- }
-}
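
readNextBlock() above documents the block framing both deleted streams share: the magic bytes 'Z' and 'V', a type byte (0 = stored, 1 = compressed), and a two-byte big-endian length, with compressed blocks carrying a second two-byte length for the uncompressed size. A minimal sketch restating that header parse outside the stream class (class and method names are illustrative):

    import java.io.IOException;

    class ZvHeader {
        // Decode the 5-to-7-byte "ZV" header exactly as readNextBlock() does.
        static String describe(byte[] hdr) throws IOException {
            if (hdr[0] != 'Z' || hdr[1] != 'V')
                throw new IOException("Wrong magic bytes in LZF block header");
            int len = ((hdr[3] & 0xFF) << 8) | (hdr[4] & 0xFF); // big-endian 16-bit
            if (hdr[2] == 0)
                return "stored block: " + len + " data bytes after a 5-byte header";
            if (hdr[2] == 1) {
                int usize = ((hdr[5] & 0xFF) << 8) | (hdr[6] & 0xFF);
                return "compressed block: " + len + " bytes inflating to " + usize;
            }
            throw new IOException("Unknown block type in LZF block header");
        }
    }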
diff --git a/src/java/spark/compress/lzf/LZFOutputStream.java b/src/java/spark/compress/lzf/LZFOutputStream.java
deleted file mode 100644
index 5f65e95d2a..0000000000
--- a/src/java/spark/compress/lzf/LZFOutputStream.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package spark.compress.lzf;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class LZFOutputStream extends FilterOutputStream {
- private static final int BLOCKSIZE = 1024 * 64 - 1;
- private static final int MAX_HDR_SIZE = 7;
-
- private byte[] inBuf; // Holds input data to be compressed
- private byte[] outBuf; // Holds compressed data to be written
- private int inPos; // Current position in inBuf
-
- public LZFOutputStream(OutputStream out) {
- super(out);
- inBuf = new byte[BLOCKSIZE + MAX_HDR_SIZE];
- outBuf = new byte[BLOCKSIZE + MAX_HDR_SIZE];
- inPos = MAX_HDR_SIZE;
- }
-
- @Override
- public void write(int b) throws IOException {
- inBuf[inPos++] = (byte) b;
- if (inPos == inBuf.length)
- compressAndSendBlock();
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- if ((off | len | (off + len) | (b.length - (off + len))) < 0)
- throw new IndexOutOfBoundsException();
-
- // If we're given a large array, copy it piece by piece into inBuf and
- // write one BLOCKSIZE at a time. This is done to prevent the JNI code
- // from copying the whole array repeatedly if GetPrimitiveArrayCritical
- // decides to copy instead of pinning.
- while (inPos + len >= inBuf.length) {
- int amtToCopy = inBuf.length - inPos;
- System.arraycopy(b, off, inBuf, inPos, amtToCopy);
- inPos += amtToCopy;
- compressAndSendBlock();
- off += amtToCopy;
- len -= amtToCopy;
- }
-
- // Copy the remaining (incomplete) block into inBuf
- System.arraycopy(b, off, inBuf, inPos, len);
- inPos += len;
- }
-
- @Override
- public void flush() throws IOException {
- if (inPos > MAX_HDR_SIZE)
- compressAndSendBlock();
- out.flush();
- }
-
- // Send the data in inBuf, and reset inPos to start writing a new block.
- private void compressAndSendBlock() throws IOException {
- int us = inPos - MAX_HDR_SIZE;
- int maxcs = us > 4 ? us - 4 : us;
- int cs = LZF.compress(inBuf, MAX_HDR_SIZE, us, outBuf, MAX_HDR_SIZE, maxcs);
- if (cs != 0) {
- // Compression made the data smaller; use type 1 header
- outBuf[0] = 'Z';
- outBuf[1] = 'V';
- outBuf[2] = 1;
- outBuf[3] = (byte) (cs >> 8);
- outBuf[4] = (byte) (cs & 0xFF);
- outBuf[5] = (byte) (us >> 8);
- outBuf[6] = (byte) (us & 0xFF);
- out.write(outBuf, 0, 7 + cs);
- } else {
- // Compression didn't help; use type 0 header and uncompressed data
- inBuf[2] = 'Z';
- inBuf[3] = 'V';
- inBuf[4] = 0;
- inBuf[5] = (byte) (us >> 8);
- inBuf[6] = (byte) (us & 0xFF);
- out.write(inBuf, 2, 5 + us);
- }
- inPos = MAX_HDR_SIZE;
- }
-}
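
compressAndSendBlock() above offers the compressor only us - 4 bytes of output room, so LZF.compress() returns 0 and the stored path is taken unless compression saves more than 4 bytes, comfortably covering the 2 extra bytes a type-1 header costs over a type-0 header. A worked sketch of that size accounting (the block size is an illustrative example):

    public class LzfSizeAccounting {
        public static void main(String[] args) {
            int us = 1000;                       // uncompressed payload in this block
            int maxcs = us > 4 ? us - 4 : us;    // room offered to the compressor: 996
            int storedOnWire = 5 + us;           // type-0 header + raw data = 1005
            int maxCompressedOnWire = 7 + maxcs; // type-1 header + payload <= 1003
            System.out.println(storedOnWire + " stored vs at most "
                + maxCompressedOnWire + " compressed");
        }
    }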
diff --git a/src/native/Makefile b/src/native/Makefile
deleted file mode 100644
index 6236e26f3d..0000000000
--- a/src/native/Makefile
+++ /dev/null
@@ -1,30 +0,0 @@
-CC = gcc
-#JAVA_HOME = /usr/lib/jvm/java-6-sun
-OS_NAME = linux
-
-CFLAGS = -fPIC -O3 -funroll-all-loops
-
-SPARK = ../..
-
-LZF = $(SPARK)/third_party/liblzf-3.5
-
-LIB = libspark_native.so
-
-all: $(LIB)
-
-spark_compress_lzf_LZF.h: $(SPARK)/build/classes/spark/compress/lzf/LZF.class
-ifeq ($(JAVA_HOME),)
- $(error JAVA_HOME is not set)
-else
- $(JAVA_HOME)/bin/javah -classpath $(SPARK)/build/classes spark.compress.lzf.LZF
-endif
-
-$(LIB): spark_compress_lzf_LZF.h spark_compress_lzf_LZF.c
- $(CC) $(CFLAGS) -shared -o $@ spark_compress_lzf_LZF.c \
- -I $(JAVA_HOME)/include -I $(JAVA_HOME)/include/$(OS_NAME) \
- -I $(LZF) $(LZF)/lzf_c.c $(LZF)/lzf_d.c
-
-clean:
- rm -f spark_compress_lzf_LZF.h $(LIB)
-
-.PHONY: all clean
diff --git a/src/native/spark_compress_lzf_LZF.c b/src/native/spark_compress_lzf_LZF.c
deleted file mode 100644
index c2a59def3e..0000000000
--- a/src/native/spark_compress_lzf_LZF.c
+++ /dev/null
@@ -1,90 +0,0 @@
-#include "spark_compress_lzf_LZF.h"
-#include <lzf.h>
-
-
-/* Helper function to throw an exception */
-static void throwException(JNIEnv *env, const char* className) {
- jclass cls = (*env)->FindClass(env, className);
- if (cls != 0) /* If cls is null, an exception was already thrown */
- (*env)->ThrowNew(env, cls, "");
-}
-
-
-/*
- * Since LZF.compress() and LZF.decompress() have the same signatures
- * and differ only in which lzf_ function they call, implement both in a
- * single function and pass it a pointer to the correct lzf_ function.
- */
-static jint callCompressionFunction
- (unsigned int (*func)(const void *const, unsigned int, void *, unsigned int),
- JNIEnv *env, jclass cls, jbyteArray inArray, jint inOff, jint inLen,
- jbyteArray outArray, jint outOff, jint outLen)
-{
- jint inCap;
- jint outCap;
- jbyte *inData = 0;
- jbyte *outData = 0;
- jint ret;
- jint s;
-
- if (!inArray || !outArray) {
- throwException(env, "java/lang/NullPointerException");
- goto cleanup;
- }
-
- inCap = (*env)->GetArrayLength(env, inArray);
- outCap = (*env)->GetArrayLength(env, outArray);
-
- // Check if any of the offset/length pairs is invalid; we do this by OR'ing
- // things we don't want to be negative and seeing if the result is negative
- s = inOff | inLen | (inOff + inLen) | (inCap - (inOff + inLen)) |
- outOff | outLen | (outOff + outLen) | (outCap - (outOff + outLen));
- if (s < 0) {
- throwException(env, "java/lang/IndexOutOfBoundsException");
- goto cleanup;
- }
-
- inData = (*env)->GetPrimitiveArrayCritical(env, inArray, 0);
- outData = (*env)->GetPrimitiveArrayCritical(env, outArray, 0);
-
- if (!inData || !outData) {
- // Out of memory - JVM will throw OutOfMemoryError
- goto cleanup;
- }
-
- ret = func(inData + inOff, inLen, outData + outOff, outLen);
-
-cleanup:
- if (inData)
- (*env)->ReleasePrimitiveArrayCritical(env, inArray, inData, 0);
- if (outData)
- (*env)->ReleasePrimitiveArrayCritical(env, outArray, outData, 0);
-
- return ret;
-}
-
-/*
- * Class: spark_compress_lzf_LZF
- * Method: compress
- * Signature: ([B[B)I
- */
-JNIEXPORT jint JNICALL Java_spark_compress_lzf_LZF_compress
- (JNIEnv *env, jclass cls, jbyteArray inArray, jint inOff, jint inLen,
- jbyteArray outArray, jint outOff, jint outLen)
-{
- return callCompressionFunction(lzf_compress, env, cls,
- inArray, inOff, inLen, outArray,outOff, outLen);
-}
-
-/*
- * Class: spark_compress_lzf_LZF
- * Method: decompress
- * Signature: ([B[B)I
- */
-JNIEXPORT jint JNICALL Java_spark_compress_lzf_LZF_decompress
- (JNIEnv *env, jclass cls, jbyteArray inArray, jint inOff, jint inLen,
- jbyteArray outArray, jint outOff, jint outLen)
-{
- return callCompressionFunction(lzf_decompress, env, cls,
- inArray, inOff, inLen, outArray,outOff, outLen);
-}
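
The bounds check above (also used by read() and write() in the deleted Java streams) folds six non-negativity tests into a single branch: OR'ing the offsets, lengths, their sums, and the remaining capacities sets the sign bit if any operand is negative, and a sum that overflows 32-bit two's complement wraps negative and is caught as well. A standalone sketch of the same idiom in Java (class and method names are illustrative):

    public class RangeCheck {
        // True iff 0 <= off, 0 <= len, and off + len <= b.length, with an
        // overflowing off + len also rejected, using one sign-bit test.
        static boolean rangeOk(byte[] b, int off, int len) {
            return (off | len | (off + len) | (b.length - (off + len))) >= 0;
        }

        public static void main(String[] args) {
            byte[] b = new byte[16];
            System.out.println(rangeOk(b, 4, 12));                // true
            System.out.println(rangeOk(b, 4, 13));                // false: past end
            System.out.println(rangeOk(b, 1, Integer.MAX_VALUE)); // false: overflow
        }
    }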
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 08a718540e..17a530e49f 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -1,7 +1,20 @@
package spark
-import java.util.{BitSet, UUID}
-import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
+import java.io._
+import java.net._
+import java.util.{BitSet, UUID, PriorityQueue, Comparator}
+
+import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
+
+import scala.actors.Actor
+import scala.actors.Actor._
+
+import scala.collection.mutable.Map
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
+
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
@serializable
trait Broadcast[T] {
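
With the JNI codec gone, Broadcast.scala now imports com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}, the pure-Java stream wrappers from the bundled compress-lzf-0.6.0.jar. A minimal round-trip sketch against that library, assuming the 0.6.0 constructors wrap plain java.io streams as the import suggests:

    import java.io.*;
    import com.ning.compress.lzf.LZFInputStream;
    import com.ning.compress.lzf.LZFOutputStream;

    public class LzfRoundTrip {
        public static void main(String[] args) throws IOException {
            byte[] original = "some broadcast payload".getBytes("UTF-8");

            // Compress: wrap any OutputStream; close() flushes the final block.
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            OutputStream out = new LZFOutputStream(sink);
            out.write(original);
            out.close();

            // Decompress: wrap the matching InputStream and read the payload back.
            InputStream in = new LZFInputStream(
                new ByteArrayInputStream(sink.toByteArray()));
            byte[] restored = new byte[original.length];
            int off = 0, n;
            while (off < restored.length
                    && (n = in.read(restored, off, restored.length - off)) != -1)
                off += n;
            in.close();
            System.out.println(new String(restored, 0, off, "UTF-8"));
        }
    }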
diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index c45eff64d4..6a592d13c3 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -98,7 +98,7 @@ extends MScheduler with spark.Scheduler with Logging
params("env." + key) = System.getenv(key)
}
}
- new ExecutorInfo(execScript, createExecArg())
+ new ExecutorInfo(execScript, createExecArg(), params)
}
/**
diff --git a/third_party/compress-lzf-0.6.0/LICENSE b/third_party/compress-lzf-0.6.0/LICENSE
new file mode 100644
index 0000000000..c5da4e1348
--- /dev/null
+++ b/third_party/compress-lzf-0.6.0/LICENSE
@@ -0,0 +1,11 @@
+Copyright 2009-2010 Ning, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not
+use this file except in compliance with the License. You may obtain a copy of
+the License at http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations under
+the License.
diff --git a/third_party/compress-lzf-0.6.0/compress-lzf-0.6.0.jar b/third_party/compress-lzf-0.6.0/compress-lzf-0.6.0.jar
new file mode 100644
index 0000000000..6cb5c4c92b
--- /dev/null
+++ b/third_party/compress-lzf-0.6.0/compress-lzf-0.6.0.jar
Binary files differ