aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authornavis.ryu <navis@apache.org>2015-10-16 11:19:37 -0700
committerReynold Xin <rxin@databricks.com>2015-10-16 11:19:37 -0700
commitb9c5e5d4ac4c9fe29e880f4ee562a9c552e81d29 (patch)
treeb4d7c3b5e0dcfd3dcccfcefc0291b28198e75b7b /sql/catalyst
parent4ee2cea2a43f7d04ab8511d9c029f80c5dadd48e (diff)
downloadspark-b9c5e5d4ac4c9fe29e880f4ee562a9c552e81d29.tar.gz
spark-b9c5e5d4ac4c9fe29e880f4ee562a9c552e81d29.tar.bz2
spark-b9c5e5d4ac4c9fe29e880f4ee562a9c552e81d29.zip
[SPARK-11124] JsonParser/Generator should be closed for resource recycle
Some json parsers are not closed. parser in JacksonParser#parseJson, for example. Author: navis.ryu <navis@apache.org> Closes #9130 from navis/SPARK-11124.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala56
1 files changed, 26 insertions, 30 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 0770fab0ae..8c9853e628 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{StructField, StructType, StringType, DataType}
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
import scala.util.parsing.combinator.RegexParsers
@@ -134,16 +135,18 @@ case class GetJsonObject(json: Expression, path: Expression)
if (parsed.isDefined) {
try {
- val parser = jsonFactory.createParser(jsonStr.getBytes)
- val output = new ByteArrayOutputStream()
- val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8)
- parser.nextToken()
- val matched = evaluatePath(parser, generator, RawStyle, parsed.get)
- generator.close()
- if (matched) {
- UTF8String.fromBytes(output.toByteArray)
- } else {
- null
+ Utils.tryWithResource(jsonFactory.createParser(jsonStr.getBytes)) { parser =>
+ val output = new ByteArrayOutputStream()
+ val matched = Utils.tryWithResource(
+ jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { generator =>
+ parser.nextToken()
+ evaluatePath(parser, generator, RawStyle, parsed.get)
+ }
+ if (matched) {
+ UTF8String.fromBytes(output.toByteArray)
+ } else {
+ null
+ }
}
} catch {
case _: JsonProcessingException => null
@@ -250,17 +253,18 @@ case class GetJsonObject(json: Expression, path: Expression)
// temporarily buffer child matches, the emitted json will need to be
// modified slightly if there is only a single element written
val buffer = new StringWriter()
- val flattenGenerator = jsonFactory.createGenerator(buffer)
- flattenGenerator.writeStartArray()
var dirty = 0
- while (p.nextToken() != END_ARRAY) {
- // track the number of array elements and only emit an outer array if
- // we've written more than one element, this matches Hive's behavior
- dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0)
+ Utils.tryWithResource(jsonFactory.createGenerator(buffer)) { flattenGenerator =>
+ flattenGenerator.writeStartArray()
+
+ while (p.nextToken() != END_ARRAY) {
+ // track the number of array elements and only emit an outer array if
+ // we've written more than one element, this matches Hive's behavior
+ dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0)
+ }
+ flattenGenerator.writeEndArray()
}
- flattenGenerator.writeEndArray()
- flattenGenerator.close()
val buf = buffer.getBuffer
if (dirty > 1) {
@@ -370,12 +374,8 @@ case class JsonTuple(children: Seq[Expression])
}
try {
- val parser = jsonFactory.createParser(json.getBytes)
-
- try {
- parseRow(parser, input)
- } finally {
- parser.close()
+ Utils.tryWithResource(jsonFactory.createParser(json.getBytes)) {
+ parser => parseRow(parser, input)
}
} catch {
case _: JsonProcessingException =>
@@ -420,12 +420,8 @@ case class JsonTuple(children: Seq[Expression])
// write the output directly to UTF8 encoded byte array
if (parser.nextToken() != JsonToken.VALUE_NULL) {
- val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8)
-
- try {
- copyCurrentStructure(generator, parser)
- } finally {
- generator.close()
+ Utils.tryWithResource(jsonFactory.createGenerator(output, JsonEncoding.UTF8)) {
+ generator => copyCurrentStructure(generator, parser)
}
row(idx) = UTF8String.fromBytes(output.toByteArray)