author     Reynold Xin <rxin@databricks.com>   2015-04-26 11:46:58 -0700
committer  Reynold Xin <rxin@databricks.com>   2015-04-26 11:46:58 -0700
commit     ca55dc95b777d96b27d4e4c0457dd25145dcd6e9
tree       391b1d42a227ec3744b4ff67852c01c4ec2e1b71 /sql
parent     9a5bbe05fc1b1141e139d32661821fef47d7a13c
[SPARK-7152][SQL] Add a Column expression for partition ID.
Author: Reynold Xin <rxin@databricks.com>

Closes #5705 from rxin/df-pid and squashes the following commits:

401018f [Reynold Xin] [SPARK-7152][SQL] Add a Column expression for partition ID.
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala | 39
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala | 23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 29
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala | 8
4 files changed, 89 insertions(+), 10 deletions(-)
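In DataFrame terms, the new function tags each row with the ID of the partition that produced it. A minimal usage sketch, assuming a live SparkContext named `sc` and an SQLContext named `sqlContext` whose implicits are in scope (neither name comes from the patch itself):

    import org.apache.spark.sql.functions.sparkPartitionId
    import sqlContext.implicits._

    // Eight rows spread across four partitions: the pid column takes the
    // values 0 through 3, one per partition.
    val df = sc.parallelize(1 to 8, 4).map(i => (i, i)).toDF("a", "b")
    df.select($"a", sparkPartitionId().as("pid")).show()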
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
new file mode 100644
index 0000000000..fe7607c6ac
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.expressions
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.expressions.{Row, Expression}
+import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.types.{IntegerType, DataType}
+
+
+/**
+ * Expression that returns the current partition id of the Spark task.
+ */
+case object SparkPartitionID extends Expression with trees.LeafNode[Expression] {
+ self: Product =>
+
+ override type EvaluatedType = Int
+
+ override def nullable: Boolean = false
+
+ override def dataType: DataType = IntegerType
+
+ override def eval(input: Row): Int = TaskContext.get().partitionId()
+}
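eval reads the partition ID straight off the TaskContext of the task evaluating the row, so the expression is constant within a partition and varies only across partitions. For comparison, the same value has long been reachable at the RDD level; a sketch of the equivalent, with `sc` again an assumed SparkContext:

    import org.apache.spark.TaskContext

    // RDD-level equivalent of the new expression: pair each element with
    // the ID of the partition the current task is processing.
    val withPid = sc.parallelize(1 to 8, 4).map(i => (TaskContext.get().partitionId(), i))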
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala
new file mode 100644
index 0000000000..568b7ac2c5
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+/**
+ * Package containing expressions that are specific to the Spark runtime.
+ */
+package object expressions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index ff91e1d74b..9738fd4f93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -277,6 +277,13 @@ object functions {
//////////////////////////////////////////////////////////////////////////////////////////////
/**
+ * Computes the absolute value.
+ *
+ * @group normal_funcs
+ */
+ def abs(e: Column): Column = Abs(e.expr)
+
+ /**
* Returns the first column that is not null.
* {{{
* df.select(coalesce(df("a"), df("b")))
@@ -288,6 +295,13 @@ object functions {
def coalesce(e: Column*): Column = Coalesce(e.map(_.expr))
/**
+ * Converts a string expression to lower case.
+ *
+ * @group normal_funcs
+ */
+ def lower(e: Column): Column = Lower(e.expr)
+
+ /**
* Unary minus, i.e. negate the expression.
* {{{
* // Select the amount column and negates all values.
@@ -317,18 +331,13 @@ object functions {
def not(e: Column): Column = !e
/**
- * Converts a string expression to upper case.
+ * Partition ID of the Spark task.
*
- * @group normal_funcs
- */
- def upper(e: Column): Column = Upper(e.expr)
-
- /**
- * Converts a string exprsesion to lower case.
+ * Note that this is non-deterministic because it depends on data partitioning and task scheduling.
*
* @group normal_funcs
*/
- def lower(e: Column): Column = Lower(e.expr)
+ def sparkPartitionId(): Column = execution.expressions.SparkPartitionID
/**
* Computes the square root of the specified float value.
@@ -338,11 +347,11 @@ object functions {
def sqrt(e: Column): Column = Sqrt(e.expr)
/**
- * Computes the absolutle value.
+ * Converts a string expression to upper case.
*
* @group normal_funcs
*/
- def abs(e: Column): Column = Abs(e.expr)
+ def upper(e: Column): Column = Upper(e.expr)
//////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////////////////
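The non-determinism note on sparkPartitionId() is easy to demonstrate: the value reflects physical layout, not data, so repartitioning the same rows yields different IDs. Continuing with the hypothetical `df` from the sketch above:

    // Same logical data, different physical layout, different IDs.
    df.select(sparkPartitionId()).collect()                 // values drawn from 0 to 3
    df.repartition(2).select(sparkPartitionId()).collect()  // values drawn from 0 and 1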
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index bc8fae100d..904073b8cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -310,6 +310,14 @@ class ColumnExpressionSuite extends QueryTest {
)
}
+ test("sparkPartitionId") {
+ val df = TestSQLContext.sparkContext.parallelize(1 to 1, 1).map(i => (i, i)).toDF("a", "b")
+ checkAnswer(
+ df.select(sparkPartitionId()),
+ Row(0)
+ )
+ }
+
test("lift alias out of cast") {
compareExpressions(
col("1234").as("name").cast("int").expr,