aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorShilei <shilei.qian@intel.com>2015-06-30 09:49:58 -0700
committerDavies Liu <davies@databricks.com>2015-06-30 09:49:58 -0700
commit722aa5f48ec105bf23eee2361adddfe3a0cd6fc4 (patch)
tree315a95999a4c51ea9194d28b34a3e108541996ce /sql
parenta48e61915354d33fb98944a8eb5a5d48dd102041 (diff)
downloadspark-722aa5f48ec105bf23eee2361adddfe3a0cd6fc4.tar.gz
spark-722aa5f48ec105bf23eee2361adddfe3a0cd6fc4.tar.bz2
spark-722aa5f48ec105bf23eee2361adddfe3a0cd6fc4.zip
[SPARK-8236] [SQL] misc functions: crc32
https://issues.apache.org/jira/browse/SPARK-8236 Author: Shilei <shilei.qian@intel.com> Closes #7108 from qiansl127/Crc32 and squashes the following commits: 5477352 [Shilei] Change to AutoCastInputTypes 5f16e5d [Shilei] Add misc function crc32
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala1
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala40
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/functions.scala16
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala11
5 files changed, 76 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index b17457d309..d53eaedda5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -139,6 +139,7 @@ object FunctionRegistry {
expression[Sha2]("sha2"),
expression[Sha1]("sha1"),
expression[Sha1]("sha"),
+ expression[Crc32]("crc32"),
// aggregate functions
expression[Average]("avg"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index 27805bff29..a7bcbe46c3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import java.security.MessageDigest
import java.security.NoSuchAlgorithmException
+import java.util.zip.CRC32
import org.apache.commons.codec.digest.DigestUtils
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
@@ -168,3 +169,42 @@ case class Sha1(child: Expression) extends UnaryExpression with AutoCastInputTyp
)
}
}
+
+/**
+ * A function that computes a cyclic redundancy check value and returns it as a bigint
+ * For input of type [[BinaryType]]
+ */
+case class Crc32(child: Expression)
+ extends UnaryExpression with AutoCastInputTypes {
+
+ override def dataType: DataType = LongType
+
+ override def expectedChildTypes: Seq[DataType] = Seq(BinaryType)
+
+ override def eval(input: InternalRow): Any = {
+ val value = child.eval(input)
+ if (value == null) {
+ null
+ } else {
+ val checksum = new CRC32
+ checksum.update(value.asInstanceOf[Array[Byte]], 0, value.asInstanceOf[Array[Byte]].length)
+ checksum.getValue
+ }
+ }
+
+ override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+ val value = child.gen(ctx)
+ val CRC32 = "java.util.zip.CRC32"
+ s"""
+ ${value.code}
+ boolean ${ev.isNull} = ${value.isNull};
+ long ${ev.primitive} = ${ctx.defaultValue(dataType)};
+ if (!${ev.isNull}) {
+ ${CRC32} checksum = new ${CRC32}();
+ checksum.update(${value.primitive}, 0, ${value.primitive}.length);
+ ${ev.primitive} = checksum.getValue();
+ }
+ """
+ }
+
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
index 36e636b5da..b524d0af14 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala
@@ -49,4 +49,12 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(Sha2(Literal("ABC".getBytes), Literal.create(null, IntegerType)), null)
checkEvaluation(Sha2(Literal.create(null, BinaryType), Literal.create(null, IntegerType)), null)
}
+
+ test("crc32") {
+ checkEvaluation(Crc32(Literal("ABC".getBytes)), 2743272264L)
+ checkEvaluation(Crc32(Literal.create(Array[Byte](1, 2, 3, 4, 5, 6), BinaryType)),
+ 2180413220L)
+ checkEvaluation(Crc32(Literal.create(null, BinaryType)), null)
+ }
+
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 4d9a019058..6331fe6105 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1466,6 +1466,22 @@ object functions {
*/
def sha2(columnName: String, numBits: Int): Column = sha2(Column(columnName), numBits)
+ /**
+ * Calculates the cyclic redundancy check value and returns the value as a bigint.
+ *
+ * @group misc_funcs
+ * @since 1.5.0
+ */
+ def crc32(e: Column): Column = Crc32(e.expr)
+
+ /**
+ * Calculates the cyclic redundancy check value and returns the value as a bigint.
+ *
+ * @group misc_funcs
+ * @since 1.5.0
+ */
+ def crc32(columnName: String): Column = crc32(Column(columnName))
+
//////////////////////////////////////////////////////////////////////////////////////////////
// String functions
//////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index abfd47c811..11a8767ead 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -173,6 +173,17 @@ class DataFrameFunctionsSuite extends QueryTest {
}
}
+ test("misc crc32 function") {
+ val df = Seq(("ABC", Array[Byte](1, 2, 3, 4, 5, 6))).toDF("a", "b")
+ checkAnswer(
+ df.select(crc32($"a"), crc32("b")),
+ Row(2743272264L, 2180413220L))
+
+ checkAnswer(
+ df.selectExpr("crc32(a)", "crc32(b)"),
+ Row(2743272264L, 2180413220L))
+ }
+
test("string length function") {
checkAnswer(
nullStrings.select(strlen($"s"), strlen("s")),