
Spouts

In this chapter, we’ll take a look at the most commonly used strategies for designing the entry point for a topology (a spout) and how to make spouts fault-tolerant.

Reliable vs Unreliable messages

When designing a topology, one important thing to keep in mind is message reliability. If a message can’t be processed, we need to decide what to do with the individual message and what to do with the topology as a whole. For example, when processing bank deposits, it is important not to lose a single transaction message. But if we’re processing millions of tweets looking for some statistical metric, and one tweet gets lost, we can assume that the metric will still be fairly accurate.

In Storm, it is the author’s responsibility to guarantee message reliability according to the needs of each topology. This involves a trade-off: a reliable topology must manage lost messages, which requires more resources, while a less reliable topology may lose some messages but is less resource-intensive. Whatever the chosen reliability strategy, Storm provides the tools to implement it.

To manage reliability at the spout, we can include a message ID with the tuple at emit time (collector.emit(new Values(…),tupleId)). The methods ack and fail are called when a tuple is processed correctly or fails, respectively. Tuple processing succeeds when the tuple is processed by all target bolts and all anchored bolts (we will discuss how to anchor a bolt to a tuple in Chapter 5, Bolts). Tuple processing fails when:

  • collector.fail(tuple) is called by the target bolt.

  • processing time exceeds the configured timeout.
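
To make the contract concrete, here is a minimal sketch of a reliable spout skeleton (the class name, field names, and emitted values are illustrative, not part of the chapter’s example code):

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class ReliableSpoutSkeleton extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int nextId = 0;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void nextTuple() {
        // Passing a message ID as the second argument tells Storm to
        // track this tuple and call ack or fail for it later
        collector.emit(new Values("message " + nextId), nextId);
        nextId++;
    }

    @Override
    public void ack(Object msgId) {
        // The tuple, and everything anchored to it, was fully processed
    }

    @Override
    public void fail(Object msgId) {
        // collector.fail(tuple) was called downstream, or the timeout expired
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}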

Let’s take a look at an example. Imagine we are processing bank transactions, and we have the following requirements:

  • If a transaction fails, re-send the message.

  • If the transaction fails too many times, terminate the topology.

We’ll create a spout that sends 100 random transaction IDs, and a bolt that fails for 80% of tuples received (you can find the complete example at ch04-spout examples). We’ll implement the spout using a Map to emit transaction message tuples so that it’s easy to re-send messages.
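
Before looking at nextTuple, the spout needs its state. A minimal sketch of how the maps might be declared and filled in open (the field names match the code that follows; the random message generation is an assumption for illustration):

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

private Map<Integer, String> messages;                  // all transaction messages, by ID
private Map<Integer, Integer> transactionFailureCount;  // failures per transaction ID
private Map<Integer, String> toSend;                    // messages waiting to be (re-)sent
private SpoutOutputCollector collector;

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
    messages = new HashMap<Integer, String>();
    transactionFailureCount = new HashMap<Integer, Integer>();
    toSend = new HashMap<Integer, String>();
    Random random = new Random();
    for(int i = 0; i < 100; i++){
        // Generate the 100 random transaction messages to emit
        messages.put(i, "transaction_" + random.nextInt());
        transactionFailureCount.put(i, 0);
    }
    toSend.putAll(messages);
}

With that state in place, nextTuple emits everything pending: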

public void nextTuple() {
    if(!toSend.isEmpty()){
        for(Map.Entry<Integer, String> transactionEntry : toSend.entrySet()){
            Integer transactionId = transactionEntry.getKey();
            String transactionMessage = transactionEntry.getValue();
            collector.emit(new Values(transactionMessage),transactionId);
        }
        toSend.clear();
    }
}

If there are messages waiting to be sent, get each transaction message and its associated ID and emit them as a tuple, then clear the message queue. Note that it’s safe to call clear on the map, because nextTuple, fail, and ack are the only methods that modify the map, and they all run in the same thread.

We maintain two maps to keep track of transaction messages waiting to be sent, and the number of times each transaction has failed. The ack method simply removes the transaction message from each map.

public void ack(Object msgId) {
    messages.remove(msgId);
    transactionFailureCount.remove(msgId);
}

The fail method decides whether to re-send a transaction message or fail if it has failed too many times.

Warning
If you are using an all grouping in your topology, and any instance of the bolt fails, then the fail method of the spout will be called as well.
public void fail(Object msgId) {
    Integer transactionId = (Integer) msgId;
    // Check the number of times the transaction has failed
    Integer failures = transactionFailureCount.get(transactionId) + 1;

    if(failures >= MAX_FAILS){
        // If the number of failures is too high, terminate the topology
        throw new RuntimeException("Error, transaction id ["+transactionId+"] has had too many errors ["+failures+"]");
    }

    // If the number of failures is less than the maximum, save the number and re-send the message
    transactionFailureCount.put(transactionId, failures);
    toSend.put(transactionId, messages.get(transactionId));
    LOG.info("Re-sending message ["+msgId+"]");
}

First, we check the number of times the transaction has failed. If a transaction fails too many times, we throw a RuntimeException to terminate the worker where it is running. Otherwise, we save the failure count and put the transaction message in the toSend queue so that it will be re-sent when nextTuple is called.

Warning
Storm nodes do not maintain state, so if you store information in memory (as in this example) and the node goes down, you will lose all stored information.
Tip
Storm is a fail-fast system. If an exception is thrown, the topology will go down, but Storm will restart the process in a consistent state so that it can recover correctly.

Getting data

Here we’ll take a look at some common techniques for designing spouts that collect data efficiently from multiple sources.

Direct connection

In a direct connection architecture, the spout connects directly to a message emitter (see Direct connection spout.).

Direct connection spout
Figure 1. Direct connection spout.

This architecture is simple to implement, particularly when the message emitter is a well known device or a well known device group. A well known device is one that is known at startup and remains the same throughout the life of the topology; an unknown device is one that is added after the topology is already running. A well known device group is one in which all devices in the group are known at start time.

As an example, we’ll create a spout to read the Twitter stream using the Twitter streaming API. The spout will connect directly to the API, which serves as the message emitter. We’ll filter the stream to get all public tweets that match the track parameter (as documented on the Twitter dev page). The complete example can be found on the Twitter Example GitHub page.

The spout gets the connection parameters from the configuration object (track, user, and password) and creates a connection to the API (in this case, using the DefaultHttpClient from Apache). It reads the connection one line at a time, parses the line from JSON format into a Java object, and emits it.
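
A sketch of how the spout might read those parameters in open, assuming Apache HttpClient 4.x classes (BasicCredentialsProvider, AuthScope, UsernamePasswordCredentials) and the config keys named in the description above:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
    this.track = (String) conf.get("track");
    // Basic HTTP authentication for the streaming connection
    credentialProvider = new BasicCredentialsProvider();
    credentialProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials((String) conf.get("user"),
                                            (String) conf.get("password")));
}

The nextTuple method then opens the connection and reads the stream: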

public void nextTuple() {
    // Create a connection to the known source
    client = new DefaultHttpClient();
    client.setCredentialsProvider(credentialProvider);
    HttpGet get = new HttpGet(STREAMING_API_URL+track);

    ....

    //Execute get and create a reader for the Twitter Streaming API
    BufferedReader reader = ...
    String in;
    //Read line by line
    while((in = reader.readLine())!=null){
        try{
            //Parse and emit
            Object json = jsonParser.parse(in);
            collector.emit(new Values(track,json));
        }catch (ParseException e) {
            LOG.error("Error parsing message from twitter",e);
        }
    }

    ....
}
Tip
Here we are blocking inside the nextTuple method, so we never execute the ack and fail methods. In a real application, we recommend that you do the blocking in a separate thread and use an internal queue to exchange information (we’ll cover how to do that in the next example, Enqueued messages).

This is great!

We’re reading the Twitter stream with a single spout. If we parallelize the topology, we’ll have several spouts reading different partitions of the same stream, which doesn’t make sense. So how do we parallelize processing if we have several streams to read? One interesting feature of Storm is that we can access the TopologyContext from any component (spouts/bolts). Using this, we can divide the streams between our spout instances.

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {

        //Get the number of spout tasks from the context
        int spoutsSize = context.getComponentTasks(context.getThisComponentId()).size();

        //Get the index of this spout task
        int myIdx = context.getThisTaskIndex();

        String[] tracks = ((String) conf.get("track")).split(",");
        StringBuffer tracksBuffer = new StringBuffer();
        for(int i=0; i < tracks.length; i++){
            //Check if this spout must read this track word
            if(i % spoutsSize == myIdx){
                tracksBuffer.append(",");
                tracksBuffer.append(tracks[i]);
            }
        }
        if(tracksBuffer.length() == 0)
            throw new RuntimeException("No track found for spout" +
                    " [spoutsSize:"+spoutsSize+", tracks:"+tracks.length+"]; the number" +
                    " of tracks must be greater than the spout parallelism");
        this.track = tracksBuffer.substring(1);

        ....

   }

Using this technique, we can distribute collectors evenly across data sources. The same technique can be applied in other situations; for example, for collecting log files from web servers. See Direct connection hashing.

Direct connection hashing
Figure 2. Direct connection hashing.

In the example above, we connected the spout to a well known device. We can use the same approach to connect to unknown devices using a coordinating system to maintain the device list. The coordinator detects changes to the list and creates and destroys connections. For example, when collecting log files from web servers, the list of web servers may change over time. When a web server is added, the coordinator detects the change and creates a new spout for it. See Direct connection coordinator.

Direct connection coordinator
Figure 3. Direct connection coordinator.
Tip
It’s recommended to create connections from spouts to message emitters, rather than the other way around. If the machine on which a spout is running goes down, Storm will restart it on another machine, so it’s easier for the spout to locate the message emitter than for the message emitter to keep track of which machine the spout is on.

Enqueued messages

The second approach is to connect our spouts to a queue system that receives the messages from the message emitters and leaves them available for consumption by the spouts. The advantage of using a queue system is that it can serve as middleware between the spouts and the data sources; in many cases, we can make the topology reliable by using the replay capabilities of many queue systems. This means we don’t need to know anything about the message emitters, and adding and removing emitters is easier than with a direct connection. The problem with this architecture is that the queue becomes a point of failure, and we add a new layer to our processing flow.

Using a queue system shows the architecture schema.

ch04 queueconnection
Figure 4. Using a queue system.
Tip
We can use round-robin pull or hashing queues (dividing the queue messages by hash among the spouts, or creating many queues) to parallelize processing through queues, splitting the messages between many spouts.
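
As an illustration of the hashing variant, a message emitter might pick one of several Redis lists by hashing each message, so that each spout instance can consume its own queue. This is only a sketch; the class, host, and queue naming scheme are assumptions:

import redis.clients.jedis.Jedis;

public class HashingQueueProducer {
    private final Jedis client = new Jedis("localhost", 6379);
    private final int numQueues;

    public HashingQueueProducer(int numQueues) {
        this.numQueues = numQueues;
    }

    public void send(String message) {
        //Hash the message to choose the destination queue
        int queueIdx = Math.abs(message.hashCode()) % numQueues;
        client.lpush("queue-" + queueIdx, message);
    }
}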

We’ll create an example using Redis as our queue system and its Java library, Jedis. In our example, we’ll create a log processor to collect logs from an unknown source, using the command lpush to insert messages into the queue and blpop to wait for a message. If we have many processes, using blpop will let us receive the messages in round-robin fashion.

To retrieve messages from Redis, we’ll use a thread created in the spout’s open method (using a thread to avoid blocking the main loop where the nextTuple method runs):

    new Thread(new Runnable() {
        @Override
        public void run() {
            while(true){
                try{
                    Jedis client = new Jedis(redisHost, redisPort);
                    List<String> res = client.blpop(Integer.MAX_VALUE, queues);
                    messages.offer(res.get(1));
                }catch(Exception e){
                    LOG.error("Error reading queues from redis",e);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e1) {}
                }
            }
        }
    }).start();

The only purpose of this thread is to create the connection and execute the blpop command. When a message is received, it is added to an internal queue of messages that will be consumed by the nextTuple method. Here we can see that the source is the Redis queue, and we know neither who the message emitters are nor how many of them there are.
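
Because the reader thread offers messages while the spout’s own thread polls them, the internal queue must be thread-safe. A minimal sketch of how it might be declared (an assumption consistent with the offer and poll calls in the code):

import java.util.concurrent.LinkedBlockingQueue;

//Shared between the Redis reader thread and the spout's own thread
private LinkedBlockingQueue<String> messages = new LinkedBlockingQueue<String>();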

Tip
We recommend that you not create many threads within a spout, because each spout already runs in its own thread. Instead of creating many threads, it is better to increase the parallelism. This will create more threads in a distributed fashion across the Storm cluster.
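
For instance, rather than spawning extra reader threads inside one spout, we could set the spout’s parallelism hint when building the topology (RedisSpout is a hypothetical name for the spout in this example):

TopologyBuilder builder = new TopologyBuilder();
//Run four instances of the spout, distributed across the cluster by Storm
builder.setSpout("redis-spout", new RedisSpout(), 4);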

In our nextTuple method, the only thing we do is receive the messages and emit them again.

    public void nextTuple() {
        while(!messages.isEmpty()){
            collector.emit(new Values(messages.poll()));
        }
    }
Tip
We could modify this spout to make it possible to replay messages from Redis, transforming this topology into a reliable topology, as the sketch below shows.
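
A minimal sketch of that idea, assuming we track in-flight messages in a map and keep a dedicated Jedis connection for replay (pending, nextId, replayClient, and queueName are illustrative assumptions; a plain HashMap suffices for pending because nextTuple, ack, and fail all run on the same thread):

public void nextTuple() {
    while(!messages.isEmpty()){
        String message = messages.poll();
        int msgId = nextId++;
        pending.put(msgId, message);
        //Emitting with a message ID makes Storm track the tuple
        collector.emit(new Values(message), msgId);
    }
}

public void ack(Object msgId) {
    //Fully processed; forget the message
    pending.remove(msgId);
}

public void fail(Object msgId) {
    //Push the failed message back to Redis so it is replayed.
    //Use a Jedis connection separate from the reader thread's one.
    String message = pending.remove(msgId);
    if(message != null){
        replayClient.lpush(queueName, message);
    }
}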

DRPC

DRPCSpout is a spout implementation that receives a function invocation stream from the DRPC server and processes it (see the example in Chapter 3, Topologies). In the most common cases, using backtype.storm.drpc.DRPCSpout will be enough, but it’s possible to create our own implementation using the DRPC classes included with the Storm package.
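
If we do wire a DRPC topology manually, the pattern looks roughly like this (MyProcessingBolt and the function name "my-function" are hypothetical; the bolt must read the function arguments and emit a result together with the return info, which ReturnResults sends back to the waiting client):

import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();
//DRPCSpout emits one tuple per invocation of "my-function"
builder.setSpout("drpc-spout", new DRPCSpout("my-function"));
//The bolt processes the arguments and emits [result, return-info]
builder.setBolt("process", new MyProcessingBolt())
       .shuffleGrouping("drpc-spout");
//ReturnResults sends each result back to the DRPC server
builder.setBolt("return", new ReturnResults())
       .shuffleGrouping("process");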

Conclusions

We have seen the common spout implementation patterns, their advantages, and how to make messages reliable. It’s important to design spout communication based on the problem we are working on. There is no one architecture that fits all topologies: if we know the sources, or we can control them, we can use a direct connection, while if we need the ability to add unknown sources, or to receive messages from a variety of sources, it’s better to use a queued connection. If we need an online process, we will need to use DRPCSpouts or implement something similar.

Although we have discussed the three main types of connections, there are countless variations, depending on your needs.