Sunday, March 7, 2010

Using App Engine's task queue to break problems down

Two weeks ago, when Twilio announced their developer contest for their new SMS API, I decided to build a mobile application that let me query the Madison Metro bus system to determine when my bus would arrive.

Although the entry did not win the contest, it was named mashup of the day last week at Programmable Web! It's called SMS My Bus, and if you live in Madison and ride the bus, I encourage you to take advantage of it! You can find details about it here...

http://www.smsmybus.com

The basic architecture of the application is straightforward. SMS messages are sent to my Twilio phone number, and Twilio routes them to my server via HTTP POST requests. I do a schedule look-up based on the user's input and return the results.
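For readers new to Twilio, here is a rough, plain Python 3 sketch of the receiving end of that cycle. Twilio's SMS webhook POSTs form-encoded fields that include From (the sender's number) and Body (the message text). The message format assumed here, a stop ID optionally followed by a route number, and the parse_request helper are hypothetical illustrations, not SMS My Bus's actual parser.

```python
from urllib.parse import parse_qs


def parse_request(post_body):
    """Return (caller, stop_id, route) from a form-encoded Twilio SMS POST.

    Hypothetical message format: a stop ID, optionally followed by a
    route number, e.g. "1101" or "1101 2". route is None when the
    caller didn't ask for a specific route.
    """
    form = parse_qs(post_body)          # decodes %2B -> "+", "+" -> " "
    caller = form["From"][0]
    words = form.get("Body", [""])[0].split()
    if not words:
        raise ValueError("empty message")
    stop_id = words[0]
    route = words[1] if len(words) > 1 else None
    return caller, stop_id, route


print(parse_request("From=%2B16085551234&Body=1101+2"))
# ('+16085551234', '1101', '2')
```

When route comes back as None, the handler would fall through to the "every route at this stop" path described in step 2.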

The tricky parts stem primarily from the fact that:

1. The Madison Metro doesn't actually provide web services for this data. The consequence for an app like mine is that I need to do a bit of screen scraping to find the data I'm after.

2. I chose to deploy this app using Google App Engine, and URL scraping can become a show stopper since GAE limits the resources available to every request. GAE will only let a single request run for about 30 seconds.

Needless to say, I would love it if my fine city of Madison would join the Gov 2.0 movement and open up more of its rich data via standard web services.

In the meantime... I needed a solution that would allow me to gather disparate data across many, many URLs. As an example, the busiest stop in the Metro system has 34 buses passing through it, so I may need to grab 34 different web pages to piece together the schedule for a caller at that stop.
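To see why one request can't simply fetch every page in turn, here is the back-of-the-envelope arithmetic in Python. The 1.5-second per-page time is an assumed figure chosen for illustration, not a measurement of the Metro site: at that rate, 34 sequential fetches need 51 seconds, well past the roughly 30-second deadline, and a single request could only get through 20 pages.

```python
# Hypothetical numbers: the per-page time is an assumption, not a
# measurement of the Madison Metro site.
REQUEST_LIMIT_S = 30.0  # approximate per-request deadline on GAE
FETCH_TIME_S = 1.5      # assumed time to fetch and scrape one page
PAGES = 34              # routes through the busiest stop


def sequential_time(pages, per_page):
    """Seconds needed if one request fetches every page in turn."""
    return pages * per_page


def max_pages_per_request(limit, per_page):
    """How many pages fit inside one request's deadline."""
    return int(limit // per_page)


print(sequential_time(PAGES, FETCH_TIME_S))                  # 51.0
print(max_pages_per_request(REQUEST_LIMIT_S, FETCH_TIME_S))  # 20
```

Giving each page its own task means each request only has to survive a single fetch.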

I took advantage of GAE's Task Queue API and memcache counters to tackle this problem by farming out autonomous jobs that find the next available bus per route per stop, and at the end aggregating the results.
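The shape of that fan-out/fan-in pattern can be simulated in plain Python. This is a sketch, not App Engine code: threads stand in for task-queue workers, a lock-protected Counter stands in for memcache's atomic decrement, and a dict stands in for the datastore. Each job stores its result before decrementing, so whichever job takes the counter to zero knows every other result is already stored and can aggregate.

```python
import threading


class Counter:
    """Stand-in for a memcache counter: decr() is atomic under a lock."""

    def __init__(self, value):
        self._value = value
        self._lock = threading.Lock()

    def decr(self):
        with self._lock:
            self._value -= 1
            return self._value


def fan_out_fan_in(jobs, work):
    """Run work(job) for each job in its own thread. The job that takes
    the shared counter to zero aggregates every stored result.
    Returns the list of aggregations performed (expected: exactly one)."""
    store = {}                      # plays the role of the datastore
    store_lock = threading.Lock()
    counter = Counter(len(jobs))    # seeded before any job starts
    aggregations = []

    def task(job):
        value = work(job)
        with store_lock:
            store[job] = value      # store the result first...
        if counter.decr() == 0:     # ...then decrement; zero means "last one"
            with store_lock:
                aggregations.append(sorted(store.values()))

    threads = [threading.Thread(target=task, args=(job,)) for job in jobs]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return aggregations


print(fan_out_fan_in([3, 1, 2], lambda n: n * n))  # [[1, 4, 9]]
```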

Admittedly, this is not advanced Computer Science. But I hope other App Engine developers can find some use in the pattern.


1. Define my task queues


I used two different task queues to manage the process: one, called aggregation, that queries the individual routes at a stop, and another, called aggregationSMS, that pieces the results together for the return SMS message. Both are defined in queue.yaml:

queue:
- name: aggregation
  rate: 20/s
  bucket_size: 1

- name: aggregationSMS
  rate: 10/s
  bucket_size: 1
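In queue.yaml, rate and bucket_size describe a token bucket: the queue can release up to bucket_size tasks in a burst, and the bucket refills at rate tokens per second. The sketch below is a simplified plain-Python model of that rule (it ignores how long each task runs and any limit on concurrent requests), not App Engine's actual scheduler. Under this model, with rate 20/s and bucket_size 1, the 34 tasks for the busiest stop would all be released within about 1.65 seconds.

```python
def dispatch_times(n_tasks, rate_per_s, bucket_size):
    """Times (in seconds) at which n_tasks, all queued at t=0, are
    released by a token bucket that starts full and refills at
    rate_per_s tokens per second. Simplified model only."""
    times = []
    for k in range(n_tasks):
        if k < bucket_size:
            times.append(0.0)  # spend the initial burst of tokens
        else:
            # every later task waits for one more token to refill
            times.append((k - bucket_size + 1) / rate_per_s)
    return times


aggregation = dispatch_times(34, 20, 1)
print(aggregation[0], aggregation[-1])
```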


2. Spawn tasks


When an SMS request arrives, the request handler parses the input to determine the request parameters. If the request does not include a specific bus route, I query my route table for every route that passes through the respective stop. This table contains URLs for the real time arrival estimates.

I loop over the result set and create new tasks for the aggregation task queue.
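from google.appengine.api import memcache
from google.appengine.api.labs.taskqueue import Task
from google.appengine.ext import db

# stopID, sid and caller were parsed from the incoming SMS request
q = db.GqlQuery("SELECT * FROM RouteListing WHERE stopID = :1", stopID)
routeQuery = q.fetch(100)
if len(routeQuery) > 0:
    # create a counter for a universally unique caller ID, seeded with
    # the total number of routes *before* any task is queued, so a task
    # that runs immediately can't take it to zero while others are
    # still being spawned
    memcache.add(sid, len(routeQuery))

    # loop over every route at this stop
    for r in routeQuery:
        # spawn a task for this stop/route tuple
        task = Task(url='/aggregationtask',
                    params={'sid': sid,
                            'stop': stopID,
                            'route': r.route,
                            'direction': r.direction,
                            'url': r.scheduleURL,
                            'caller': caller})
        task.add('aggregation')
else:
    # do some error handling
    pass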
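One subtlety with the counter: if it is incremented inside the spawn loop, a task can run as soon as it is queued, and if it decrements before the loop's next increment, the counter reaches zero while routes are still unqueued, firing the SMS aggregation early. Seeding the counter with the total number of routes before queuing anything closes that window. This plain-Python sketch models the counter as an int (not real memcache) and simulates the worst-case ordering, where every task finishes the moment it is queued.

```python
def run_spawn_loop(n_routes, seed_up_front):
    """Simulate spawning n_routes tasks when each one runs (and
    decrements the counter) immediately after being queued.
    Returns how many tasks had finished when the counter first hit 0."""
    counter = n_routes if seed_up_front else 0
    finished = 0
    for _ in range(n_routes):
        if not seed_up_front:
            counter += 1        # like memcache.incr(sid) inside the loop
        counter -= 1            # the queued task runs right away
        finished += 1
        if counter == 0:
            return finished     # the aggregation task would fire here
    return None


print(run_spawn_loop(34, seed_up_front=False))  # 1
print(run_spawn_loop(34, seed_up_front=True))   # 34
```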


3. Define the task handlers


There are two task handlers: one tackles the smallest job, determining the schedule for an individual bus at a stop, and the other pieces all of these results together when the system is ready to reply to the caller.

The task handler, AggregationHandler, does the specific work to scrape the scheduling information from the bus system's site. The handler does three things.



  • Scrape the web page to find the next stop time.

  • Store the results in the datastore.

  • Decrement the memcache counter for this transaction.


Many of the implementation details have been stripped out of the following code snippet...



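from google.appengine.api import memcache, urlfetch
from google.appengine.api.labs.taskqueue import Task
from google.appengine.ext import webapp

# BusStopAggregation (a db.Model) and getNextTime() (the page scraper)
# are defined elsewhere in the app; their details are stripped here.

class AggregationHandler(webapp.RequestHandler):

    def post(self):
        # extract the parameters for this task
        sid = self.request.get('sid')
        stopID = self.request.get('stop')
        routeID = self.request.get('route')
        directionID = self.request.get('direction')
        scheduleURL = self.request.get('url')
        caller = self.request.get('caller')

        # 1. fetch the real time data
        result = urlfetch.fetch(scheduleURL)

        # scrape the page for the next arrival time
        textBody = getNextTime(result.content)

        # 2. store these results in the datastore
        stop = BusStopAggregation()
        stop.stopID = stopID
        stop.routeID = routeID
        stop.sid = sid  # the sid identifies the caller's transaction
        stop.text = textBody
        # the SMS handler orders results by a 'time' property, so the
        # full version also stores the arrival time here
        stop.put()

        # 3. decrement the counter
        counter = memcache.decr(sid)

        # if we've completed the scraping, create a task to
        # piece the results together.
        if counter == 0:
            task = Task(url='/aggregationSMStask',
                        params={'sid': sid, 'caller': caller})
            task.add('aggregationSMS')

            # delete the counter for this transaction, but only once
            # every task has decremented it
            memcache.delete(sid)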
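The scraping step was stripped from the snippet above, and the Madison Metro page format isn't shown in this post, so this is only a hypothetical sketch: it assumes the arrival estimates appear as clock times like "7:05 AM" in the page text and pulls out the first one with a regular expression. The get_next_time name and the time format are assumptions.

```python
import re

# Matches clock times such as "7:05 am" or "12:40 P.M." (format assumed)
TIME_RE = re.compile(r"\b(\d{1,2}:\d{2})\s*([AP])\.?M\.?", re.IGNORECASE)


def get_next_time(page_text):
    """Return the first arrival time in the page as "H:MM AM/PM",
    or None if no time is found. Hypothetical stand-in for the
    stripped scraping code."""
    match = TIME_RE.search(page_text)
    if match is None:
        return None
    return "%s %sM" % (match.group(1), match.group(2).upper())


print(get_next_time("<td>Route 2</td><td>7:05 am</td><td>7:35 am</td>"))
# 7:05 AM
```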

The task handler, AggregationSMSHandler, does the job of piecing the results together. It relies on the unique SID for a caller's transaction to query the datastore and find the scheduling details.


from google.appengine.ext import db, webapp

# outboundSMS() is the app's own wrapper around the Twilio API (not shown)

class AggregationSMSHandler(webapp.RequestHandler):

    def post(self):
        # extract the task's inputs
        sid = self.request.get('sid')
        phone = self.request.get('caller')

        # sort the results by time to find soonest upcoming stops
        q = db.GqlQuery("SELECT * FROM BusStopAggregation WHERE sid = :1 ORDER BY time", sid)

        # we'll only send the next four stops in the reply message
        routeQuery = q.fetch(4)
        if len(routeQuery) > 0:
            textBody = "Stop: %s\n" % routeQuery[0].stopID
            for r in routeQuery:
                textBody += "Route %s: %s\n" % (r.routeID, r.text)
        else:
            textBody = "Doesn't look good... Your bus isn't running right now!"

        # send off the result via the twilio API
        outboundSMS(phone, textBody)
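The ordering and formatting logic in AggregationSMSHandler doesn't depend on the datastore, so it can be pulled out and tested on its own. In this plain-Python sketch, Arrival is a hypothetical stand-in for a BusStopAggregation entity, and time is assumed to be a sortable value such as minutes past midnight; the reply follows the same layout as the handler's message.

```python
from collections import namedtuple

# Hypothetical in-memory stand-in for BusStopAggregation entities
Arrival = namedtuple("Arrival", "stopID routeID time text")


def build_reply(arrivals, limit=4):
    """Sort by time, keep the soonest `limit` arrivals, format the SMS."""
    soonest = sorted(arrivals, key=lambda a: a.time)[:limit]
    if not soonest:
        return "Doesn't look good... Your bus isn't running right now!"
    lines = ["Stop: %s" % soonest[0].stopID]
    lines += ["Route %s: %s" % (a.routeID, a.text) for a in soonest]
    return "\n".join(lines)


print(build_reply([Arrival("1101", "2", 725, "12:05 PM"),
                   Arrival("1101", "6", 720, "12:00 PM")]))
```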


Results


This pattern allowed me to almost completely eliminate DeadlineExceededError exceptions on GAE; I've yet to see a timeout problem inside the app. It's always possible that a single task could take too long, but if a task fails, the Task Queue retries it until it succeeds.

It's worth pointing out that I also used the Task Queue, though not in the code snippets above, for other repeated, remote tasks. For example, when I interface with the Twilio API, I do that by spawning a task to do the work. Likewise, I log Twilio events to the datastore on their own task queue.