author    navis.ryu <navis@apache.org>    2015-10-16 11:19:37 -0700
committer Reynold Xin <rxin@databricks.com>    2015-10-16 11:19:37 -0700
commit    b9c5e5d4ac4c9fe29e880f4ee562a9c552e81d29 (patch)
tree      b4d7c3b5e0dcfd3dcccfcefc0291b28198e75b7b /sql
parent    4ee2cea2a43f7d04ab8511d9c029f80c5dadd48e (diff)
[SPARK-11124] JsonParser/Generator should be closed for resource recycle

Some JSON parsers are never closed; the parser in JacksonParser#parseJson, for example. This patch wraps each Jackson JsonParser and JsonGenerator in Utils.tryWithResource so the resource is closed even when an exception is thrown mid-parse.

Author: navis.ryu <navis@apache.org>

Closes #9130 from navis/SPARK-11124.
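Utils.tryWithResource is Spark's internal loan-pattern helper. As a rough sketch of what it does (the signature shown here is an assumption, not copied from org.apache.spark.util.Utils): acquire the resource, apply the caller's function, and close the resource on every exit path.

    import java.io.Closeable

    // Assumed shape of the loan-pattern helper: create the resource,
    // run the function, then close the resource even if the function throws.
    def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
      val resource = createResource
      try f(resource) finally resource.close()
    }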
Diffstat (limited to 'sql')

-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala | 56
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala    |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala  | 41

3 files changed, 53 insertions(+), 52 deletions(-)
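In isolation, the pattern applied throughout the diff below looks like the following. This is a hypothetical standalone example (not code from the patch), reusing the tryWithResource sketch above:

    import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

    val factory = new JsonFactory()
    // The parser is closed on both the success path and the exception path.
    val firstToken: JsonToken =
      tryWithResource(factory.createParser("""{"a": 1}""")) { parser =>
        parser.nextToken() // START_OBJECT for this input
      }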
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 0770fab0ae..8c9853e628 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{StructField, StructType, StringType, DataType}
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
import scala.util.parsing.combinator.RegexParsers
@@ -134,16 +135,18 @@ case class GetJsonObject(json: Expression, path: Expression)
if (parsed.isDefined) {
try {
- val parser = jsonFactory.createParser(jsonStr.getBytes)
- val output = new ByteArrayOutputStream()
- val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8)
- parser.nextToken()
- val matched = evaluatePath(parser, generator, RawStyle, parsed.get)
- generator.close()
- if (matched) {
- UTF8String.fromBytes(output.toByteArray)
- } else {
- null
+ Utils.tryWithResource(jsonFactory.createParser(jsonStr.getBytes)) { parser =>
+ val output = new ByteArrayOutputStream()
+ val matched = Utils.tryWithResource(
+ jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { generator =>
+ parser.nextToken()
+ evaluatePath(parser, generator, RawStyle, parsed.get)
+ }
+ if (matched) {
+ UTF8String.fromBytes(output.toByteArray)
+ } else {
+ null
+ }
}
} catch {
case _: JsonProcessingException => null
@@ -250,17 +253,18 @@ case class GetJsonObject(json: Expression, path: Expression)
// temporarily buffer child matches, the emitted json will need to be
// modified slightly if there is only a single element written
val buffer = new StringWriter()
- val flattenGenerator = jsonFactory.createGenerator(buffer)
- flattenGenerator.writeStartArray()
var dirty = 0
- while (p.nextToken() != END_ARRAY) {
- // track the number of array elements and only emit an outer array if
- // we've written more than one element, this matches Hive's behavior
- dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0)
+ Utils.tryWithResource(jsonFactory.createGenerator(buffer)) { flattenGenerator =>
+ flattenGenerator.writeStartArray()
+
+ while (p.nextToken() != END_ARRAY) {
+ // track the number of array elements and only emit an outer array if
+ // we've written more than one element, this matches Hive's behavior
+ dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0)
+ }
+ flattenGenerator.writeEndArray()
}
- flattenGenerator.writeEndArray()
- flattenGenerator.close()
val buf = buffer.getBuffer
if (dirty > 1) {
@@ -370,12 +374,8 @@ case class JsonTuple(children: Seq[Expression])
}
try {
- val parser = jsonFactory.createParser(json.getBytes)
-
- try {
- parseRow(parser, input)
- } finally {
- parser.close()
+ Utils.tryWithResource(jsonFactory.createParser(json.getBytes)) {
+ parser => parseRow(parser, input)
}
} catch {
case _: JsonProcessingException =>
@@ -420,12 +420,8 @@ case class JsonTuple(children: Seq[Expression])
// write the output directly to UTF8 encoded byte array
if (parser.nextToken() != JsonToken.VALUE_NULL) {
- val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8)
-
- try {
- copyCurrentStructure(generator, parser)
- } finally {
- generator.close()
+ Utils.tryWithResource(jsonFactory.createGenerator(output, JsonEncoding.UTF8)) {
+ generator => copyCurrentStructure(generator, parser)
}
row(idx) = UTF8String.fromBytes(output.toByteArray)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index b6f3410bad..d0780028da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
private[sql] object InferSchema {
/**
@@ -47,9 +48,10 @@ private[sql] object InferSchema {
val factory = new JsonFactory()
iter.map { row =>
try {
- val parser = factory.createParser(row)
- parser.nextToken()
- inferField(parser)
+ Utils.tryWithResource(factory.createParser(row)) { parser =>
+ parser.nextToken()
+ inferField(parser)
+ }
} catch {
case _: JsonParseException =>
StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index c51140749c..09b8a9e936 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
private[sql] object JacksonParser {
def apply(
@@ -86,9 +87,9 @@ private[sql] object JacksonParser {
case (_, StringType) =>
val writer = new ByteArrayOutputStream()
- val generator = factory.createGenerator(writer, JsonEncoding.UTF8)
- generator.copyCurrentStructure(parser)
- generator.close()
+ Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
+ generator => generator.copyCurrentStructure(parser)
+ }
UTF8String.fromBytes(writer.toByteArray)
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) =>
@@ -245,22 +246,24 @@ private[sql] object JacksonParser {
iter.flatMap { record =>
try {
- val parser = factory.createParser(record)
- parser.nextToken()
-
- convertField(factory, parser, schema) match {
- case null => failedRecord(record)
- case row: InternalRow => row :: Nil
- case array: ArrayData =>
- if (array.numElements() == 0) {
- Nil
- } else {
- array.toArray[InternalRow](schema)
- }
- case _ =>
- sys.error(
- s"Failed to parse record $record. Please make sure that each line of the file " +
- "(or each string in the RDD) is a valid JSON object or an array of JSON objects.")
+ Utils.tryWithResource(factory.createParser(record)) { parser =>
+ parser.nextToken()
+
+ convertField(factory, parser, schema) match {
+ case null => failedRecord(record)
+ case row: InternalRow => row :: Nil
+ case array: ArrayData =>
+ if (array.numElements() == 0) {
+ Nil
+ } else {
+ array.toArray[InternalRow](schema)
+ }
+ case _ =>
+ sys.error(
+ s"Failed to parse record $record. Please make sure that each line of the file " +
+ "(or each string in the RDD) is a valid JSON object or " +
+ "an array of JSON objects.")
+ }
}
} catch {
case _: JsonProcessingException =>