1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.csv
import java.nio.charset.StandardCharsets
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
private[sql] class CSVOptions(
@transient private val parameters: Map[String, String])
extends Logging with Serializable {
private def getChar(paramName: String, default: Char): Char = {
val paramValue = parameters.get(paramName)
paramValue match {
case None => default
case Some(value) if value.length == 0 => '\u0000'
case Some(value) if value.length == 1 => value.charAt(0)
case _ => throw new RuntimeException(s"$paramName cannot be more than one character")
}
}
private def getInt(paramName: String, default: Int): Int = {
val paramValue = parameters.get(paramName)
paramValue match {
case None => default
case Some(value) => try {
value.toInt
} catch {
case e: NumberFormatException =>
throw new RuntimeException(s"$paramName should be an integer. Found $value")
}
}
}
private def getBool(paramName: String, default: Boolean = false): Boolean = {
val param = parameters.getOrElse(paramName, default.toString)
if (param.toLowerCase == "true") {
true
} else if (param.toLowerCase == "false") {
false
} else {
throw new Exception(s"$paramName flag can be true or false")
}
}
val delimiter = CSVTypeCast.toChar(
parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
val charset = parameters.getOrElse("encoding",
parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))
val quote = getChar("quote", '\"')
val escape = getChar("escape", '\\')
val comment = getChar("comment", '\u0000')
val headerFlag = getBool("header")
val inferSchemaFlag = getBool("inferSchema")
val ignoreLeadingWhiteSpaceFlag = getBool("ignoreLeadingWhiteSpace")
val ignoreTrailingWhiteSpaceFlag = getBool("ignoreTrailingWhiteSpace")
// Parse mode flags
if (!ParseModes.isValidMode(parseMode)) {
logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
}
val failFast = ParseModes.isFailFastMode(parseMode)
val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
val permissive = ParseModes.isPermissiveMode(parseMode)
val nullValue = parameters.getOrElse("nullValue", "")
val compressionCodec: Option[String] = {
val name = parameters.get("compression").orElse(parameters.get("codec"))
name.map(CompressionCodecs.getCodecClassName)
}
val maxColumns = getInt("maxColumns", 20480)
val maxCharsPerColumn = getInt("maxCharsPerColumn", 1000000)
val inputBufferSize = 128
val isCommentSet = this.comment != '\u0000'
val rowSeparator = "\n"
}
|