diff options
field | value | date |
---|---|---|
author | Shilei <shilei.qian@intel.com> | 2015-06-30 09:49:58 -0700 |
committer | Davies Liu <davies@databricks.com> | 2015-06-30 09:49:58 -0700 |
commit | 722aa5f48ec105bf23eee2361adddfe3a0cd6fc4 (patch) | |
tree | 315a95999a4c51ea9194d28b34a3e108541996ce /sql | |
parent | a48e61915354d33fb98944a8eb5a5d48dd102041 (diff) | |
download | spark-722aa5f48ec105bf23eee2361adddfe3a0cd6fc4.tar.gz spark-722aa5f48ec105bf23eee2361adddfe3a0cd6fc4.tar.bz2 spark-722aa5f48ec105bf23eee2361adddfe3a0cd6fc4.zip |
[SPARK-8236] [SQL] misc functions: crc32
https://issues.apache.org/jira/browse/SPARK-8236
Author: Shilei <shilei.qian@intel.com>
Closes #7108 from qiansl127/Crc32 and squashes the following commits:
5477352 [Shilei] Change to AutoCastInputTypes
5f16e5d [Shilei] Add misc function crc32
Diffstat (limited to 'sql')
5 files changed, 76 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index b17457d309..d53eaedda5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -139,6 +139,7 @@ object FunctionRegistry { expression[Sha2]("sha2"), expression[Sha1]("sha1"), expression[Sha1]("sha"), + expression[Crc32]("crc32"), // aggregate functions expression[Average]("avg"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index 27805bff29..a7bcbe46c3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import java.security.MessageDigest import java.security.NoSuchAlgorithmException +import java.util.zip.CRC32 import org.apache.commons.codec.digest.DigestUtils import org.apache.spark.sql.catalyst.analysis.TypeCheckResult @@ -168,3 +169,42 @@ case class Sha1(child: Expression) extends UnaryExpression with AutoCastInputTyp ) } } + +/** + * A function that computes a cyclic redundancy check value and returns it as a bigint + * For input of type [[BinaryType]] + */ +case class Crc32(child: Expression) + extends UnaryExpression with AutoCastInputTypes { + + override def dataType: DataType = LongType + + override def expectedChildTypes: Seq[DataType] = Seq(BinaryType) + + override def eval(input: InternalRow): Any = { + val value = child.eval(input) + if (value == null) { + null + } else { + val checksum = new CRC32 + checksum.update(value.asInstanceOf[Array[Byte]], 0, value.asInstanceOf[Array[Byte]].length) 
+ checksum.getValue + } + } + + override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + val value = child.gen(ctx) + val CRC32 = "java.util.zip.CRC32" + s""" + ${value.code} + boolean ${ev.isNull} = ${value.isNull}; + long ${ev.primitive} = ${ctx.defaultValue(dataType)}; + if (!${ev.isNull}) { + ${CRC32} checksum = new ${CRC32}(); + checksum.update(${value.primitive}, 0, ${value.primitive}.length); + ${ev.primitive} = checksum.getValue(); + } + """ + } + +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala index 36e636b5da..b524d0af14 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala @@ -49,4 +49,12 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(Sha2(Literal("ABC".getBytes), Literal.create(null, IntegerType)), null) checkEvaluation(Sha2(Literal.create(null, BinaryType), Literal.create(null, IntegerType)), null) } + + test("crc32") { + checkEvaluation(Crc32(Literal("ABC".getBytes)), 2743272264L) + checkEvaluation(Crc32(Literal.create(Array[Byte](1, 2, 3, 4, 5, 6), BinaryType)), + 2180413220L) + checkEvaluation(Crc32(Literal.create(null, BinaryType)), null) + } + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 4d9a019058..6331fe6105 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -1466,6 +1466,22 @@ object functions { */ def sha2(columnName: String, numBits: Int): Column = sha2(Column(columnName), numBits) + /** + * Calculates the cyclic redundancy check value and returns the value as a bigint. 
+ * + * @group misc_funcs + * @since 1.5.0 + */ + def crc32(e: Column): Column = Crc32(e.expr) + + /** + * Calculates the cyclic redundancy check value and returns the value as a bigint. + * + * @group misc_funcs + * @since 1.5.0 + */ + def crc32(columnName: String): Column = crc32(Column(columnName)) + ////////////////////////////////////////////////////////////////////////////////////////////// // String functions ////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index abfd47c811..11a8767ead 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -173,6 +173,17 @@ class DataFrameFunctionsSuite extends QueryTest { } } + test("misc crc32 function") { + val df = Seq(("ABC", Array[Byte](1, 2, 3, 4, 5, 6))).toDF("a", "b") + checkAnswer( + df.select(crc32($"a"), crc32("b")), + Row(2743272264L, 2180413220L)) + + checkAnswer( + df.selectExpr("crc32(a)", "crc32(b)"), + Row(2743272264L, 2180413220L)) + } + test("string length function") { checkAnswer( nullStrings.select(strlen($"s"), strlen("s")), |