diff options
author | Kostas Sakellis <kostas@cloudera.com> | 2014-12-22 13:07:01 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-12-22 13:07:01 -0800 |
commit | 7c0ed13d298d9cf66842c667602e2dccb8f5605b (patch) | |
tree | 1bbe88dd2f6846b32b0b47ec7f7279b0655017c0 /core | |
parent | d62da642ace17b37283eab64149545723c8474a7 (diff) | |
download | spark-7c0ed13d298d9cf66842c667602e2dccb8f5605b.tar.gz spark-7c0ed13d298d9cf66842c667602e2dccb8f5605b.tar.bz2 spark-7c0ed13d298d9cf66842c667602e2dccb8f5605b.zip |
[SPARK-4079] [CORE] Consolidates Errors if a CompressionCodec is not available
This commit consolidates some of the exceptions thrown if compression codecs are not available. If a bad configuration string was passed in, a ClassNotFoundException was through. Also, if Snappy was not available, it would throw an InvocationTargetException when the codec was being used (not when it was being initialized). Now, an IllegalArgumentException is thrown when a codec is not available at creation time - either because the class does not exist or the codec itself is not available in the system. This will allow us to have a better message and fail faster.
Author: Kostas Sakellis <kostas@cloudera.com>
Closes #3119 from ksakellis/kostas-spark-4079 and squashes the following commits:
9709c7c [Kostas Sakellis] Removed unnecessary Logging class
63bfdd0 [Kostas Sakellis] Removed isAvailable to preserve binary compatibility
1d0ef2f [Kostas Sakellis] [SPARK-4079] [CORE] Added more information to exception
64f3d27 [Kostas Sakellis] [SPARK-4079] [CORE] Code review feedback
52dfa8f [Kostas Sakellis] [SPARK-4079] [CORE] Default to LZF if Snappy not available
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/io/CompressionCodec.scala | 27 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala | 6 |
2 files changed, 27 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 1ac7f4e448..f856890d27 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -21,11 +21,12 @@ import java.io.{InputStream, OutputStream} import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} -import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} +import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream} import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.Utils +import org.apache.spark.Logging /** * :: DeveloperApi :: @@ -44,25 +45,33 @@ trait CompressionCodec { def compressedInputStream(s: InputStream): InputStream } - private[spark] object CompressionCodec { + private val configKey = "spark.io.compression.codec" private val shortCompressionCodecNames = Map( "lz4" -> classOf[LZ4CompressionCodec].getName, "lzf" -> classOf[LZFCompressionCodec].getName, "snappy" -> classOf[SnappyCompressionCodec].getName) def createCodec(conf: SparkConf): CompressionCodec = { - createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC)) + createCodec(conf, conf.get(configKey, DEFAULT_COMPRESSION_CODEC)) } def createCodec(conf: SparkConf, codecName: String): CompressionCodec = { val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName) - val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader) - .getConstructor(classOf[SparkConf]) - ctor.newInstance(conf).asInstanceOf[CompressionCodec] + val codec = try { + val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader) + .getConstructor(classOf[SparkConf]) + Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec]) + } catch { + case e: ClassNotFoundException => None + case e: IllegalArgumentException => None + } + codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " + + s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC")) } + val FALLBACK_COMPRESSION_CODEC = "lzf" val DEFAULT_COMPRESSION_CODEC = "snappy" val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq } @@ -120,6 +129,12 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { @DeveloperApi class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { + try { + Snappy.getNativeLibraryVersion + } catch { + case e: Error => throw new IllegalArgumentException + } + override def compressedOutputStream(s: OutputStream): OutputStream = { val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768) new SnappyOutputStream(s, blockSize) diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala index 25be7f25c2..8c6035fb36 100644 --- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala @@ -85,4 +85,10 @@ class CompressionCodecSuite extends FunSuite { assert(codec.getClass === classOf[SnappyCompressionCodec]) testCodec(codec) } + + test("bad compression codec") { + intercept[IllegalArgumentException] { + CompressionCodec.createCodec(conf, "foobar") + } + } } |