diff options
author | navis.ryu <navis@apache.org> | 2015-10-16 11:19:37 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-10-16 11:19:37 -0700 |
commit | b9c5e5d4ac4c9fe29e880f4ee562a9c552e81d29 (patch) | |
tree | b4d7c3b5e0dcfd3dcccfcefc0291b28198e75b7b /sql/core | |
parent | 4ee2cea2a43f7d04ab8511d9c029f80c5dadd48e (diff) | |
download | spark-b9c5e5d4ac4c9fe29e880f4ee562a9c552e81d29.tar.gz spark-b9c5e5d4ac4c9fe29e880f4ee562a9c552e81d29.tar.bz2 spark-b9c5e5d4ac4c9fe29e880f4ee562a9c552e81d29.zip |
[SPARK-11124] JsonParser/Generator should be closed for resource recycle
Some JSON parsers are not closed — for example, the parser in JacksonParser#parseJson.
Author: navis.ryu <navis@apache.org>
Closes #9130 from navis/SPARK-11124.
Diffstat (limited to 'sql/core')
2 files changed, 27 insertions, 22 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index b6f3410bad..d0780028da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
 import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 private[sql] object InferSchema {
   /**
@@ -47,9 +48,10 @@ private[sql] object InferSchema {
     val factory = new JsonFactory()
     iter.map { row =>
       try {
-        val parser = factory.createParser(row)
-        parser.nextToken()
-        inferField(parser)
+        Utils.tryWithResource(factory.createParser(row)) { parser =>
+          parser.nextToken()
+          inferField(parser)
+        }
       } catch {
         case _: JsonParseException =>
           StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index c51140749c..09b8a9e936 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
 
 private[sql] object JacksonParser {
   def apply(
@@ -86,9 +87,9 @@ private[sql] object JacksonParser {
       case (_, StringType) =>
         val writer = new ByteArrayOutputStream()
-        val generator = factory.createGenerator(writer, JsonEncoding.UTF8)
-        generator.copyCurrentStructure(parser)
-        generator.close()
+        Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
+          generator => generator.copyCurrentStructure(parser)
+        }
         UTF8String.fromBytes(writer.toByteArray)
 
       case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) =>
@@ -245,22 +246,24 @@ private[sql] object JacksonParser {
     iter.flatMap { record =>
       try {
-        val parser = factory.createParser(record)
-        parser.nextToken()
-
-        convertField(factory, parser, schema) match {
-          case null => failedRecord(record)
-          case row: InternalRow => row :: Nil
-          case array: ArrayData =>
-            if (array.numElements() == 0) {
-              Nil
-            } else {
-              array.toArray[InternalRow](schema)
-            }
-          case _ =>
-            sys.error(
-              s"Failed to parse record $record. Please make sure that each line of the file " +
-                "(or each string in the RDD) is a valid JSON object or an array of JSON objects.")
+        Utils.tryWithResource(factory.createParser(record)) { parser =>
+          parser.nextToken()
+
+          convertField(factory, parser, schema) match {
+            case null => failedRecord(record)
+            case row: InternalRow => row :: Nil
+            case array: ArrayData =>
+              if (array.numElements() == 0) {
+                Nil
+              } else {
+                array.toArray[InternalRow](schema)
+              }
+            case _ =>
+              sys.error(
+                s"Failed to parse record $record. Please make sure that each line of the file " +
+                  "(or each string in the RDD) is a valid JSON object or " +
+                  "an array of JSON objects.")
+          }
         }
       } catch {
         case _: JsonProcessingException =>