aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2017-03-25 11:46:54 -0700
committerXiao Li <gatorsmile@gmail.com>2017-03-25 11:46:54 -0700
commit0b903caef3183c5113feb09995874f6a07aa6698 (patch)
tree50d2644301639aff508d55c0e94054ea615ccb0d /sql/catalyst
parentbe85245a98d58f636ff54956cdfde15ea5cd6122 (diff)
downloadspark-0b903caef3183c5113feb09995874f6a07aa6698.tar.gz
spark-0b903caef3183c5113feb09995874f6a07aa6698.tar.bz2
spark-0b903caef3183c5113feb09995874f6a07aa6698.zip
[SPARK-19949][SQL][FOLLOW-UP] move FailureSafeParser from catalyst to sql core
## What changes were proposed in this pull request? The `FailureSafeParser` is only used in sql core, it doesn't make sense to put it in catalyst module. ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #17408 from cloud-fan/minor.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala33
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala83
2 files changed, 33 insertions, 83 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
new file mode 100644
index 0000000000..985f0dc1cd
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Exception thrown when the underlying parser meet a bad record and can't parse it.
+ * @param record a function to return the record that cause the parser to fail
+ * @param partialResult a function that returns an optional row, which is the partial result of
+ * parsing this bad record.
+ * @param cause the actual exception about why the record is bad and can't be parsed.
+ */
+case class BadRecordException(
+ record: () => UTF8String,
+ partialResult: () => Option[InternalRow],
+ cause: Throwable) extends Exception(cause)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
deleted file mode 100644
index 725e3015b3..0000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.util
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.unsafe.types.UTF8String
-
-class FailureSafeParser[IN](
- rawParser: IN => Seq[InternalRow],
- mode: ParseMode,
- schema: StructType,
- columnNameOfCorruptRecord: String) {
-
- private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
- private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
- private val resultRow = new GenericInternalRow(schema.length)
- private val nullResult = new GenericInternalRow(schema.length)
-
- // This function takes 2 parameters: an optional partial result, and the bad record. If the given
- // schema doesn't contain a field for corrupted record, we just return the partial result or a
- // row with all fields null. If the given schema contains a field for corrupted record, we will
- // set the bad record to this field, and set other fields according to the partial result or null.
- private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
- if (corruptFieldIndex.isDefined) {
- (row, badRecord) => {
- var i = 0
- while (i < actualSchema.length) {
- val from = actualSchema(i)
- resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
- i += 1
- }
- resultRow(corruptFieldIndex.get) = badRecord()
- resultRow
- }
- } else {
- (row, _) => row.getOrElse(nullResult)
- }
- }
-
- def parse(input: IN): Iterator[InternalRow] = {
- try {
- rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
- } catch {
- case e: BadRecordException => mode match {
- case PermissiveMode =>
- Iterator(toResultRow(e.partialResult(), e.record))
- case DropMalformedMode =>
- Iterator.empty
- case FailFastMode =>
- throw e.cause
- }
- }
- }
-}
-
-/**
- * Exception thrown when the underlying parser meet a bad record and can't parse it.
- * @param record a function to return the record that cause the parser to fail
- * @param partialResult a function that returns an optional row, which is the partial result of
- * parsing this bad record.
- * @param cause the actual exception about why the record is bad and can't be parsed.
- */
-case class BadRecordException(
- record: () => UTF8String,
- partialResult: () => Option[InternalRow],
- cause: Throwable) extends Exception(cause)