Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala |  33
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala  |  83
2 files changed, 33 insertions(+), 83 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
new file mode 100644
index 0000000000..985f0dc1cd
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Exception thrown when the underlying parser meets a bad record and can't parse it.
+ * @param record a function that returns the record that caused the parser to fail
+ * @param partialResult a function that returns an optional row, which is the partial result of
+ *                      parsing this bad record.
+ * @param cause the actual exception explaining why the record is bad and can't be parsed.
+ */
+case class BadRecordException(
+    record: () => UTF8String,
+    partialResult: () => Option[InternalRow],
+    cause: Throwable) extends Exception(cause)
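
The exception now has its own file in catalyst. Its fields are thunks, so the raw record and the partial result are only materialized when a parse mode actually needs them. Below is a minimal sketch of how a concrete parser might raise it; the object, its helpers, and the integer line format are made up for illustration and are not part of this change.

// Illustrative only: this object and its helpers are assumptions for this sketch,
// not part of the diff above.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.BadRecordException
import org.apache.spark.unsafe.types.UTF8String

object BadRecordSketch {
  // Toy raw parser: expects one integer per line and produces a one-column row.
  def rawParse(line: String): InternalRow =
    new GenericInternalRow(Array[Any](line.trim.toInt))

  // Report failures as BadRecordException. The record and the partial result are
  // passed as functions, so they are only evaluated when the caller's parse mode
  // needs them (PERMISSIVE does, DROPMALFORMED and FAILFAST do not).
  def safeParse(line: String): InternalRow =
    try rawParse(line)
    catch {
      case e: NumberFormatException =>
        throw BadRecordException(
          record = () => UTF8String.fromString(line),
          partialResult = () => None, // this toy parser keeps no partial result
          cause = e)
    }
}
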
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
deleted file mode 100644
index 725e3015b3..0000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.util
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.unsafe.types.UTF8String
-
-class FailureSafeParser[IN](
-    rawParser: IN => Seq[InternalRow],
-    mode: ParseMode,
-    schema: StructType,
-    columnNameOfCorruptRecord: String) {
-
-  private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
-  private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
-  private val resultRow = new GenericInternalRow(schema.length)
-  private val nullResult = new GenericInternalRow(schema.length)
-
-  // This function takes two parameters: an optional partial result and the bad record. If the given
-  // schema doesn't contain a field for the corrupt record, we just return the partial result or a
-  // row with all fields null. If the given schema contains a field for the corrupt record, we put
-  // the bad record in that field and set the other fields according to the partial result or null.
-  private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
-    if (corruptFieldIndex.isDefined) {
-      (row, badRecord) => {
-        var i = 0
-        while (i < actualSchema.length) {
-          val from = actualSchema(i)
-          resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
-          i += 1
-        }
-        resultRow(corruptFieldIndex.get) = badRecord()
-        resultRow
-      }
-    } else {
-      (row, _) => row.getOrElse(nullResult)
-    }
-  }
-
-  def parse(input: IN): Iterator[InternalRow] = {
-    try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
-    } catch {
-      case e: BadRecordException => mode match {
-        case PermissiveMode =>
-          Iterator(toResultRow(e.partialResult(), e.record))
-        case DropMalformedMode =>
-          Iterator.empty
-        case FailFastMode =>
-          throw e.cause
-      }
-    }
-  }
-}
-
-/**
- * Exception thrown when the underlying parser meets a bad record and can't parse it.
- * @param record a function that returns the record that caused the parser to fail
- * @param partialResult a function that returns an optional row, which is the partial result of
- *                      parsing this bad record.
- * @param cause the actual exception explaining why the record is bad and can't be parsed.
- */
-case class BadRecordException(
-    record: () => UTF8String,
-    partialResult: () => Option[InternalRow],
-    cause: Throwable) extends Exception(cause)
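
The removed class is the consumer of BadRecordException (the diffstat above is limited to sql/catalyst, so this view does not show whether the class moves elsewhere): PERMISSIVE mode keeps a result row, DROPMALFORMED drops the record, and FAILFAST rethrows the underlying cause. The subtle part is toResultRow, which writes the raw record into the corrupt-record column and fills the remaining fields from the partial result or leaves them null. Below is a standalone sketch of that logic; the object name, the example schema, and the _corrupt_record column name are assumptions for illustration, not taken from this diff.

// Standalone sketch of the removed toResultRow logic; the object name, schema, and
// corrupt-column name below are example values, not part of the diff above.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

object CorruptColumnSketch {
  val columnNameOfCorruptRecord = "_corrupt_record"
  val schema = new StructType()
    .add("a", IntegerType)
    .add("b", StringType)
    .add(columnNameOfCorruptRecord, StringType)

  // Index of the corrupt-record column (if present) and the schema the underlying
  // parser actually produces (everything except that column).
  val corruptFieldIndex: Option[Int] =
    Some(schema.fieldNames.indexOf(columnNameOfCorruptRecord)).filter(_ >= 0)
  val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))

  def toResultRow(row: Option[InternalRow], badRecord: () => UTF8String): InternalRow = {
    val resultRow = new GenericInternalRow(schema.length)
    var i = 0
    while (i < actualSchema.length) {
      // Copy each parsed field into its position in the full schema; fields the
      // partial result doesn't provide stay null.
      val from = actualSchema(i)
      resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
      i += 1
    }
    // Keep the raw text of the bad record in the corrupt-record column.
    corruptFieldIndex.foreach(idx => resultRow(idx) = badRecord())
    resultRow
  }

  // What PERMISSIVE mode surfaces for a record with no usable partial result:
  // both data columns are null and _corrupt_record holds the original text.
  val permissiveRow: InternalRow =
    toResultRow(None, () => UTF8String.fromString("{\"a\": \"oops\"}"))
}
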