aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorJoseph Batchik <josephbatchik@gmail.com>2015-07-29 23:35:55 -0700
committerReynold Xin <rxin@databricks.com>2015-07-29 23:35:55 -0700
commit1221849f91739454b8e495889cba7498ba8beea7 (patch)
tree267b356aaa35da1189874af2a3dd1af3950cdbf4 /sql/catalyst
parente127ec34d58ceb0a9d45748c2f2918786ba0a83d (diff)
downloadspark-1221849f91739454b8e495889cba7498ba8beea7.tar.gz
spark-1221849f91739454b8e495889cba7498ba8beea7.tar.bz2
spark-1221849f91739454b8e495889cba7498ba8beea7.zip
[SPARK-8005][SQL] Input file name
Users can now get the file name of the partition being read in. A thread local variable is in `SQLNewHadoopRDD` and is set when the partition is computed. `SQLNewHadoopRDD` is moved to core so that the catalyst package can reach it. This supports: `df.select(inputFileName())` and `sqlContext.sql("select input_file_name() from table")` Author: Joseph Batchik <josephbatchik@gmail.com> Closes #7743 from JDrit/input_file_name and squashes the following commits: abb8609 [Joseph Batchik] fixed failing test and changed the default value to be an empty string d2f323d [Joseph Batchik] updates per review 102061f [Joseph Batchik] updates per review 75313f5 [Joseph Batchik] small fixes c7f7b5a [Joseph Batchik] addeding input file name to Spark SQL
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala3
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala49
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala2
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala4
4 files changed, 57 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 372f80d4a8..378df4f57d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -230,7 +230,8 @@ object FunctionRegistry {
expression[Sha1]("sha"),
expression[Sha1]("sha1"),
expression[Sha2]("sha2"),
- expression[SparkPartitionID]("spark_partition_id")
+ expression[SparkPartitionID]("spark_partition_id"),
+ expression[InputFileName]("input_file_name")
)
val builtin: FunctionRegistry = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
new file mode 100644
index 0000000000..1e74f71695
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.rdd.SqlNewHadoopRDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Expression that returns the name of the current file being read in using [[SqlNewHadoopRDD]]
+ */
+case class InputFileName() extends LeafExpression with Nondeterministic {
+
+ override def nullable: Boolean = true
+
+ override def dataType: DataType = StringType
+
+ override val prettyName = "INPUT_FILE_NAME"
+
+ override protected def initInternal(): Unit = {}
+
+ override protected def evalInternal(input: InternalRow): UTF8String = {
+ SqlNewHadoopRDD.getInputFileName()
+ }
+
+ override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+ ev.isNull = "false"
+ s"final ${ctx.javaType(dataType)} ${ev.primitive} = " +
+ "org.apache.spark.rdd.SqlNewHadoopRDD.getInputFileName();"
+ }
+
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
index 3f6480bbf0..4b1772a2de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
@@ -34,6 +34,8 @@ private[sql] case class SparkPartitionID() extends LeafExpression with Nondeterm
@transient private[this] var partitionId: Int = _
+ override val prettyName = "SPARK_PARTITION_ID"
+
override protected def initInternal(): Unit = {
partitionId = TaskContext.getPartitionId()
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
index 82894822ab..bf1c930c0b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
@@ -27,4 +27,8 @@ class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
test("SparkPartitionID") {
checkEvaluation(SparkPartitionID(), 0)
}
+
+ test("InputFileName") {
+ checkEvaluation(InputFileName(), "")
+ }
}