-rw-r--r--  python/pyspark/sql/functions.py                                                             14
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala    1
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala            33
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala  34
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala                                18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala                  10
6 files changed, 102 insertions, 8 deletions
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index acdb01d3d3..cfa87aeea1 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -35,6 +35,7 @@ from pyspark.sql.column import Column, _to_java_column, _to_seq
__all__ = [
'array',
'approxCountDistinct',
+ 'bin',
'coalesce',
'countDistinct',
'explode',
@@ -231,6 +232,19 @@ def approxCountDistinct(col, rsd=None):
return Column(jc)
+@ignore_unicode_prefix
+@since(1.5)
+def bin(col):
+ """Returns the string representation of the binary value of the given column.
+
+ >>> df.select(bin(df.age).alias('c')).collect()
+ [Row(c=u'10'), Row(c=u'101')]
+ """
+ sc = SparkContext._active_spark_context
+ jc = sc._jvm.functions.bin(_to_java_column(col))
+ return Column(jc)
+
+
@since(1.4)
def coalesce(*cols):
"""Returns the first column that is not null.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 13b2bb05f5..79273a7840 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -103,6 +103,7 @@ object FunctionRegistry {
expression[Asin]("asin"),
expression[Atan]("atan"),
expression[Atan2]("atan2"),
+ expression[Bin]("bin"),
expression[Cbrt]("cbrt"),
expression[Ceil]("ceil"),
expression[Ceil]("ceiling"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
index f79bf4aee0..250564dc4b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
@@ -17,9 +17,12 @@
package org.apache.spark.sql.catalyst.expressions
+import java.lang.{Long => JLong}
+
import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.types.{DataType, DoubleType}
+import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
/**
* A leaf expression specifically for math constants. Math constants expect no input.
@@ -207,6 +210,34 @@ case class ToRadians(child: Expression) extends UnaryMathExpression(math.toRadia
override def funcName: String = "toRadians"
}
+case class Bin(child: Expression)
+ extends UnaryExpression with Serializable with ExpectsInputTypes {
+
+ val name: String = "BIN"
+
+ override def foldable: Boolean = child.foldable
+ override def nullable: Boolean = true
+ override def toString: String = s"$name($child)"
+
+ override def expectedChildTypes: Seq[DataType] = Seq(LongType)
+ override def dataType: DataType = StringType
+
+ def funcName: String = name.toLowerCase
+
+ override def eval(input: catalyst.InternalRow): Any = {
+ val evalE = child.eval(input)
+ if (evalE == null) {
+ null
+ } else {
+ UTF8String.fromString(JLong.toBinaryString(evalE.asInstanceOf[Long]))
+ }
+ }
+
+ override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+ defineCodeGen(ctx, ev, (c) =>
+ s"${ctx.stringType}.fromString(java.lang.Long.toBinaryString($c))")
+ }
+}
////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////
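
Note that java.lang.Long.toBinaryString formats a negative input as its full 64-bit two's-complement bit pattern, and Bin inherits that behavior. A small expression-level sketch (Literal and EmptyRow are the Catalyst helpers also used in the test suite below):

    // Sketch only: evaluating the new expression directly.
    Bin(Literal(12L)).eval(EmptyRow)                      // => UTF8String "1100"
    Bin(Literal(-1L)).eval(EmptyRow)                      // => a run of 64 ones (two's complement)
    Bin(Literal.create(null, LongType)).eval(EmptyRow)    // => null (nullable = true)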
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
index 21e9b92b72..0d1d5ebdff 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.types.DoubleType
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.types.{DataType, DoubleType, LongType}
class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -41,16 +42,18 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
* Used for testing unary math expressions.
*
* @param c expression
- * @param f The functions in scala.math
+ * @param f The function, in scala.math or elsewhere, used to generate expected results
* @param domain The set of values to run the function with
* @param expectNull Whether the given values should return null or not
* @tparam T Generic type for primitives
+ * @tparam U Generic type for the output of the given function `f`
*/
- private def testUnary[T](
+ private def testUnary[T, U](
c: Expression => Expression,
- f: T => T,
+ f: T => U,
domain: Iterable[T] = (-20 to 20).map(_ * 0.1),
- expectNull: Boolean = false): Unit = {
+ expectNull: Boolean = false,
+ evalType: DataType = DoubleType): Unit = {
if (expectNull) {
domain.foreach { value =>
checkEvaluation(c(Literal(value)), null, EmptyRow)
@@ -60,7 +63,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(c(Literal(value)), f(value), EmptyRow)
}
}
- checkEvaluation(c(Literal.create(null, DoubleType)), null, create_row(null))
+ checkEvaluation(c(Literal.create(null, evalType)), null, create_row(null))
}
/**
@@ -168,7 +171,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
test("signum") {
- testUnary[Double](Signum, math.signum)
+ testUnary[Double, Double](Signum, math.signum)
}
test("log") {
@@ -186,6 +189,23 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
testUnary(Log1p, math.log1p, (-10 to -2).map(_ * 1.0), expectNull = true)
}
+ test("bin") {
+ testUnary(Bin, java.lang.Long.toBinaryString, (-20 to 20).map(_.toLong), evalType = LongType)
+
+ val row = create_row(null, 12L, 123L, 1234L, -123L)
+ val l1 = 'a.long.at(0)
+ val l2 = 'a.long.at(1)
+ val l3 = 'a.long.at(2)
+ val l4 = 'a.long.at(3)
+ val l5 = 'a.long.at(4)
+
+ checkEvaluation(Bin(l1), null, row)
+ checkEvaluation(Bin(l2), java.lang.Long.toBinaryString(12), row)
+ checkEvaluation(Bin(l3), java.lang.Long.toBinaryString(123), row)
+ checkEvaluation(Bin(l4), java.lang.Long.toBinaryString(1234), row)
+ checkEvaluation(Bin(l5), java.lang.Long.toBinaryString(-123), row)
+ }
+
test("log2") {
def f: (Double) => Double = (x: Double) => math.log(x) / math.log(2)
testUnary(Log2, f, (0 to 20).map(_ * 0.1))
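
The second type parameter added to testUnary is what lets the same helper cover both the existing Double => Double functions and the new Long => String case; with the old single-parameter signature the bin test would not have typechecked:

    // Sketch: java.lang.Long.toBinaryString eta-expands to (Long => String),
    // so T = Long and U = String; the old (T => T) shape could not express this.
    val f: Long => String = java.lang.Long.toBinaryString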
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index d8a91bead7..40ae9f5df8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -890,6 +890,24 @@ object functions {
def atan2(l: Double, rightName: String): Column = atan2(l, Column(rightName))
/**
+ * An expression that returns the string representation of the binary value of the given long
+ * column. For example, for the long value 12 this returns "1100".
+ *
+ * @group math_funcs
+ * @since 1.5.0
+ */
+ def bin(e: Column): Column = Bin(e.expr)
+
+ /**
+ * An expression that returns the string representation of the binary value of the given long
+ * column. For example, for the long value 12 this returns "1100".
+ *
+ * @group math_funcs
+ * @since 1.5.0
+ */
+ def bin(columnName: String): Column = bin(Column(columnName))
+
+ /**
* Computes the cube-root of the given value.
*
* @group math_funcs
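
A brief sketch of the two overloads added above (the DataFrame and column name are illustrative):

    // Sketch only: the Column overload and the column-name convenience overload.
    val df = Seq(12L, -123L).toDF("a")
    df.select(bin(df("a")))   // takes a Column directly
    df.select(bin("a"))       // resolves the column by name, then delegates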
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index cfd23867a9..70819fe287 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -110,6 +110,16 @@ class DataFrameFunctionsSuite extends QueryTest {
testData2.collect().toSeq.map(r => Row(~r.getInt(0))))
}
+ test("bin") {
+ val df = Seq[(Integer, Integer)]((12, null)).toDF("a", "b")
+ checkAnswer(
+ df.select(bin("a"), bin("b")),
+ Row("1100", null))
+ checkAnswer(
+ df.selectExpr("bin(a)", "bin(b)"),
+ Row("1100", null))
+ }
+
test("if function") {
val df = Seq((1, 2)).toDF("a", "b")
checkAnswer(