aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKostas Sakellis <kostas@cloudera.com>2014-12-22 13:07:01 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-12-22 13:07:01 -0800
commit7c0ed13d298d9cf66842c667602e2dccb8f5605b (patch)
tree1bbe88dd2f6846b32b0b47ec7f7279b0655017c0 /core
parentd62da642ace17b37283eab64149545723c8474a7 (diff)
downloadspark-7c0ed13d298d9cf66842c667602e2dccb8f5605b.tar.gz
spark-7c0ed13d298d9cf66842c667602e2dccb8f5605b.tar.bz2
spark-7c0ed13d298d9cf66842c667602e2dccb8f5605b.zip
[SPARK-4079] [CORE] Consolidates Errors if a CompressionCodec is not available
This commit consolidates some of the exceptions thrown if compression codecs are not available. If a bad configuration string was passed in, a ClassNotFoundException was through. Also, if Snappy was not available, it would throw an InvocationTargetException when the codec was being used (not when it was being initialized). Now, an IllegalArgumentException is thrown when a codec is not available at creation time - either because the class does not exist or the codec itself is not available in the system. This will allow us to have a better message and fail faster. Author: Kostas Sakellis <kostas@cloudera.com> Closes #3119 from ksakellis/kostas-spark-4079 and squashes the following commits: 9709c7c [Kostas Sakellis] Removed unnecessary Logging class 63bfdd0 [Kostas Sakellis] Removed isAvailable to preserve binary compatibility 1d0ef2f [Kostas Sakellis] [SPARK-4079] [CORE] Added more information to exception 64f3d27 [Kostas Sakellis] [SPARK-4079] [CORE] Code review feedback 52dfa8f [Kostas Sakellis] [SPARK-4079] [CORE] Default to LZF if Snappy not available
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/io/CompressionCodec.scala27
-rw-r--r--core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala6
2 files changed, 27 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 1ac7f4e448..f856890d27 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -21,11 +21,12 @@ import java.io.{InputStream, OutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
-import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
+import org.apache.spark.Logging
/**
* :: DeveloperApi ::
@@ -44,25 +45,33 @@ trait CompressionCodec {
def compressedInputStream(s: InputStream): InputStream
}
-
private[spark] object CompressionCodec {
+ private val configKey = "spark.io.compression.codec"
private val shortCompressionCodecNames = Map(
"lz4" -> classOf[LZ4CompressionCodec].getName,
"lzf" -> classOf[LZFCompressionCodec].getName,
"snappy" -> classOf[SnappyCompressionCodec].getName)
def createCodec(conf: SparkConf): CompressionCodec = {
- createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC))
+ createCodec(conf, conf.get(configKey, DEFAULT_COMPRESSION_CODEC))
}
def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
- val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader)
- .getConstructor(classOf[SparkConf])
- ctor.newInstance(conf).asInstanceOf[CompressionCodec]
+ val codec = try {
+ val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader)
+ .getConstructor(classOf[SparkConf])
+ Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
+ } catch {
+ case e: ClassNotFoundException => None
+ case e: IllegalArgumentException => None
+ }
+ codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " +
+ s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
}
+ val FALLBACK_COMPRESSION_CODEC = "lzf"
val DEFAULT_COMPRESSION_CODEC = "snappy"
val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
}
@@ -120,6 +129,12 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
@DeveloperApi
class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
+ try {
+ Snappy.getNativeLibraryVersion
+ } catch {
+ case e: Error => throw new IllegalArgumentException
+ }
+
override def compressedOutputStream(s: OutputStream): OutputStream = {
val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768)
new SnappyOutputStream(s, blockSize)
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 25be7f25c2..8c6035fb36 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -85,4 +85,10 @@ class CompressionCodecSuite extends FunSuite {
assert(codec.getClass === classOf[SnappyCompressionCodec])
testCodec(codec)
}
+
+ test("bad compression codec") {
+ intercept[IllegalArgumentException] {
+ CompressionCodec.createCodec(conf, "foobar")
+ }
+ }
}