path: root/sql/core/src/main
author    Wenchen Fan <wenchen@databricks.com>    2017-03-25 11:46:54 -0700
committer Xiao Li <gatorsmile@gmail.com>    2017-03-25 11:46:54 -0700
commit    0b903caef3183c5113feb09995874f6a07aa6698 (patch)
tree      50d2644301639aff508d55c0e94054ea615ccb0d /sql/core/src/main
parent    be85245a98d58f636ff54956cdfde15ea5cd6122 (diff)
[SPARK-19949][SQL][FOLLOW-UP] move FailureSafeParser from catalyst to sql core
## What changes were proposed in this pull request?

The `FailureSafeParser` is only used in sql core, so it doesn't make sense to keep it in the catalyst module.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17408 from cloud-fan/minor.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                              |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala     | 72
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala   |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala   |  3
4 files changed, 76 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e6d2b1bc28..6c238618f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -27,11 +27,10 @@ import org.apache.spark.Partition
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
-import org.apache.spark.sql.catalyst.util.FailureSafeParser
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser}
import org.apache.spark.sql.execution.datasources.csv._
-import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.types.{StringType, StructType}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
new file mode 100644
index 0000000000..159aef220b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class FailureSafeParser[IN](
+ rawParser: IN => Seq[InternalRow],
+ mode: ParseMode,
+ schema: StructType,
+ columnNameOfCorruptRecord: String) {
+
+ private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
+ private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
+ private val resultRow = new GenericInternalRow(schema.length)
+ private val nullResult = new GenericInternalRow(schema.length)
+
+ // This function takes 2 parameters: an optional partial result, and the bad record. If the given
+ // schema doesn't contain a field for corrupted record, we just return the partial result or a
+ // row with all fields null. If the given schema contains a field for corrupted record, we will
+ // set the bad record to this field, and set other fields according to the partial result or null.
+ private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
+ if (corruptFieldIndex.isDefined) {
+ (row, badRecord) => {
+ var i = 0
+ while (i < actualSchema.length) {
+ val from = actualSchema(i)
+ resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
+ i += 1
+ }
+ resultRow(corruptFieldIndex.get) = badRecord()
+ resultRow
+ }
+ } else {
+ (row, _) => row.getOrElse(nullResult)
+ }
+ }
+
+ def parse(input: IN): Iterator[InternalRow] = {
+ try {
+ rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+ } catch {
+ case e: BadRecordException => mode match {
+ case PermissiveMode =>
+ Iterator(toResultRow(e.partialResult(), e.record))
+ case DropMalformedMode =>
+ Iterator.empty
+ case FailFastMode =>
+ throw e.cause
+ }
+ }
+ }
+}
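For context, a minimal sketch of how this parser is driven (not part of the patch: the toy `rawParser`, the schema, and the sample inputs below are illustrative, and it assumes the Spark 2.2-era internal APIs shown in the diff):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{BadRecordException, PermissiveMode}
import org.apache.spark.sql.execution.datasources.FailureSafeParser
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object FailureSafeParserExample extends App {
  // One data column plus the corrupt-record column (names are illustrative).
  val schema = StructType(Seq(
    StructField("value", StringType),
    StructField("_corrupt_record", StringType)))

  // A toy raw parser producing rows over the schema minus the corrupt column.
  // It throws BadRecordException for any line containing "bad", the way
  // UnivocityParser and JacksonParser do for malformed input.
  val rawParser: String => Seq[InternalRow] = { line =>
    if (line.contains("bad")) {
      throw BadRecordException(
        () => UTF8String.fromString(line),   // the raw record to preserve
        () => None,                          // no partial result
        new RuntimeException("malformed line"))
    }
    Seq(InternalRow(UTF8String.fromString(line)))
  }

  val parser = new FailureSafeParser[String](
    rawParser, PermissiveMode, schema, "_corrupt_record")

  // Good input yields a row with the corrupt column left null; bad input
  // yields a row whose data fields are null and whose corrupt column holds
  // the raw line. DropMalformedMode would return an empty iterator instead,
  // and FailFastMode would rethrow the underlying cause.
  parser.parse("good line").foreach(println)
  parser.parse("bad line").foreach(println)
}
```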
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 263f77e11c..c3657acb7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -30,7 +30,8 @@ import com.univocity.parsers.csv.CsvParser
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser}
+import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.FailureSafeParser
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 51e952c122..4f2963da9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -33,8 +33,7 @@ import org.apache.spark.rdd.{BinaryFileRDD, RDD}
import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
-import org.apache.spark.sql.catalyst.util.FailureSafeParser
-import org.apache.spark.sql.execution.datasources.{CodecStreams, DataSource, HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
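As a usage note, the three modes handled by `FailureSafeParser` are the same ones exposed through the public reader API. A minimal sketch (the session setup and path are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("parse-mode-example")
  .getOrCreate()

// PERMISSIVE keeps bad records in the corrupt column, DROPMALFORMED drops
// them, and FAILFAST rethrows the underlying parse error.
val df = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("/path/to/records.json")  // illustrative path
```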