diff options
author | navis.ryu <navis@apache.org> | 2015-10-16 11:19:37 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-10-16 11:19:37 -0700 |
commit | b9c5e5d4ac4c9fe29e880f4ee562a9c552e81d29 (patch) | |
tree | b4d7c3b5e0dcfd3dcccfcefc0291b28198e75b7b /sql/catalyst/src | |
parent | 4ee2cea2a43f7d04ab8511d9c029f80c5dadd48e (diff) | |
download | spark-b9c5e5d4ac4c9fe29e880f4ee562a9c552e81d29.tar.gz spark-b9c5e5d4ac4c9fe29e880f4ee562a9c552e81d29.tar.bz2 spark-b9c5e5d4ac4c9fe29e880f4ee562a9c552e81d29.zip |
[SPARK-11124] JsonParser/Generator should be closed for resource recycle
Some json parsers are not closed. parser in JacksonParser#parseJson, for example.
Author: navis.ryu <navis@apache.org>
Closes #9130 from navis/SPARK-11124.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala | 56 |
1 files changed, 26 insertions, 30 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 0770fab0ae..8c9853e628 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.types.{StructField, StructType, StringType, DataType} import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils import scala.util.parsing.combinator.RegexParsers @@ -134,16 +135,18 @@ case class GetJsonObject(json: Expression, path: Expression) if (parsed.isDefined) { try { - val parser = jsonFactory.createParser(jsonStr.getBytes) - val output = new ByteArrayOutputStream() - val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8) - parser.nextToken() - val matched = evaluatePath(parser, generator, RawStyle, parsed.get) - generator.close() - if (matched) { - UTF8String.fromBytes(output.toByteArray) - } else { - null + Utils.tryWithResource(jsonFactory.createParser(jsonStr.getBytes)) { parser => + val output = new ByteArrayOutputStream() + val matched = Utils.tryWithResource( + jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { generator => + parser.nextToken() + evaluatePath(parser, generator, RawStyle, parsed.get) + } + if (matched) { + UTF8String.fromBytes(output.toByteArray) + } else { + null + } } } catch { case _: JsonProcessingException => null @@ -250,17 +253,18 @@ case class GetJsonObject(json: Expression, path: Expression) // temporarily buffer child matches, the emitted json will need to be // modified slightly if there is only a single element written val buffer = new StringWriter() - val flattenGenerator = jsonFactory.createGenerator(buffer) - flattenGenerator.writeStartArray() var dirty = 0 - while (p.nextToken() != END_ARRAY) { - // track the number of array elements and only emit an outer array if - // we've written more than one element, this matches Hive's behavior - dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0) + Utils.tryWithResource(jsonFactory.createGenerator(buffer)) { flattenGenerator => + flattenGenerator.writeStartArray() + + while (p.nextToken() != END_ARRAY) { + // track the number of array elements and only emit an outer array if + // we've written more than one element, this matches Hive's behavior + dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0) + } + flattenGenerator.writeEndArray() } - flattenGenerator.writeEndArray() - flattenGenerator.close() val buf = buffer.getBuffer if (dirty > 1) { @@ -370,12 +374,8 @@ case class JsonTuple(children: Seq[Expression]) } try { - val parser = jsonFactory.createParser(json.getBytes) - - try { - parseRow(parser, input) - } finally { - parser.close() + Utils.tryWithResource(jsonFactory.createParser(json.getBytes)) { + parser => parseRow(parser, input) } } catch { case _: JsonProcessingException => @@ -420,12 +420,8 @@ case class JsonTuple(children: Seq[Expression]) // write the output directly to UTF8 encoded byte array if (parser.nextToken() != JsonToken.VALUE_NULL) { - val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8) - - try { - copyCurrentStructure(generator, parser) - } finally { - generator.close() + Utils.tryWithResource(jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { + generator => copyCurrentStructure(generator, parser) } row(idx) = UTF8String.fromBytes(output.toByteArray) |