
Spark Streaming Example (Hands-on)


#1

1. Using Spark's streaming context and its socket streaming function, build a word count program that counts the frequency of words in each input data point of a stream, where a data point is one full line of words terminated by a newline character ('\n').

2. The source of the input stream will be a terminal where we start a netcat server on port 9999 and type in our sentences.

3. The Spark Streaming application will listen for data on the same host and the same port, i.e. 9999, and count the frequency of words in each line that we input.

• Each sentence typed into the netcat terminal is captured by the Spark Streaming program and broken into individual words so that the frequency of each word can be reported.

Objective – to demonstrate the use of Spark's streaming context with socket streaming

nc -lk 9999    # run this in one terminal and the following commands in another

# run the following command from the $HOME/sparkprojects directory only

$SPARK_HOME/bin/spark-submit --class "NetworkWordCount" --master local[4] ./target/scala-2.11/sparknetworkstreaming_2.11-1.0.jar 127.0.0.1 9999
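The jar referenced above is produced by running sbt package inside $HOME/sparkprojects. A minimal build.sbt sketch for such a project is shown below; the Spark version is an assumption and should be matched to your installation:

// build.sbt -- minimal sketch, adjust versions to your environment
name := "sparknetworkstreaming"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.1.0" % "provided"
)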

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // Silence verbose Spark and Akka logging so the word counts stand out
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    // Create the streaming context with a 1-second batch interval
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on the target host:port and count the words
    // in the input stream of '\n'-delimited text
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
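A sample run, assuming both terminals are up (the batch timestamp will differ on your machine). Typing the following line into the netcat terminal:

hello spark hello streaming

produces output along these lines in the spark-submit terminal:

-------------------------------------------
Time: 1488632260000 ms
-------------------------------------------
(hello,2)
(spark,1)
(streaming,1)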

Streaming live tweets from Twitter.com
1. The Spark shell does not support stream data capture from streaming sources such as Twitter.com.

2. Since this feature (connecting to Twitter.com and pulling tweets) is not core Spark functionality, it is not available by default.

3. To capture tweets, one needs to download the Twitter4J libraries (see the dependency sketch after this list).

4. The Twitter OAuth keys are also required.
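A sketch of the extra sbt library dependencies needed for the Twitter example below; the versions shown are assumptions (for Spark 1.6.x the connector ships as spark-streaming-twitter, while for Spark 2.x the equivalent artifact is published by the Apache Bahir project):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3",  // Twitter DStream connector
  "org.twitter4j"    %  "twitter4j-core"          % "4.0.4"   // Twitter4J client library
)

Alternatively, the corresponding jars can be supplied to spark-submit with the --jars option.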

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import twitter4j.auth.{Authorization, OAuthAuthorization}

object SparkTwitter {
  def main(args: Array[String]) {
    // Twitter OAuth credentials -- replace the placeholders with the keys
    // generated for your own Twitter developer account
    System.setProperty("twitter4j.oauth.consumerKey", "<your consumer key>")
    System.setProperty("twitter4j.oauth.consumerSecret", "<your consumer secret>")
    System.setProperty("twitter4j.oauth.accessToken", "<your access token>")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "<your access token secret>")

    // Create the streaming context with a 1-second batch interval
    val sparkConf = new SparkConf().setAppName("Twitter Streaming")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(1))

    // Receive the raw tweet stream and print the text of each status
    val tweets = TwitterUtils.createStream(ssc, None)
    val statuses = tweets.map(_.getText)
    statuses.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Some useful URLs and example files for learning Spark Streaming analysis (the paths below are relative to the Spark source distribution)

examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala

examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala

examples/src/main/scala/org/apache/spark/examples/streaming/TwitterHashTagJoinSentiments.scala

examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
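As a flavour of what TwitterPopularTags.scala does, here is a short sketch that extends the statuses DStream from the program above to rank hashtags over a sliding window (the 60-second window length is an assumption for illustration, not taken from the example file):

// Pull hashtags out of each tweet and count them over the last 60 seconds
val hashTags = statuses.flatMap(text => text.split(" ").filter(_.startsWith("#")))
val topCounts60 = hashTags.map(tag => (tag, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(60))
  .map { case (tag, count) => (count, tag) }
  .transform(_.sortByKey(false))   // sort so the most frequent hashtags come first
topCounts60.print()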

For DataFrames

http://www.infoobjects.com/dataframes-with-apache-spark/

http://blog.jaceklaskowski.pl/2015/07/20/real-time-data-processing-using-apache-kafka-and-spark-streaming.html

Reference links: