author    navis.ryu <navis@apache.org>  2015-10-16 11:19:37 -0700
committer Reynold Xin <rxin@databricks.com>  2015-10-16 11:19:37 -0700
commit    b9c5e5d4ac4c9fe29e880f4ee562a9c552e81d29
tree      b4d7c3b5e0dcfd3dcccfcefc0291b28198e75b7b
parent    4ee2cea2a43f7d04ab8511d9c029f80c5dadd48e
[SPARK-11124] JsonParser/Generator should be closed for resource recycle
Some JSON parsers are never closed; the parser in JacksonParser#parseJson, for example. Wrap the parser and generator in Utils.tryWithResource so they are closed on every exit path.

Author: navis.ryu <navis@apache.org>

Closes #9130 from navis/SPARK-11124.
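For readers unfamiliar with the helper the patch leans on: Utils.tryWithResource (in org.apache.spark.util.Utils) is Spark's loan-pattern wrapper. It creates a Closeable, hands it to a function, and closes it in a finally block, so the resource is released even when the body throws. A minimal sketch of that pattern; the signature below is an approximation for illustration, not the exact Spark source:

import java.io.Closeable

object LoanPattern {
  // Sketch of the loan pattern behind Utils.tryWithResource:
  // acquire, use, and always release, even if `f` throws.
  def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
    val resource = createResource
    try {
      f(resource)
    } finally {
      resource.close()
    }
  }
}

With that helper, tryWithResource(factory.createParser(row)) { parser => ... } closes the parser on every exit path, which is exactly the shape the hunks below adopt.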
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala   |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala | 41
2 files changed, 27 insertions(+), 22 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index b6f3410bad..d0780028da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
private[sql] object InferSchema {
/**
@@ -47,9 +48,10 @@ private[sql] object InferSchema {
val factory = new JsonFactory()
iter.map { row =>
try {
- val parser = factory.createParser(row)
- parser.nextToken()
- inferField(parser)
+ Utils.tryWithResource(factory.createParser(row)) { parser =>
+ parser.nextToken()
+ inferField(parser)
+ }
} catch {
case _: JsonParseException =>
StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
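The hunk above shows the leak pattern this commit targets: factory.createParser(row) was created but never closed, so a throw from inferField (including the JsonParseException path) leaked the parser and its buffers on every corrupt record. A self-contained sketch of the fixed shape, using Jackson directly; the object name and the stand-in for inferField are hypothetical:

import com.fasterxml.jackson.core.{JsonFactory, JsonParseException, JsonParser}

object InferSketch {
  private val factory = new JsonFactory()

  // Hypothetical stand-in for Spark's inferField: report the first token.
  private def firstToken(parser: JsonParser): String =
    String.valueOf(parser.nextToken())

  def describe(row: String): String =
    try {
      val parser = factory.createParser(row)
      // Closed on success and on JsonParseException alike.
      try firstToken(parser) finally parser.close()
    } catch {
      case _: JsonParseException => "corrupt record"
    }
}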
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index c51140749c..09b8a9e936 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
private[sql] object JacksonParser {
def apply(
@@ -86,9 +87,9 @@ private[sql] object JacksonParser {
case (_, StringType) =>
val writer = new ByteArrayOutputStream()
- val generator = factory.createGenerator(writer, JsonEncoding.UTF8)
- generator.copyCurrentStructure(parser)
- generator.close()
+ Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
+ generator => generator.copyCurrentStructure(parser)
+ }
UTF8String.fromBytes(writer.toByteArray)
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) =>
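The hunk above fixes the generator half of the leak. For a JsonGenerator, close() also flushes buffered output into the underlying stream, so the old code's explicit generator.close() was load-bearing but unsafe under exceptions; tryWithResource keeps the flush and adds exception safety. A standalone sketch of the copy-current-structure idiom, with illustrative object and method names:

import java.io.ByteArrayOutputStream
import com.fasterxml.jackson.core.{JsonEncoding, JsonFactory}

object CopyStructureSketch {
  private val factory = new JsonFactory()

  // Re-serialize the JSON value the parser is positioned on.
  def copyAsString(json: String): String = {
    val parser = factory.createParser(json)
    try {
      parser.nextToken() // position on the first value
      val writer = new ByteArrayOutputStream()
      val generator = factory.createGenerator(writer, JsonEncoding.UTF8)
      // close() releases the generator *and* flushes buffered bytes,
      // so it must run before writer.toByteArray is read.
      try generator.copyCurrentStructure(parser) finally generator.close()
      new String(writer.toByteArray, "UTF-8")
    } finally {
      parser.close()
    }
  }
}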
@@ -245,22 +246,24 @@ private[sql] object JacksonParser {
iter.flatMap { record =>
try {
- val parser = factory.createParser(record)
- parser.nextToken()
-
- convertField(factory, parser, schema) match {
- case null => failedRecord(record)
- case row: InternalRow => row :: Nil
- case array: ArrayData =>
- if (array.numElements() == 0) {
- Nil
- } else {
- array.toArray[InternalRow](schema)
- }
- case _ =>
- sys.error(
- s"Failed to parse record $record. Please make sure that each line of the file " +
- "(or each string in the RDD) is a valid JSON object or an array of JSON objects.")
+ Utils.tryWithResource(factory.createParser(record)) { parser =>
+ parser.nextToken()
+
+ convertField(factory, parser, schema) match {
+ case null => failedRecord(record)
+ case row: InternalRow => row :: Nil
+ case array: ArrayData =>
+ if (array.numElements() == 0) {
+ Nil
+ } else {
+ array.toArray[InternalRow](schema)
+ }
+ case _ =>
+ sys.error(
+ s"Failed to parse record $record. Please make sure that each line of the file " +
+ "(or each string in the RDD) is a valid JSON object or " +
+ "an array of JSON objects.")
+ }
}
} catch {
case _: JsonProcessingException =>