Diffstat (limited to 'examples/src/main/python/cassandra_outputformat.py')
-rw-r--r--  examples/src/main/python/cassandra_outputformat.py  23
1 file changed, 12 insertions(+), 11 deletions(-)
diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py
index 836c35b5c6..d144539e58 100644
--- a/examples/src/main/python/cassandra_outputformat.py
+++ b/examples/src/main/python/cassandra_outputformat.py
@@ -50,7 +50,8 @@ if __name__ == "__main__":
         Usage: cassandra_outputformat <host> <keyspace> <cf> <user_id> <fname> <lname>
 
         Run with example jar:
-        ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_outputformat.py <args>
+        ./bin/spark-submit --driver-class-path /path/to/example/jar \
+        /path/to/examples/cassandra_outputformat.py <args>
 
         Assumes you have created the following table <cf> in Cassandra already,
         running on <host>, in <keyspace>.
@@ -67,16 +68,16 @@ if __name__ == "__main__":
     cf = sys.argv[3]
     sc = SparkContext(appName="CassandraOutputFormat")
 
-    conf = {"cassandra.output.thrift.address":host,
-            "cassandra.output.thrift.port":"9160",
-            "cassandra.output.keyspace":keyspace,
-            "cassandra.output.partitioner.class":"Murmur3Partitioner",
-            "cassandra.output.cql":"UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?",
-            "mapreduce.output.basename":cf,
-            "mapreduce.outputformat.class":"org.apache.cassandra.hadoop.cql3.CqlOutputFormat",
-            "mapreduce.job.output.key.class":"java.util.Map",
-            "mapreduce.job.output.value.class":"java.util.List"}
-    key = {"user_id" : int(sys.argv[4])}
+    conf = {"cassandra.output.thrift.address": host,
+            "cassandra.output.thrift.port": "9160",
+            "cassandra.output.keyspace": keyspace,
+            "cassandra.output.partitioner.class": "Murmur3Partitioner",
+            "cassandra.output.cql": "UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?",
+            "mapreduce.output.basename": cf,
+            "mapreduce.outputformat.class": "org.apache.cassandra.hadoop.cql3.CqlOutputFormat",
+            "mapreduce.job.output.key.class": "java.util.Map",
+            "mapreduce.job.output.value.class": "java.util.List"}
+    key = {"user_id": int(sys.argv[4])}
     sc.parallelize([(key, sys.argv[5:])]).saveAsNewAPIHadoopDataset(
         conf=conf,
         keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
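The excerpt ends inside the saveAsNewAPIHadoopDataset call. For context, here is a minimal sketch of how the patched pieces fit together as a standalone job; the valueConverter class name and the trailing sc.stop() are assumptions not visible in this hunk.

# Minimal sketch (not the committed file verbatim): writes one user row to
# Cassandra via CqlOutputFormat. The valueConverter class name below is an
# assumption modeled on the keyConverter naming; it is not shown in the hunk.
import sys
from pyspark import SparkContext

if __name__ == "__main__":
    host, keyspace, cf = sys.argv[1], sys.argv[2], sys.argv[3]
    sc = SparkContext(appName="CassandraOutputFormat")

    # Hadoop job configuration: CqlOutputFormat binds each record's values
    # to the parameters of the UPDATE statement given in cassandra.output.cql.
    conf = {"cassandra.output.thrift.address": host,
            "cassandra.output.thrift.port": "9160",
            "cassandra.output.keyspace": keyspace,
            "cassandra.output.partitioner.class": "Murmur3Partitioner",
            "cassandra.output.cql": "UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?",
            "mapreduce.output.basename": cf,
            "mapreduce.outputformat.class": "org.apache.cassandra.hadoop.cql3.CqlOutputFormat",
            "mapreduce.job.output.key.class": "java.util.Map",
            "mapreduce.job.output.value.class": "java.util.List"}

    # The key is a dict of key columns; the value is a list of CQL bind
    # parameters, here [fname, lname] taken from the command line.
    key = {"user_id": int(sys.argv[4])}
    sc.parallelize([(key, sys.argv[5:])]).saveAsNewAPIHadoopDataset(
        conf=conf,
        keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
        valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter")

    sc.stop()  # assumed cleanup; not part of the visible hunk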