diff options
author | Reynold Xin <rxin@databricks.com> | 2016-12-04 21:51:10 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-12-04 21:51:10 -0800 |
commit | e9730b707ddf6e344de3b3b8f43487f7b0f18e25 (patch) | |
tree | 4cd4e931c62488e87d9100970e9c5a5e6bd45c56 /sql/catalyst/src | |
parent | b019b3a8ac49336e657f5e093fa2fba77f8d12d2 (diff) | |
download | spark-e9730b707ddf6e344de3b3b8f43487f7b0f18e25.tar.gz spark-e9730b707ddf6e344de3b3b8f43487f7b0f18e25.tar.bz2 spark-e9730b707ddf6e344de3b3b8f43487f7b0f18e25.zip |
[SPARK-18702][SQL] input_file_block_start and input_file_block_length
## What changes were proposed in this pull request?
We currently have function input_file_name to get the path of the input file, but don't have functions to get the block start offset and length. This patch introduces two functions:
1. input_file_block_start: returns the file block start offset, or -1 if not available.
2. input_file_block_length: returns the file block length, or -1 if not available.
## How was this patch tested?
Updated existing test cases in ColumnExpressionSuite that covered input_file_name to also cover the two new functions.
Author: Reynold Xin <rxin@databricks.com>
Closes #16133 from rxin/SPARK-18702.
Diffstat (limited to 'sql/catalyst/src')
3 files changed, 96 insertions, 49 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index e41f1cad93..5d065d736e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -371,6 +371,8 @@ object FunctionRegistry { expression[Sha2]("sha2"), expression[SparkPartitionID]("spark_partition_id"), expression[InputFileName]("input_file_name"), + expression[InputFileBlockStart]("input_file_block_start"), + expression[InputFileBlockLength]("input_file_block_length"), expression[MonotonicallyIncreasingID]("monotonically_increasing_id"), expression[CurrentDatabase]("current_database"), expression[CallMethodViaReflection]("reflect"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala deleted file mode 100644 index d412336699..0000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.expressions - -import org.apache.spark.rdd.InputFileNameHolder -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.types.{DataType, StringType} -import org.apache.spark.unsafe.types.UTF8String - -/** - * Expression that returns the name of the current file being read. - */ -@ExpressionDescription( - usage = "_FUNC_() - Returns the name of the current file being read if available.") -case class InputFileName() extends LeafExpression with Nondeterministic { - - override def nullable: Boolean = false - - override def dataType: DataType = StringType - - override def prettyName: String = "input_file_name" - - override protected def initializeInternal(partitionIndex: Int): Unit = {} - - override protected def evalInternal(input: InternalRow): UTF8String = { - InputFileNameHolder.getInputFileName() - } - - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - ev.copy(code = s"final ${ctx.javaType(dataType)} ${ev.value} = " + - "org.apache.spark.rdd.InputFileNameHolder.getInputFileName();", isNull = "false") - } -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala new file mode 100644 index 0000000000..7a8edabed1 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.rdd.InputFileBlockHolder +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.types.{DataType, LongType, StringType} +import org.apache.spark.unsafe.types.UTF8String + + +@ExpressionDescription( + usage = "_FUNC_() - Returns the name of the file being read, or empty string if not available.") +case class InputFileName() extends LeafExpression with Nondeterministic { + + override def nullable: Boolean = false + + override def dataType: DataType = StringType + + override def prettyName: String = "input_file_name" + + override protected def initializeInternal(partitionIndex: Int): Unit = {} + + override protected def evalInternal(input: InternalRow): UTF8String = { + InputFileBlockHolder.getInputFilePath + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val className = InputFileBlockHolder.getClass.getName.stripSuffix("$") + ev.copy(code = s"final ${ctx.javaType(dataType)} ${ev.value} = " + + s"$className.getInputFilePath();", isNull = "false") + } +} + + +@ExpressionDescription( + usage = "_FUNC_() - Returns the start offset of the block being read, or -1 if not available.") +case class InputFileBlockStart() extends LeafExpression with Nondeterministic { + override def nullable: Boolean = false + + override def dataType: DataType = LongType + + override def prettyName: String = "input_file_block_start" + + override protected def initializeInternal(partitionIndex: Int): Unit = {} + + override protected def evalInternal(input: InternalRow): Long = { + InputFileBlockHolder.getStartOffset + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val className = InputFileBlockHolder.getClass.getName.stripSuffix("$") + ev.copy(code = s"final ${ctx.javaType(dataType)} ${ev.value} = " + + s"$className.getStartOffset();", isNull = "false") + } +} + + +@ExpressionDescription( + usage = "_FUNC_() - Returns the length of the block being read, or -1 if not available.") +case class InputFileBlockLength() extends LeafExpression with Nondeterministic { + override def nullable: Boolean = false + + override def dataType: DataType = LongType + + override def prettyName: String = "input_file_block_length" + + override protected def initializeInternal(partitionIndex: Int): Unit = {} + + override protected def evalInternal(input: InternalRow): Long = { + InputFileBlockHolder.getLength + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val className = InputFileBlockHolder.getClass.getName.stripSuffix("$") + ev.copy(code = s"final ${ctx.javaType(dataType)} ${ev.value} = " + + s"$className.getLength();", isNull = "false") + } +} |