Real-time search using Apache Storm




Apache Storm is a real-time, distributed, fault-tolerant stream processing engine. It was developed at Twitter in 2011 and later open sourced, eventually becoming a top-level Apache project. Numerous companies such as Spotify, WebMD, and Yahoo use Apache Storm for processing logs, user engagement data, asset data, and web documents in real time, and also for search. Apache Storm can process and aggregate streams in real time.

A stream is nothing but an unbounded sequence of tuples, and a tuple is a named list of values. If you are building a social data processing application based on Twitter, Facebook, or Yelp data, you typically register an app with them, and they in turn start sending you data in the form of streams. Any feed from Twitter, Facebook, or Yelp can therefore be treated as a stream.
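For instance, a tweet on such a stream can be modeled as a tuple whose values line up with a declared schema. Here is a minimal illustration (the field names here are just an example, not from any specific API):

import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TupleExample
{
    public static void main(String[] args)
    {
        // A stream's schema: every tuple carries a "user" and a "text" field.
        Fields schema = new Fields("user", "text");

        // One tuple in the stream; values line up positionally with the schema.
        Values tuple = new Values("@someuser", "Just saw a great movie!");

        System.out.println(schema.get(0) + " = " + tuple.get(0));
        System.out.println(schema.get(1) + " = " + tuple.get(1));
    }
}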


A typical Storm cluster architecture looks like this:


Storm Architecture


On the left, we have a master node that runs a daemon called Nimbus. Nimbus is responsible for identifying work and delegating it to the worker nodes.

On the right, we have the worker nodes. Each worker node runs a daemon called the supervisor, which is responsible for starting worker processes on that node and delegating work to them.

In the middle are the ZooKeeper nodes. Nimbus uses ZooKeeper to coordinate work among the worker nodes: it computes assignments for the topology's tasks and writes them to ZooKeeper, and each supervisor reads its assignments from there. ZooKeeper also maintains the state of Nimbus and of all worker nodes. Both Nimbus and the supervisors are themselves stateless; because all state is kept in ZooKeeper, even if Nimbus or a worker node goes down, it can be restarted and pick up where it left off.
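To make this concrete, here is a minimal storm.yaml sketch for such a cluster (the host names are placeholders):

# ZooKeeper ensemble; all coordination state is stored here.
storm.zookeeper.servers:
  - "zk1.example.com"
  - "zk2.example.com"
  - "zk3.example.com"

# Master node(s) running the Nimbus daemon.
nimbus.seeds: ["nimbus.example.com"]

# Ports on each worker node where the supervisor may launch worker processes.
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703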

On each worker node, we run spouts and bolts. We configure a topology that connects the spouts and bolts using stream groupings (such as a fields grouping). The resulting structure is a directed acyclic graph (DAG).


Spouts and Bolts

A spout is a source of data: for instance, a Twitter feed, a file, a database, or a Kafka queue.

A bolt is a consumer of data; message processing happens inside bolts. You can configure one or more bolts, each for a specialized task.

Spouts and bolts are connected to other bolts through a topology. The topology specifies the type of grouping between components (shuffle, fields, all, and so on) depending on the use case.
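For illustration, here is how the common groupings are declared when wiring a topology. This is only a sketch: MySpout, BoltA, BoltB, and BoltC are hypothetical components.

import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class GroupingExamples
{
    public static void main(String[] args)
    {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("my-spout", new MySpout());

        // Shuffle grouping: tuples are distributed randomly across bolt tasks.
        builder.setBolt("bolt-a", new BoltA(), 2)
                .shuffleGrouping("my-spout");

        // Fields grouping: tuples with the same "key" value always reach the same task.
        builder.setBolt("bolt-b", new BoltB(), 2)
                .fieldsGrouping("bolt-a", new Fields("key"));

        // All grouping: every tuple is replicated to every task of this bolt.
        builder.setBolt("bolt-c", new BoltC(), 1)
                .allGrouping("bolt-a");
    }
}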


Here is an example: a word counter topology.


word counter topology

Here, the file spout reads data from a text file and sends it to the sentence splitter bolt one sentence at a time. The grouping between the file spout and the sentence splitter is random (shuffle), which is fine because the file spout talks directly to just one bolt. The sentence splitter splits each sentence into words and sends each word on to the word counter bolts; in this example there are two word counter bolts. For the word count to work properly, the topology must send the same word to the same bolt instance.

For example, consider the following sentence:

Sentence 1: quick white fox jumps over white fence and joined gray fox.

Here the words are [quick, white, fox, jumps, over, white, fence, and, joined, gray, fox]. The word "white" appears twice, and so does "fox". If the first "white" goes to the top word counter bolt, the second "white" must also go to that same bolt, or the word counts will be wrong. This is ensured by using a fields grouping on "word".
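The wiring for this word counter topology could look like the following sketch. FileSpout, SentenceSplitterBolt, and WordCounterBolt are hypothetical class names, since their code is not included in this post.

import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology
{
    public static void main(String[] args)
    {
        TopologyBuilder builder = new TopologyBuilder();

        // Reads the text file and emits one sentence per tuple.
        builder.setSpout("file-spout", new FileSpout());

        // Shuffle grouping is fine here: any splitter task can handle any sentence.
        builder.setBolt("sentence-splitter", new SentenceSplitterBolt())
                .shuffleGrouping("file-spout");

        // Fields grouping on "word" guarantees that the same word always lands
        // on the same counter task, so per-word counts stay consistent.
        builder.setBolt("word-counter", new WordCounterBolt(), 2)
                .fieldsGrouping("sentence-splitter", new Fields("word"));
    }
}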

To build a real-time search engine, the following two topologies can be used.


The top topology builds the Twitter search index. We have a Twitter spout that receives the Twitter stream filtered on the "covid-19" topic and feeds the ES bolt. The ES bolt extracts fields like the tweet text, user, createdAt, and retweet count, and indexes them into the twitter search index in real time. We can then send queries to Elasticsearch and search the data in real time, as it is being indexed.

The bottom topology computes trending hashtags. As before, a Twitter spout keeps receiving tweets about covid-19. It sends each tweet object to the hashtag extractor, which splits the tweet text into words and checks whether any hashtags are present. All hashtags are forwarded to two hashtag counter bolts, which count them in an in-memory hash map. When the topology is shut down, each hashtag counter bolt dumps its counts into Elasticsearch. We can then query Elasticsearch for hashtags and sort them by count in descending order.


Here is the code for the Twitter spout.


package Remote;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;


import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import twitter4j.*;
import twitter4j.conf.ConfigurationBuilder;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

public class TwitterSpout extends BaseRichSpout
{
    //Queue for tweets
    private LinkedBlockingQueue<Status> queue;
    //stream of tweets
    private TwitterStream twitterStream;

    private SpoutOutputCollector collector;

    // Called once when the spout starts: set up the Twitter client, the tweet
    // queue, and the stream filter.
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {

        this.collector = collector;

        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true)
                .setOAuthConsumerKey("xxxxx")
                .setOAuthConsumerSecret("xxxxx")
                .setOAuthAccessToken("xxxxxx")
                .setOAuthAccessTokenSecret("xxxxxx");


        this.twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
        this.queue = new LinkedBlockingQueue<Status>();

        // Only onStatus matters here: it queues each incoming tweet. The other
        // callbacks are required by the interface but left as no-ops.
        final StatusListener listener = new StatusListener() {
            public void onStatus(Status status) {

                queue.offer(status);
            }

            public void onDeletionNotice(StatusDeletionNotice sdn) {
            }


            public void onTrackLimitationNotice(int i) {
            }


            public void onScrubGeo(long l, long l1) {
            }


            public void onException(Exception e) {
            }


            public void onStallWarning(StallWarning warning) {
            }
        };

        twitterStream.addListener(listener);
        // Only receive tweets that mention the tracked keyword.
        final FilterQuery query = new FilterQuery();
        query.track(new String[]{"corona"});
        twitterStream.filter(query);
    }

    // Called repeatedly by Storm: emit one queued tweet per call, or back off
    // briefly when the queue is empty.
    public void nextTuple()
    {
        final Status status = queue.poll();

        if (status == null) {
            Utils.sleep(50);
        } else {
            collector.emit(new Values(status));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("tweet"));
    }

    public void close()
    {
        twitterStream.shutdown();
    }

}
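Note that this spout emits unanchored tuples, so Storm will not replay a tweet if a downstream bolt fails. If at-least-once processing matters, one option (not part of the original code) is to attach a message id when emitting, and override ack/fail in the spout:

// Inside nextTuple(): passing the tweet id as a message id turns on
// Storm's ack/fail tracking for this tuple (sketch only).
collector.emit(new Values(status), status.getId());

// The spout is then notified about the outcome of each tracked tuple:
public void ack(Object msgId)
{
    // Tuple was fully processed; nothing to do for this simple spout.
}

public void fail(Object msgId)
{
    // Tuple failed downstream; a real implementation could re-queue the tweet here.
}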




Here is the code for the ES bolt.


package Remote;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import twitter4j.Status;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class ESBolt extends BaseBasicBolt
{
    ElasticSearchClient client ;
    private long docCounter ;

    public void prepare(Map stormConf, TopologyContext context)
    {
        this.docCounter = 0 ;
        this.client = new ElasticSearchClient();
    }

    public void cleanup()
    {
        try {
            this.client.Close();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }


    // Called once per incoming tuple: extract the interesting fields and index
    // them as a document into Elasticsearch.
    public void execute(Tuple input, BasicOutputCollector collector) {

        System.out.println("Inside es bolt's execute method");
        Status status = (Status) input.getValueByField("tweet");
        String tweetText = status.getText();
        long retweetCount = status.getRetweetCount();
        String user = status.getUser().getName();
        Date createdAt = status.getCreatedAt();
        String profileUrl = status.getUser().get400x400ProfileImageURLHttps();


        // Build the document to index.
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("tweet", tweetText);
        jsonMap.put("createdat", createdAt);
        jsonMap.put("user", user);
        jsonMap.put("retweetcount", retweetCount);
        jsonMap.put("profileurl", profileUrl); // use the profile image URL extracted above

        try {
            System.out.println("Writing to ES");
            String docId = client.insert(jsonMap, "twitter", "_doc");

            System.out.println("Successfully inserted doc " + docId);
            this.docCounter += 1;
            System.out.println("****** DocNUM ****** = " + this.docCounter);
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
        }

    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing is emitted downstream, so no output fields are declared.
    }



}
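The ElasticSearchClient helper used by this bolt is not shown in the post. A minimal version might look like the sketch below, assuming the Elasticsearch 7.x RestHighLevelClient and a single-node cluster on localhost; the details are assumptions, not the original implementation.

package Remote;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;
import java.util.Map;

public class ElasticSearchClient
{
    private final RestHighLevelClient client;

    public ElasticSearchClient()
    {
        // Assumes Elasticsearch is running locally on the default port.
        this.client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));
    }

    // Indexes the document and returns the generated document id. The type
    // parameter is ignored: mapping types are deprecated in Elasticsearch 7.x.
    public String insert(Map<String, Object> jsonMap, String index, String type) throws IOException
    {
        IndexRequest request = new IndexRequest(index).source(jsonMap);
        IndexResponse response = client.index(request, RequestOptions.DEFAULT);
        return response.getId();
    }

    public void Close() throws IOException
    {
        client.close();
    }
}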

Here is the code for the search topology.


package Remote;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class LocalTopology
{
    public static void main(String[] args) throws InterruptedException {

        //Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Twitter-Spout", new TwitterSpout());
        builder.setBolt("es-bolt", new ESBolt(),3)
                .shuffleGrouping("Twitter-Spout");

        //Configuration
        Config conf = new Config();
        conf.setDebug(true);


        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("TwitterTopology", conf, builder.createTopology());
            // Let the topology run for 10 seconds before shutting down.
            Thread.sleep(10000);
        } finally {
            cluster.shutdown();
        }
    }
}
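LocalCluster runs the topology in-process, which is handy for development. To run on a real Storm cluster, the same topology can be submitted with StormSubmitter instead. Here is a sketch; the topology jar must be packaged and launched through the storm CLI.

package Remote;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

// Sketch: submit the same topology to a real cluster.
// Run with: storm jar <your-topology-jar> Remote.RemoteTopology
public class RemoteTopology
{
    public static void main(String[] args) throws Exception
    {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Twitter-Spout", new TwitterSpout());
        builder.setBolt("es-bolt", new ESBolt(), 3)
                .shuffleGrouping("Twitter-Spout");

        Config conf = new Config();
        conf.setNumWorkers(2); // worker processes to spread across the cluster

        StormSubmitter.submitTopology("TwitterTopology", conf, builder.createTopology());
    }
}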

Here is the code for the hashtag extractor bolt.


package Remote;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import twitter4j.Status;

public class HashTagNormalizerBolt  extends BaseBasicBolt
{
    public void execute(Tuple input, BasicOutputCollector collector) {
        Status status = (Status) input.getValueByField("tweet");
        String tweet = status.getText();
        if (tweet == null) {
            return;
        }
        // Naive hashtag extraction: split on spaces and keep tokens starting with '#'.
        for (String token : tweet.split(" ")) {
            String hashtag = token.trim();
            if (!hashtag.isEmpty() && hashtag.startsWith("#")) {
                hashtag = hashtag.toLowerCase().replace("#", "");
                System.out.println("******Got hashtag " + hashtag + " ********");
                collector.emit(new Values(hashtag));
            }
        }
    }


    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("hashtag"));
    }


}
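Splitting the tweet text on spaces is a simple heuristic, and it misses hashtags followed by punctuation (for example "#covid19!"). twitter4j already parses hashtags for you, so an alternative execute() body (not used in this post, and requiring an import of twitter4j.HashtagEntity) could read them from the status entities:

// Alternative sketch: use twitter4j's pre-parsed hashtag entities instead of
// splitting the raw text.
Status status = (Status) input.getValueByField("tweet");
for (HashtagEntity entity : status.getHashtagEntities())
{
    // getText() returns the hashtag without the leading '#'.
    collector.emit(new Values(entity.getText().toLowerCase()));
}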

Here is the code for the hashtag counter bolt.


package Remote;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class HashTagCounter extends BaseBasicBolt
{
    // In-memory hashtag counts, flushed to Elasticsearch in cleanup().
    Map<String, Long> counters;
    ElasticSearchClient client;
    private long docCounter;

    public void prepare(Map stormConf, TopologyContext context)
    {
        this.docCounter = 0 ;
        counters = new HashMap<String, Long>();
        this.client = new ElasticSearchClient();

    }

    public void execute(Tuple input, BasicOutputCollector collector)
    {
        String hashtag = input.getString(0);
        System.out.println("Got hashtag " + hashtag);

        // Increment the in-memory count for this hashtag.
        if (!counters.containsKey(hashtag)) {
            counters.put(hashtag, 1L);
        } else {
            counters.put(hashtag, counters.get(hashtag) + 1);
        }
    }

    private String insertDoc(String hashtag, long count)
    {
        String docId = "";
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("hashtag", hashtag);
        jsonMap.put("count", count);

        try {
            System.out.println("Writing to ES");
            docId = client.insert(jsonMap, "hashtag", "_doc");

            System.out.println("Successfully inserted doc " + docId);
            this.docCounter += 1;
            System.out.println("****** DocNUM ****** = " + this.docCounter);
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
        }
        return docId;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing is emitted downstream, so no output fields are declared.
    }

    public void cleanup()
    {
        try
        {
            // Flush all accumulated counts to Elasticsearch when the topology shuts down.
            for (Map.Entry<String, Long> entry : counters.entrySet())
            {
                String hashtag = entry.getKey();
                long count = entry.getValue();
                String docID = insertDoc(hashtag, count);
                System.out.println("inserted doc " + docID + " in hashtag index");
            }

            this.client.Close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
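One caveat with this design: the counts reach Elasticsearch only in cleanup(), and Storm does not guarantee that cleanup() runs when a topology is killed on a real cluster. A more robust option, sketched below and not part of the original code, is to flush periodically using Storm's tick tuples:

// Sketch: ask Storm to send this bolt a tick tuple every 60 seconds, and
// flush the counters whenever one arrives. Requires importing
// org.apache.storm.Config and org.apache.storm.utils.TupleUtils.
public Map<String, Object> getComponentConfiguration()
{
    Map<String, Object> conf = new HashMap<>();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
    return conf;
}

public void execute(Tuple input, BasicOutputCollector collector)
{
    if (TupleUtils.isTick(input))
    {
        // Periodic flush instead of waiting for cleanup().
        for (Map.Entry<String, Long> entry : counters.entrySet())
        {
            insertDoc(entry.getKey(), entry.getValue());
        }
        return;
    }
    // ... normal hashtag counting as shown above ...
}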

Here is the code for the hashtag topology.


package Remote;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class HashTagTopology
{
    public static void main(String[] args) throws InterruptedException
    {

        //Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Twitter-Spout", new TwitterSpout());
        builder.setBolt("hashtag-normalizer", new HashTagNormalizerBolt())
                .shuffleGrouping("Twitter-Spout");
        builder.setBolt("hashtag-counter", new HashTagCounter(),2)
                .fieldsGrouping("hashtag-normalizer", new Fields("hashtag"));


        //Configuration
        Config conf = new Config();
        conf.setDebug(true);


        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("HashTag-Topology", conf, builder.createTopology());
            // Let the topology run for 60 seconds before shutting down.
            Thread.sleep(60000);
        } finally {
            cluster.shutdown();
        }
    }
}


I have also built a web application using Python and Flask to search in real time. Here is a snippet of the Elasticsearch queries from the Python web application.




from elasticsearch import Elasticsearch

es = Elasticsearch()

# Full-text search: tweets matching the keyword.
keyword = "virus"
res = es.search(index="twitter", body={"query": {"match": {"tweet": keyword}}})

print("Got %d tweet hits:" % res['hits']['total']['value'])
for hit in res['hits']['hits']:
    print(hit["_source"]["tweet"])

# Trending hashtags: everything in the hashtag index, sorted by count descending.
res2 = es.search(index="hashtag", body={
    "query": {"match_all": {}},
    "sort": {"count": {"order": "desc"}}
})

print("Got %d hashtag hits:" % res2['hits']['total']['value'])
for hit in res2['hits']['hits']:
    print(hit["_source"]["hashtag"])


Here is the UI of the search web application.


search web application


You can search the Twitter data as soon as it arrives from the Twitter stream; results are almost instantaneous (a few milliseconds).

If you would like to learn step by step how to use Apache Storm through video tutorials, you can sign up for my course at https://www.udemy.com/course/draft/2925630/?referralCode=7A52437DD15045A1B031


About the author (Evergreen Technologies):

• LinkedIn: @evergreenllc2020

• Twitter: @tech_evergreen

• Udemy: https://www.udemy.com/user/evergreen-technologies-2/

• GitHub: https://github.com/evergreenllc2020/

Over 22,000 students in 145 countries




