aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-12-04 21:51:10 -0800
committerReynold Xin <rxin@databricks.com>2016-12-04 21:51:10 -0800
commite9730b707ddf6e344de3b3b8f43487f7b0f18e25 (patch)
tree4cd4e931c62488e87d9100970e9c5a5e6bd45c56 /sql/catalyst/src
parentb019b3a8ac49336e657f5e093fa2fba77f8d12d2 (diff)
downloadspark-e9730b707ddf6e344de3b3b8f43487f7b0f18e25.tar.gz
spark-e9730b707ddf6e344de3b3b8f43487f7b0f18e25.tar.bz2
spark-e9730b707ddf6e344de3b3b8f43487f7b0f18e25.zip
[SPARK-18702][SQL] input_file_block_start and input_file_block_length
## What changes were proposed in this pull request?

We currently have function input_file_name to get the path of the input file, but don't have functions to get the block start offset and length. This patch introduces two functions:

1. input_file_block_start: returns the file block start offset, or -1 if not available.
2. input_file_block_length: returns the file block length, or -1 if not available.

## How was this patch tested?

Updated existing test cases in ColumnExpressionSuite that covered input_file_name to also cover the two new functions.

Author: Reynold Xin <rxin@databricks.com>

Closes #16133 from rxin/SPARK-18702.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala49
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala94
3 files changed, 96 insertions, 49 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index e41f1cad93..5d065d736e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -371,6 +371,8 @@ object FunctionRegistry {
expression[Sha2]("sha2"),
expression[SparkPartitionID]("spark_partition_id"),
expression[InputFileName]("input_file_name"),
+ expression[InputFileBlockStart]("input_file_block_start"),
+ expression[InputFileBlockLength]("input_file_block_length"),
expression[MonotonicallyIncreasingID]("monotonically_increasing_id"),
expression[CurrentDatabase]("current_database"),
expression[CallMethodViaReflection]("reflect"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
deleted file mode 100644
index d412336699..0000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions
-
-import org.apache.spark.rdd.InputFileNameHolder
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.types.{DataType, StringType}
-import org.apache.spark.unsafe.types.UTF8String
-
-/**
- * Expression that returns the name of the current file being read.
- */
-@ExpressionDescription(
- usage = "_FUNC_() - Returns the name of the current file being read if available.")
-case class InputFileName() extends LeafExpression with Nondeterministic {
-
- override def nullable: Boolean = false
-
- override def dataType: DataType = StringType
-
- override def prettyName: String = "input_file_name"
-
- override protected def initializeInternal(partitionIndex: Int): Unit = {}
-
- override protected def evalInternal(input: InternalRow): UTF8String = {
- InputFileNameHolder.getInputFileName()
- }
-
- override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
- ev.copy(code = s"final ${ctx.javaType(dataType)} ${ev.value} = " +
- "org.apache.spark.rdd.InputFileNameHolder.getInputFileName();", isNull = "false")
- }
-}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala
new file mode 100644
index 0000000000..7a8edabed1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.rdd.InputFileBlockHolder
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.types.{DataType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * Expression returning the path of the file being read, as held by [[InputFileBlockHolder]].
+ * Non-nullable; per the usage text, the empty string is returned when not available.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_() - Returns the name of the file being read, or empty string if not available.")
+case class InputFileName() extends LeafExpression with Nondeterministic {
+  override def prettyName: String = "input_file_name"
+  override def dataType: DataType = StringType
+  override def nullable: Boolean = false
+
+  override protected def initializeInternal(partitionIndex: Int): Unit = {}
+  override protected def evalInternal(input: InternalRow): UTF8String =
+    InputFileBlockHolder.getInputFilePath
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    // Drop the trailing '$' of the holder's runtime class name before emitting the call.
+    val holder = InputFileBlockHolder.getClass.getName.stripSuffix("$")
+    val javaType = ctx.javaType(dataType)
+    ev.copy(code = s"final $javaType ${ev.value} = $holder.getInputFilePath();", isNull = "false")
+  }
+}
+
+
+/**
+ * Non-nullable long expression returning [[InputFileBlockHolder]]'s current start offset.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_() - Returns the start offset of the block being read, or -1 if not available.")
+case class InputFileBlockStart() extends LeafExpression with Nondeterministic {
+  override def prettyName: String = "input_file_block_start"
+  override def dataType: DataType = LongType
+  override def nullable: Boolean = false
+
+  override protected def initializeInternal(partitionIndex: Int): Unit = {}
+  override protected def evalInternal(input: InternalRow): Long =
+    InputFileBlockHolder.getStartOffset
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    // Drop the trailing '$' of the holder's runtime class name before emitting the call.
+    val holder = InputFileBlockHolder.getClass.getName.stripSuffix("$")
+    val javaType = ctx.javaType(dataType)
+    ev.copy(code = s"final $javaType ${ev.value} = $holder.getStartOffset();", isNull = "false")
+  }
+}
+
+
+/**
+ * Non-nullable long expression returning [[InputFileBlockHolder]]'s current block length.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_() - Returns the length of the block being read, or -1 if not available.")
+case class InputFileBlockLength() extends LeafExpression with Nondeterministic {
+  override def prettyName: String = "input_file_block_length"
+  override def dataType: DataType = LongType
+  override def nullable: Boolean = false
+
+  override protected def initializeInternal(partitionIndex: Int): Unit = {}
+  override protected def evalInternal(input: InternalRow): Long =
+    InputFileBlockHolder.getLength
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    // Drop the trailing '$' of the holder's runtime class name before emitting the call.
+    val holder = InputFileBlockHolder.getClass.getName.stripSuffix("$")
+    val javaType = ctx.javaType(dataType)
+    ev.copy(code = s"final $javaType ${ev.value} = $holder.getLength();", isNull = "false")
+  }
+}