Sunday, March 7, 2010

Using App Engine's task queue to break problems down

Two weeks ago, when Twilio announced their developer contest for their new SMS API, I decided to build a mobile application that let me query the Madison Metro bus system to determine when my bus would arrive.

Although the entry did not win the contest, it was named mashup of the day last week at Programmable Web! It's called SMS My Bus, and if you live in Madison and ride the bus I encourage you to take advantage of it! You can find details about it here...

http://www.smsmybus.com

The basic architecture of the application is straightforward. SMS messages are sent to my Twilio phone number, and Twilio routes them to my server via HTTP POST requests. I do a schedule lookup based on the user's input and return the results.
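As a rough illustration of that first step, here is a minimal sketch of turning an incoming message into lookup parameters. Twilio posts the message's fields as form parameters, including From (the sender's number) and Body (the message text). The "<stopID> [route]" message format and the parse_sms name are assumptions for illustration, not necessarily what SMS My Bus actually accepts.

```python
def parse_sms(params):
    """Split a Twilio-style SMS POST payload into (caller, stopID, route).

    Assumes (hypothetically) that the body is "<stopID>" or "<stopID> <route>".
    """
    caller = params.get('From', '')
    words = params.get('Body', '').split()  # split() also trims stray whitespace
    if not words or not words[0].isdigit():
        raise ValueError('expected a numeric stop ID first')
    stop_id = words[0]
    # No route given means "look up every route at this stop".
    route = words[1] if len(words) > 1 else None
    return caller, stop_id, route
```

When route comes back as None, the handler would fall back to querying every route at the stop.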

The tricky parts stem primarily from the fact that:

1. The Madison Metro doesn't actually provide web services for this data. The consequence for an app like mine is that I need to do a bit of screen scraping to find the data I'm after.

2. I chose to deploy this app on Google App Engine, and URL scraping can become a show-stopper there because GAE limits the resources available to every request. In particular, GAE only lets a single request run for about 30 seconds.

Needless to say, I would love it if my fine city of Madison would join the Gov 2.0 movement and open up more of its rich data via standard web services.

In the meantime... I needed a solution that would allow me to gather disparate data across many, many URLs. For example, the busiest stop in the Metro system has 34 buses passing through it, so I may need to grab 34 different web pages to piece together the schedule for a caller at that stop.
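The arithmetic behind that is simple. Assuming, purely for illustration, that each page takes about a second to fetch (a made-up figure, not a measurement), a single request fetching pages one after another runs out of time before it reaches 34:

```python
import math

DEADLINE_SECONDS = 30.0  # roughly GAE's per-request limit described above


def max_sequential_fetches(seconds_per_fetch, deadline=DEADLINE_SECONDS):
    """Number of pages one request can fetch back to back before the deadline."""
    return math.floor(deadline / seconds_per_fetch)


# With an assumed 1 second per page, one request tops out at 30 pages,
# short of the 34 needed for the busiest stop.
print(max_sequential_fetches(1.0))
```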

I took advantage of GAE's Task Queue API and memcache counters to tackle this problem by farming out independent jobs, each of which finds the next bus for one route at a stop, and then aggregating the results at the end.
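Stripped of App Engine, the pattern is a fan-out/fan-in with a countdown. This is a local simulation, not App Engine code: a thread pool stands in for the task queue, a lock-protected counter for the atomic memcache decrement, and a plain list for the datastore. The names Countdown and fan_out_fan_in are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class Countdown:
    """Stand-in for the memcache counter: starts at the number of jobs and is
    decremented atomically (here, under a lock) as each job finishes."""

    def __init__(self, count):
        self._count = count
        self._lock = threading.Lock()

    def decrement(self):
        with self._lock:
            self._count -= 1
            return self._count


def fan_out_fan_in(jobs, work, aggregate, workers=8):
    """Run work(job) for every job in parallel; whichever job brings the
    counter to zero calls aggregate() on all the collected results."""
    results = []                  # stands in for the datastore
    results_lock = threading.Lock()
    countdown = Countdown(len(jobs))
    final = []                    # what the aggregation step produced

    def run(job):
        value = work(job)
        with results_lock:
            results.append(value)  # store the result before decrementing
        if countdown.decrement() == 0:
            with results_lock:
                final.append(aggregate(list(results)))

    with ThreadPoolExecutor(max_workers=workers) as pool:
        for job in jobs:
            pool.submit(run, job)
    return final
```

Because every job stores its result before decrementing, the job that sees zero knows all results are in place, and exactly one job runs the aggregation.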

Admittedly, this is not advanced Computer Science. But I hope other App Engine developers can find some use in the pattern.


1. Define my task queues


I used two different task queues, defined in the app's queue.yaml, to manage the process. One, called aggregation, queried the individual routes at a stop; the other, called aggregationSMS, pieced the results together for the return SMS message.

queue:
- name: aggregation
  rate: 20/s
  bucket_size: 1

- name: aggregationSMS
  rate: 10/s
  bucket_size: 1
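App Engine describes a queue's rate and bucket_size in terms of a token bucket: the bucket holds at most bucket_size tokens, refills at rate tokens per second, and each task dispatch spends one token. The simplified model below rests on my own assumptions (the bucket starts full; execution time and concurrency limits are ignored), and dispatch_times is a hypothetical helper showing what those two settings mean for a burst of tasks enqueued at once.

```python
def dispatch_times(n_tasks, rate_per_sec, bucket_size):
    """Seconds after t=0 at which n_tasks, all enqueued at t=0, are released
    by a token bucket that starts full."""
    times = []
    for i in range(n_tasks):
        if i < bucket_size:
            times.append(0.0)  # spend the tokens already in the bucket
        else:
            # after that, one new token arrives every 1/rate seconds
            times.append((i - bucket_size + 1) / rate_per_sec)
    return times
```

Under this model, with rate: 20/s and bucket_size: 1, the 34 route tasks for the busiest stop would all be released within about 1.65 seconds.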


2. Spawn tasks


When an SMS request arrives, the request handler parses the input to determine the request parameters. If the request does not include a specific bus route, I'll query my route table for every route that passes through the respective stop. This table contains URLs for the real-time arrival estimates.

I loop over the result set and create new tasks for the aggregation task queue.
    from google.appengine.api import memcache
    from google.appengine.api.labs.taskqueue import Task
    from google.appengine.ext import db

    # sid, stopID and caller come from the parsed SMS request
    q = db.GqlQuery("SELECT * FROM RouteListing WHERE stopID = :1", stopID)
    routeQuery = q.fetch(100)
    if len(routeQuery) > 0:
        # create a counter for this caller's universally unique ID, set to the
        # number of tasks up front so a task that finishes early can't drive
        # it to zero before the remaining tasks have been queued
        memcache.add(sid, len(routeQuery))

        # loop over every route at this stop
        for r in routeQuery:
            # spawn a task for this stop/route tuple
            task = Task(url='/aggregationtask',
                        params={'sid': sid,
                                'stop': stopID,
                                'route': r.route,
                                'direction': r.direction,
                                'url': r.scheduleURL,
                                'caller': caller})
            task.add('aggregation')
    else:
        # do some error handling
        pass
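One subtlety with the counter: a queued task can start running before the loop that spawns the remaining tasks has finished. If the counter is incremented once per task inside that loop, an early task can decrement it back to zero and trigger the SMS aggregation too soon; setting the counter to the full route count before spawning anything avoids that. This small simulation, built around a hypothetical first_zero_after function, plays out the worst-case interleaving in which every task runs the instant it is added.

```python
def first_zero_after(n_routes, preset):
    """Worst case: each task runs to completion the moment it is added.
    Returns how many tasks had finished when the counter first hit zero,
    or None if it never did."""
    counter = n_routes if preset else 0
    finished = 0
    for _ in range(n_routes):
        if not preset:
            counter += 1  # incr just before adding this task
        # the task is added and, in the worst case, runs immediately
        counter -= 1
        finished += 1
        if counter == 0:
            return finished  # this is where the SMS task would be spawned
    return None
```

With per-task increments the aggregation fires after the very first route; with the count preset, it fires only after the last one.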


3. Define the task handlers


There are two task handlers: one tackles the smallest job, determining the schedule for an individual bus at a stop, and the other pieces all of these results together when the system is ready to reply to the caller.

The task handler, AggregationHandler, does the specific work to scrape the scheduling information from the bus system's site. The handler does three things.



  • Scrape the web page to find the next stop time.

  • Store the results in the datastore.

  • Decrement the memcache counter for this transaction.


Many of the implementation details have been stripped out of the following code snippet...



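    from google.appengine.api import memcache, urlfetch
    from google.appengine.api.labs.taskqueue import Task
    from google.appengine.ext import webapp

    class AggregationHandler(webapp.RequestHandler):

        def post(self):
            # extract the parameters for this task
            sid = self.request.get('sid')
            stopID = self.request.get('stop')
            routeID = self.request.get('route')
            directionID = self.request.get('direction')
            scheduleURL = self.request.get('url')
            caller = self.request.get('caller')

            # 1. fetch the real time data
            result = urlfetch.fetch(scheduleURL)

            # scrape the page (scrapeNextTime is a placeholder for the
            # stripped-out scraping code, not a library call)
            textBody = scrapeNextTime(result.content)

            # 2. store these results in the datastore
            stop = BusStopAggregation()
            stop.stopID = stopID
            stop.routeID = routeID
            stop.sid = sid  # the sid identifies the caller's transaction
            stop.text = textBody
            # (the 'time' property that AggregationSMSHandler sorts on is
            # also set here; details elided)
            stop.put()

            # 3. decrement the counter
            counter = memcache.decr(sid)

            # if we've completed the scraping, create a task to
            # piece the results together.
            if counter == 0:
                task = Task(url='/aggregationSMStask',
                            params={'sid': sid, 'caller': caller})
                task.add('aggregationSMS')

                # delete the counter for this transaction
                memcache.delete(sid)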
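The scraping step itself is stripped out of the handler. Since the Metro page markup isn't shown here, this is only a hedged sketch: it assumes arrival times appear in the HTML as text like "5:42 PM" and returns the first one. The regular expression and the scrape_next_time name are hypothetical; a real scraper would have to follow the actual page structure.

```python
import re

# Hypothetical page format: arrival times such as "5:42 PM" somewhere in the
# HTML. The real Metro pages may be structured quite differently.
TIME_PATTERN = re.compile(r'\b(1[0-2]|[1-9]):([0-5]\d)\s*([AP]M)\b',
                          re.IGNORECASE)


def scrape_next_time(html):
    """Return the first arrival time found in the page, or None."""
    match = TIME_PATTERN.search(html)
    if match is None:
        return None
    hour, minute, meridiem = match.groups()
    return '%s:%s %s' % (hour, minute, meridiem.upper())
```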

The task handler, AggregationSMSHandler, does the job of piecing the results together. It relies on the unique SID for a caller's transaction to query the datastore and find the scheduling details.


    from google.appengine.ext import db, webapp

    class AggregationSMSHandler(webapp.RequestHandler):

        def post(self):
            # extract the task's inputs
            sid = self.request.get('sid')
            phone = self.request.get('caller')

            # sort the results by time to find soonest upcoming stops
            q = db.GqlQuery("SELECT * FROM BusStopAggregation "
                            "WHERE sid = :1 ORDER BY time", sid)

            # we'll only send the next four stops in the reply message
            routeQuery = q.fetch(4)
            if len(routeQuery) > 0:
                textBody = "Stop: %s\n" % routeQuery[0].stopID
                for r in routeQuery:
                    textBody += "Route %s: %s\n" % (r.routeID, r.text)
            else:
                textBody = "Doesn't look good... Your bus isn't running right now!"

            # send off the result via the twilio API
            # (outboundSMS is this app's own helper and isn't shown)
            outboundSMS(phone, textBody)
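The datastore query in AggregationSMSHandler does the sorting and limiting, but the reply logic is easy to check on its own in plain Python. In this sketch, Arrival is a hypothetical stand-in for a stored BusStopAggregation entity, minutes_away is an assumed sort key in place of the entity's time property, and build_reply is an illustrative name.

```python
from collections import namedtuple

# Hypothetical stand-in for a stored BusStopAggregation entity.
Arrival = namedtuple('Arrival', 'stopID routeID text minutes_away')

NO_SERVICE = "Doesn't look good... Your bus isn't running right now!"


def build_reply(arrivals, limit=4):
    """Format the soonest `limit` arrivals as an SMS body."""
    soonest = sorted(arrivals, key=lambda a: a.minutes_away)[:limit]
    if not soonest:
        return NO_SERVICE
    lines = ['Stop: %s' % soonest[0].stopID]
    lines += ['Route %s: %s' % (a.routeID, a.text) for a in soonest]
    return '\n'.join(lines)
```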


Results


This pattern allowed me to almost completely mitigate the DeadlineExceededError exceptions on GAE; I've yet to see a timeout problem inside the app. It's always possible that a single task could take too long, but if a task fails, the task queue re-queues it and runs it again until it succeeds.
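Automatic retries have one consequence for the counter: if a task fails after it has already decremented, its retry would decrement again and could bring the counter to zero before every route has actually been scraped. A common guard, sketched here as an assumption rather than what the app does, is to record a per-route marker with memcache's add(), which only succeeds when the key is absent, and to decrement only when the marker is new. FakeMemcache is a hypothetical in-process stand-in so the example runs outside App Engine; finish_route and the 'done:' key prefix are illustrative names.

```python
class FakeMemcache:
    """Tiny stand-in with the two behaviors the guard relies on:
    add() only succeeds if the key is absent, decr() returns the new value."""

    def __init__(self):
        self._data = {}

    def add(self, key, value):
        if key in self._data:
            return False
        self._data[key] = value
        return True

    def decr(self, key):
        if key not in self._data:
            return None
        self._data[key] -= 1
        return self._data[key]


def finish_route(cache, sid, route):
    """Decrement the transaction counter at most once per (sid, route), even
    if the task is re-run. Returns the new count, or None if already counted."""
    if not cache.add('done:%s:%s' % (sid, route), True):
        return None  # an earlier attempt already counted this route
    return cache.decr(sid)
```

One catch this sketch leaves open: if an attempt dies between the add() and the decr(), its retry skips the decrement and the counter never reaches zero, so a sturdier version would keep both the markers and the count in durable storage.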

It's worth pointing out that I also used the Task Queue, though not in the snippets above, for other repeated remote operations. For example, when I interface with the Twilio API, I spawn a task to do the work. Likewise, I log Twilio events to the datastore on their own task queue.
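Handing each outbound message to its own task also makes it easy to send a long reply as several texts. A single SMS carries at most 160 characters in the default GSM 7-bit alphabet, so the splitting might be sketched like this; split_for_sms is a hypothetical helper, and each returned chunk would become one outbound-SMS task.

```python
SMS_LIMIT = 160  # characters in one GSM 7-bit text message


def split_for_sms(body, limit=SMS_LIMIT):
    """Split a reply into chunks of at most `limit` characters, keeping
    whole lines together where possible."""
    pieces = []
    for line in body.split('\n'):
        # hard-split any single line that is longer than one message
        pieces.extend(line[i:i + limit]
                      for i in range(0, max(len(line), 1), limit))
    chunks = []
    for piece in pieces:
        if chunks and len(chunks[-1]) + 1 + len(piece) <= limit:
            chunks[-1] += '\n' + piece  # still fits in the current message
        else:
            chunks.append(piece)        # start a new message
    return chunks
```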

11 comments:

  1. Nice work - really great project! You might be interested in taking a look at @imified (http://www.imified.com/) - you can build SMS apps, but also tie your app to other communications channels like IM networks and Twitter. Hope this helps. Again, great project!

  2. Are you using IMified for a project right now?

  3. I've used IMified for several projects. Here's an example of one:
    http://www.voiceingov.org/blog/?p=1333
    http://github.com/mheadd/toronto-childcare-finder
    Cheers.

  4. nice app... i love the gov2.0 initiative and am hopeful that madison can adopt it. i have an email interface for the bus service and was going to add a twitter service as well. i'll give the imified twitter api a try first. thanks!

  5. Nice blog post Greg, not sure why I didn't catch this before. We can't wait to come hang out in Madison and meet developers there so let us know when a good time to drop in would be. We'll also be at Gov 2.0 expo in Washington DC, are you planning on going to that? This seems like the perfect app to demo at our booth. Cheers, Danielle @ Twilio

  6. Greg, great stuff. If we can help in any way with IMified, please don't hesitate to email me personally. Also, most of the IMified technology is now in Tropo (http://tropo.com/), meaning you can write to a single unified API and have support for IM, SMS and voice. No need to manage separate APIs for each. Looks like Tropo's speech recognition would come in very handy when checking for the bus stop. Just ask the user what route and what stop, and understand what they say back.

  7. I'd love to join you in DC! I need to work on the City of Madison to sponsor me... :)

  8. Thanks, Adam! Someone had asked me to port my Ringerous app to Tropo not that long ago. But I must admit, I'm a bit of a Twilio fanboy. I do like the IMified integration with Twitter so I'm giving that a try. Thanks again!

  9. Good stuff Adam, the use of the Google Task API is well explained. Let us know if we may help with getting you some speech recognition in your apps, as talking to an app is quite compelling for users. Keep up the good work!

  10. Thanks for informative post. I'm working on using task queues with twilio so your tips are helpful in understanding how I can use task queues to make lots of outbound calls without timing out GAE.

  11. Your twitter login does not work, by the way. Anyway, these are all great ideas but they require the SMS to be sent through a gateway that costs around 1 penny per send. That is ridiculous! If your app takes off and sends millions of texts per month you will go into debt very quickly. I am an SMS developer that uses Android phones on unlimited texting plans. I have created several texting apps, one of which went viral and has 300k downloads. I am now integrating artificial intelligence into my apps and creating custom App Engine APIs for the app to integrate with for data dissemination. I see a future where these web gateways are useless because someone will simply buy a cheap phone and install one of my apps. Then users can interact with them via SMS with no problems or high costs.
