aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorRohit Rai <rohit@tuplejump.com>2013-06-03 15:15:52 +0530
committerRohit Rai <rohit@tuplejump.com>2013-06-03 15:15:52 +0530
commitb104c7f5c7e2b173fe1b10035efbc00e43df13ec (patch)
tree947ae5743d8cc1c2121bb3b79f36b128b8ebfb75 /examples
parent56c64c403383e90a5fd33b6a1f72527377d9bee0 (diff)
downloadspark-b104c7f5c7e2b173fe1b10035efbc00e43df13ec.tar.gz
spark-b104c7f5c7e2b173fe1b10035efbc00e43df13ec.tar.bz2
spark-b104c7f5c7e2b173fe1b10035efbc00e43df13ec.zip
Example to write the output to cassandra
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/spark/examples/CassandraTest.scala48
1 files changed, 43 insertions, 5 deletions
diff --git a/examples/src/main/scala/spark/examples/CassandraTest.scala b/examples/src/main/scala/spark/examples/CassandraTest.scala
index 6b9fd502e2..2cc62b9fe9 100644
--- a/examples/src/main/scala/spark/examples/CassandraTest.scala
+++ b/examples/src/main/scala/spark/examples/CassandraTest.scala
@@ -1,17 +1,16 @@
package spark.examples
import org.apache.hadoop.mapreduce.Job
-import org.apache.cassandra.hadoop.{ConfigHelper, ColumnFamilyInputFormat}
-import org.apache.cassandra.thrift.{IndexExpression, SliceRange, SlicePredicate}
+import org.apache.cassandra.hadoop.{ColumnFamilyOutputFormat, ConfigHelper, ColumnFamilyInputFormat}
+import org.apache.cassandra.thrift._
import spark.{RDD, SparkContext}
-import SparkContext._
+import spark.SparkContext._
import java.nio.ByteBuffer
import java.util.SortedMap
import org.apache.cassandra.db.IColumn
import org.apache.cassandra.utils.ByteBufferUtil
import scala.collection.JavaConversions._
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
/*
@@ -44,8 +43,15 @@ object CassandraTest {
ConfigHelper.setInputRpcPort(job.getConfiguration(), args(2))
+ ConfigHelper.setOutputInitialAddress(job.getConfiguration(), args(1))
+
+ ConfigHelper.setOutputRpcPort(job.getConfiguration(), args(2))
+
ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words")
+ ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount")
+
+
val predicate = new SlicePredicate()
val sliceRange = new SliceRange()
sliceRange.setStart(Array.empty[Byte])
@@ -55,6 +61,8 @@ object CassandraTest {
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
+ ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
+
//Make a new Hadoop RDD
val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
classOf[ColumnFamilyInputFormat],
@@ -74,6 +82,33 @@ object CassandraTest {
counts.collect().foreach {
case (word, count) => println(word + ":" + count)
}
+
+ counts.map {
+ case (word, count) => {
+ val colWord = new org.apache.cassandra.thrift.Column()
+ colWord.setName(ByteBufferUtil.bytes("word"))
+ colWord.setValue(ByteBufferUtil.bytes(word))
+ colWord.setTimestamp(System.currentTimeMillis)
+
+ val colCount = new org.apache.cassandra.thrift.Column()
+ colCount.setName(ByteBufferUtil.bytes("wcount"))
+ colCount.setValue(ByteBufferUtil.bytes(count.toLong))
+ colCount.setTimestamp(System.currentTimeMillis)
+
+
+ val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis)
+
+ val mutations: java.util.List[Mutation] = new Mutation() :: new Mutation() :: Nil
+ mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn())
+ mutations.get(0).column_or_supercolumn.setColumn(colWord)
+
+ mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn())
+ mutations.get(1).column_or_supercolumn.setColumn(colCount)
+ (outputkey, mutations)
+ }
+ }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]],
+ classOf[ColumnFamilyOutputFormat], job.getConfiguration)
+
}
}
@@ -81,6 +116,9 @@ object CassandraTest {
create keyspace casDemo;
use casDemo;
+create column family WordCount with comparator = UTF8Type;
+update column family WordCount with column_metadata = [{column_name: word, validation_class: UTF8Type}, {column_name: wcount, validation_class: LongType}];
+
create column family Words with comparator = UTF8Type;
update column family Words with column_metadata = [{column_name: book, validation_class: UTF8Type}, {column_name: para, validation_class: UTF8Type}];