aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/io/CompressionCodec.scala27
-rw-r--r--core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala6
2 files changed, 27 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 1ac7f4e448..f856890d27 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -21,11 +21,12 @@ import java.io.{InputStream, OutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
-import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
+import org.apache.spark.Logging
/**
* :: DeveloperApi ::
@@ -44,25 +45,33 @@ trait CompressionCodec {
def compressedInputStream(s: InputStream): InputStream
}
-
private[spark] object CompressionCodec {
+ private val configKey = "spark.io.compression.codec"
private val shortCompressionCodecNames = Map(
"lz4" -> classOf[LZ4CompressionCodec].getName,
"lzf" -> classOf[LZFCompressionCodec].getName,
"snappy" -> classOf[SnappyCompressionCodec].getName)
def createCodec(conf: SparkConf): CompressionCodec = {
- createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC))
+ createCodec(conf, conf.get(configKey, DEFAULT_COMPRESSION_CODEC))
}
def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
- val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader)
- .getConstructor(classOf[SparkConf])
- ctor.newInstance(conf).asInstanceOf[CompressionCodec]
+ val codec = try {
+ val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader)
+ .getConstructor(classOf[SparkConf])
+ Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
+ } catch {
+ case e: ClassNotFoundException => None
+ case e: IllegalArgumentException => None
+ }
+ codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " +
+ s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
}
+ val FALLBACK_COMPRESSION_CODEC = "lzf"
val DEFAULT_COMPRESSION_CODEC = "snappy"
val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
}
@@ -120,6 +129,12 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
@DeveloperApi
class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
+ try {
+ Snappy.getNativeLibraryVersion
+ } catch {
+ case e: Error => throw new IllegalArgumentException
+ }
+
override def compressedOutputStream(s: OutputStream): OutputStream = {
val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768)
new SnappyOutputStream(s, blockSize)
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 25be7f25c2..8c6035fb36 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -85,4 +85,10 @@ class CompressionCodecSuite extends FunSuite {
assert(codec.getClass === classOf[SnappyCompressionCodec])
testCodec(codec)
}
+
+ test("bad compression codec") {
+ intercept[IllegalArgumentException] {
+ CompressionCodec.createCodec(conf, "foobar")
+ }
+ }
}