-rw-r--r--  .rat-excludes                                                                             |   1
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala                    |   3
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala                           |  24
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                                     |   3
-rw-r--r--  examples/src/main/python/avro_inputformat.py                                              |  75
-rw-r--r--  examples/src/main/resources/user.avsc                                                     |   8
-rw-r--r--  examples/src/main/resources/users.avro                                                    | bin 0 -> 334 bytes
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala   | 130
8 files changed, 231 insertions(+), 13 deletions(-)
diff --git a/.rat-excludes b/.rat-excludes
index bccb043c2b..eaefef1b0a 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -25,6 +25,7 @@ log4j-defaults.properties
bootstrap-tooltip.js
jquery-1.11.1.min.js
sorttable.js
+.*avsc
.*txt
.*json
.*data
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index f3b05e1243..49dc95f349 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -19,6 +19,7 @@ package org.apache.spark.api.python
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SerializableWritable, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
@@ -42,7 +43,7 @@ private[python] object Converter extends Logging {
defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
converterClass.map { cc =>
Try {
- val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
+ val c = Utils.classForName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
logInfo(s"Loaded converter: $cc")
c
} match {
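
The getConverter helper above instantiates a Converter implementation purely by its fully qualified class name, which is why the lookup switches from Class.forName to Utils.classForName: converters such as the AvroWrapperToJavaConverter added later in this patch live in the examples jar and are typically visible only through the context class loader. As a minimal sketch (hypothetical, not part of this patch), any class implementing the same Converter[Any, Any] contract can be plugged in the same way:

```scala
// Hypothetical converter in the same style as AvroWrapperToJavaConverter below;
// it is referenced from Python by name, so it must be loadable through the
// context/Spark class loader at runtime.
package org.apache.spark.examples.pythonconverters

import org.apache.hadoop.io.IntWritable
import org.apache.spark.api.python.Converter

class IntWritableToIntConverter extends Converter[Any, Any] {
  override def convert(obj: Any): Any = obj match {
    case iw: IntWritable => iw.get()  // unwrap the Writable into a plain Int
    case other => other               // pass anything else through unchanged
  }
}
```
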
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index fefe1cb6f1..9f5c5bd30f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -372,8 +372,8 @@ private[spark] object PythonRDD extends Logging {
batchSize: Int) = {
val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
- val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
- val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+ val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+ val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
@@ -440,9 +440,9 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
- val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
- val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
- val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+ val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+ val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+ val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
if (path.isDefined) {
sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
} else {
@@ -509,9 +509,9 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
- val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
- val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
- val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+ val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+ val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+ val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
if (path.isDefined) {
sc.sc.hadoopFile(path.get, fc, kc, vc)
} else {
@@ -558,7 +558,7 @@ private[spark] object PythonRDD extends Logging {
for {
k <- Option(keyClass)
v <- Option(valueClass)
- } yield (Class.forName(k), Class.forName(v))
+ } yield (Utils.classForName(k), Utils.classForName(v))
}
private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
@@ -621,10 +621,10 @@ private[spark] object PythonRDD extends Logging {
val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
- val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
+ val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new JavaToWritableConverter)
- val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+ val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
}
@@ -653,7 +653,7 @@ private[spark] object PythonRDD extends Logging {
val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new JavaToWritableConverter)
- val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+ val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8cac5da644..019f68b160 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -146,6 +146,9 @@ private[spark] object Utils extends Logging {
Try { Class.forName(clazz, false, getContextOrSparkClassLoader) }.isSuccess
}
+ /** Preferred alternative to Class.forName(className) */
+ def classForName(className: String) = Class.forName(className, true, getContextOrSparkClassLoader)
+
/**
* Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
*/
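
The new Utils.classForName helper initializes the class through getContextOrSparkClassLoader rather than the caller's defining class loader, so classes that only arrive via --jars or --driver-class-path (such as the example converters) can still be resolved from core. A rough sketch of the idea, under the assumption that getContextOrSparkClassLoader prefers the thread's context class loader and falls back to the loader that loaded Spark itself:

```scala
// Sketch only (assumption about the shape of getContextOrSparkClassLoader), not the
// patch itself: prefer the context class loader, fall back to Spark's own, and
// initialize the class, unlike a bare Class.forName(className).
object ClassForNameSketch {
  def classForNameSketch(className: String): Class[_] = {
    val sparkLoader = getClass.getClassLoader
    val loader = Option(Thread.currentThread().getContextClassLoader).getOrElse(sparkLoader)
    Class.forName(className, /* initialize = */ true, loader)
  }

  def main(args: Array[String]): Unit = {
    // The converter added by this patch lives in the examples jar, so it is usually
    // visible only through the context class loader when supplied via --driver-class-path.
    val cls = classForNameSketch(
      "org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter")
    println(cls.getName)
  }
}
```
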
diff --git a/examples/src/main/python/avro_inputformat.py b/examples/src/main/python/avro_inputformat.py
new file mode 100644
index 0000000000..e902ae2975
--- /dev/null
+++ b/examples/src/main/python/avro_inputformat.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Read data file users.avro in local Spark distro:
+
+$ cd $SPARK_HOME
+$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
+> examples/src/main/resources/users.avro
+{u'favorite_color': None, u'name': u'Alyssa', u'favorite_numbers': [3, 9, 15, 20]}
+{u'favorite_color': u'red', u'name': u'Ben', u'favorite_numbers': []}
+
+To read name and favorite_color fields only, specify the following reader schema:
+
+$ cat examples/src/main/resources/user.avsc
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_color", "type": ["string", "null"]}
+ ]
+}
+
+$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
+> examples/src/main/resources/users.avro examples/src/main/resources/user.avsc
+{u'favorite_color': None, u'name': u'Alyssa'}
+{u'favorite_color': u'red', u'name': u'Ben'}
+"""
+if __name__ == "__main__":
+ if len(sys.argv) != 2 and len(sys.argv) != 3:
+ print >> sys.stderr, """
+ Usage: avro_inputformat <data_file> [reader_schema_file]
+
+ Run with example jar:
+ ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/avro_inputformat.py <data_file> [reader_schema_file]
+ Assumes you have Avro data stored in <data_file>. Reader schema can be optionally specified in [reader_schema_file].
+ """
+ exit(-1)
+
+ path = sys.argv[1]
+ sc = SparkContext(appName="AvroKeyInputFormat")
+
+ conf = None
+ if len(sys.argv) == 3:
+ schema_rdd = sc.textFile(sys.argv[2], 1).collect()
+ conf = {"avro.schema.input.key" : reduce(lambda x, y: x+y, schema_rdd)}
+
+ avro_rdd = sc.newAPIHadoopFile(path,
+ "org.apache.avro.mapreduce.AvroKeyInputFormat",
+ "org.apache.avro.mapred.AvroKey",
+ "org.apache.hadoop.io.NullWritable",
+ keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
+ conf=conf)
+ output = avro_rdd.map(lambda x: x[0]).collect()
+ for k in output:
+ print k
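
For reference, a sketch (not part of the patch) of roughly what the newAPIHadoopFile call above corresponds to on the JVM side, using the same key, value, and input-format class names passed from Python; it assumes the Avro mapreduce classes are on the classpath:

```scala
// Sketch: read the same file from Scala with the classes named in the Python call above.
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.{SparkConf, SparkContext}

object AvroReadSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("AvroKeyInputFormat"))
    val avroRdd = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
      AvroKeyInputFormat[GenericRecord]]("examples/src/main/resources/users.avro")
    // Each element is (AvroKey[GenericRecord], NullWritable); datum() is the record itself.
    avroRdd.map(_._1.datum().toString).collect().foreach(println)
    sc.stop()
  }
}
```
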
diff --git a/examples/src/main/resources/user.avsc b/examples/src/main/resources/user.avsc
new file mode 100644
index 0000000000..4995357ab3
--- /dev/null
+++ b/examples/src/main/resources/user.avsc
@@ -0,0 +1,8 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_color", "type": ["string", "null"]}
+ ]
+}
diff --git a/examples/src/main/resources/users.avro b/examples/src/main/resources/users.avro
new file mode 100644
index 0000000000..27c526ab11
--- /dev/null
+++ b/examples/src/main/resources/users.avro
Binary files differ
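
The users.avro fixture is committed as a binary blob; below is a hypothetical way to regenerate an equivalent file with the stock Avro Java API. The writer schema here is an assumption reconstructed from the records printed in avro_inputformat.py (the committed user.avsc is only the reader-side projection):

```scala
// Sketch only: writes two records matching the example output (Alyssa with
// favorite_numbers [3, 9, 15, 20] and no favorite_color; Ben with "red" and an
// empty array) under an assumed writer schema.
import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}

object WriteUsersAvroSketch {
  val writerSchemaJson =
    """{"namespace": "example.avro", "type": "record", "name": "User", "fields": [
      |  {"name": "name", "type": "string"},
      |  {"name": "favorite_numbers", "type": {"type": "array", "items": "int"}},
      |  {"name": "favorite_color", "type": ["string", "null"]}
      |]}""".stripMargin

  def main(args: Array[String]): Unit = {
    val schema = new Schema.Parser().parse(writerSchemaJson)

    def user(name: String, numbers: Seq[Int], color: String): GenericRecord = {
      val rec = new GenericData.Record(schema)
      rec.put("name", name)
      rec.put("favorite_numbers", java.util.Arrays.asList(numbers.map(Int.box): _*))
      rec.put("favorite_color", color) // null encodes "no favourite colour"
      rec
    }

    val writer = new DataFileWriter(new GenericDatumWriter[GenericRecord](schema))
    writer.create(schema, new File("users.avro"))
    writer.append(user("Alyssa", Seq(3, 9, 15, 20), null))
    writer.append(user("Ben", Seq.empty, "red"))
    writer.close()
  }
}
```
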
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
new file mode 100644
index 0000000000..1b25983a38
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.pythonconverters
+
+import java.util.{Collection => JCollection, Map => JMap}
+
+import scala.collection.JavaConversions._
+
+import org.apache.avro.generic.{GenericFixed, IndexedRecord}
+import org.apache.avro.mapred.AvroWrapper
+import org.apache.avro.Schema
+import org.apache.avro.Schema.Type._
+
+import org.apache.spark.api.python.Converter
+import org.apache.spark.SparkException
+
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts
+ * an Avro Record wrapped in an AvroKey (or AvroValue) to a Java Map. It tries
+ * to work with all 3 Avro data mappings (Generic, Specific and Reflect).
+ */
+class AvroWrapperToJavaConverter extends Converter[Any, Any] {
+ override def convert(obj: Any): Any = {
+ if (obj == null) {
+ return null
+ }
+ obj.asInstanceOf[AvroWrapper[_]].datum() match {
+ case null => null
+ case record: IndexedRecord => unpackRecord(record)
+ case other => throw new SparkException(
+ s"Unsupported top-level Avro data type ${other.getClass.getName}")
+ }
+ }
+
+ def unpackRecord(obj: Any): JMap[String, Any] = {
+ val map = new java.util.HashMap[String, Any]
+ obj match {
+ case record: IndexedRecord =>
+ record.getSchema.getFields.zipWithIndex.foreach { case (f, i) =>
+ map.put(f.name, fromAvro(record.get(i), f.schema))
+ }
+ case other => throw new SparkException(
+ s"Unsupported RECORD type ${other.getClass.getName}")
+ }
+ map
+ }
+
+ def unpackMap(obj: Any, schema: Schema): JMap[String, Any] = {
+ obj.asInstanceOf[JMap[_, _]].map { case (key, value) =>
+ (key.toString, fromAvro(value, schema.getValueType))
+ }
+ }
+
+ def unpackFixed(obj: Any, schema: Schema): Array[Byte] = {
+ unpackBytes(obj.asInstanceOf[GenericFixed].bytes())
+ }
+
+ def unpackBytes(obj: Any): Array[Byte] = {
+ val bytes: Array[Byte] = obj match {
+ case buf: java.nio.ByteBuffer => buf.array()
+ case arr: Array[Byte] => arr
+ case other => throw new SparkException(
+ s"Unknown BYTES type ${other.getClass.getName}")
+ }
+ val bytearray = new Array[Byte](bytes.length)
+ System.arraycopy(bytes, 0, bytearray, 0, bytes.length)
+ bytearray
+ }
+
+ def unpackArray(obj: Any, schema: Schema): JCollection[Any] = obj match {
+ case c: JCollection[_] =>
+ c.map(fromAvro(_, schema.getElementType))
+ case arr: Array[_] if arr.getClass.getComponentType.isPrimitive =>
+ arr.toSeq
+ case arr: Array[_] =>
+ arr.map(fromAvro(_, schema.getElementType)).toSeq
+ case other => throw new SparkException(
+ s"Unknown ARRAY type ${other.getClass.getName}")
+ }
+
+ def unpackUnion(obj: Any, schema: Schema): Any = {
+ schema.getTypes.toList match {
+ case List(s) => fromAvro(obj, s)
+ case List(n, s) if n.getType == NULL => fromAvro(obj, s)
+ case List(s, n) if n.getType == NULL => fromAvro(obj, s)
+ case _ => throw new SparkException(
+ "Unions may only consist of a concrete type and null")
+ }
+ }
+
+ def fromAvro(obj: Any, schema: Schema): Any = {
+ if (obj == null) {
+ return null
+ }
+ schema.getType match {
+ case UNION => unpackUnion(obj, schema)
+ case ARRAY => unpackArray(obj, schema)
+ case FIXED => unpackFixed(obj, schema)
+ case MAP => unpackMap(obj, schema)
+ case BYTES => unpackBytes(obj)
+ case RECORD => unpackRecord(obj)
+ case STRING => obj.toString
+ case ENUM => obj.toString
+ case NULL => obj
+ case BOOLEAN => obj
+ case DOUBLE => obj
+ case FLOAT => obj
+ case INT => obj
+ case LONG => obj
+ case other => throw new SparkException(
+ s"Unknown Avro schema type ${other.getName}")
+ }
+ }
+}
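
Finally, a hypothetical way to exercise the converter outside of PySpark: wrap a GenericRecord in an AvroKey and pass it to convert(), which returns a java.util.Map with the ["string", "null"] union in favorite_color unpacked by unpackUnion. This is a sketch for illustration, not part of the patch:

```scala
// Drive AvroWrapperToJavaConverter directly with an in-memory record built against
// the same reader schema as user.avsc.
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.mapred.AvroKey
import org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter

object AvroConverterSketch {
  def main(args: Array[String]): Unit = {
    val schema = new Schema.Parser().parse(
      """{"namespace": "example.avro", "type": "record", "name": "User", "fields": [
        |  {"name": "name", "type": "string"},
        |  {"name": "favorite_color", "type": ["string", "null"]}
        |]}""".stripMargin)

    val rec = new GenericData.Record(schema)
    rec.put("name", "Ben")
    rec.put("favorite_color", "red")

    // convert() unwraps the AvroKey, dispatches on the field schemas, and returns a
    // java.util.HashMap containing name=Ben and favorite_color=red.
    println(new AvroWrapperToJavaConverter().convert(new AvroKey(rec)))
  }
}
```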