Saturday, October 1, 2011

The Worker

More Work?

One of the first serious Google App Engine subjects I've tackled recently is the problem of doing work in the background. In my particular case I needed to run some intensive and error-prone tasks on a schedule, then send an email with the results (which is also error-prone).

I was going to write some standard job-processing-in-a-loop kind of code, with the loop being processed as a cron job (set up in cron.yaml). That's what Syyncc does. But some bit of my brain kept grumbling about the inelegance of that approach. You're on a platform that wants to do it a different way, says my brain (and who am I to disagree?).

And the cron thing is kind of bad, because it doesn't scale. Let's say I schedule a job every two minutes. It can get through some fixed amount of work (maybe 10 jobs?) before it hits its time limit. It can never do more than that. That's nasty.
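To put a number on that ceiling, here's a back-of-the-envelope sketch. The two-minute interval and the 10 jobs per run are just the illustrative guesses from the paragraph above, not measurements, and max_jobs_per_day is a name made up for this example.

```python
# Rough throughput ceiling for a cron-driven job loop.
# Both inputs are the illustrative figures from the text, not measured values.
MINUTES_PER_DAY = 24 * 60


def max_jobs_per_day(interval_minutes, jobs_per_run):
    """Upper bound on jobs handled per day by one fixed-interval cron job."""
    runs_per_day = MINUTES_PER_DAY // interval_minutes
    return runs_per_day * jobs_per_run


print(max_jobs_per_day(2, 10))  # 720 runs * 10 jobs = 7200
```

However much work piles up, that number doesn't move unless you change the schedule by hand.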

People often recommend backends for this kind of work. With them, you stick jobs on a pull queue, and pull them off with the backend. Each backend can process a limited number of jobs, but you can set them to be automatically created in response to workload, which is cool.

But I'm partial to push queues, which were previously just called Task Queues. At any point in your code you can schedule a task to run, which simply arrives as a POST to a URL in your app:

        taskqueue.add(url='/dosomething', params={'key': key})

It's a bit clunky, because you need to set up a handler for the URL and implement its post method.

Oh wait, no you don't. Nick Johnson wrote the excellent deferred.defer library, which takes care of the public url and thunking the call from there into a method of your choice. So instead your call can look like this:


    import logging

    from google.appengine.ext import deferred

    def do_something_expensive(a, b, c=None):
        logging.info("Doing something expensive!")
        # Do your work here

    # Somewhere else
    deferred.defer(do_something_expensive, "Hello, world!", 42, c=True)


That's cool, isn't it!
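As I understand it, the trick underneath is simple: the call is serialized into the task payload, and a generic handler unpacks it and makes the call when the task runs. The sketch below mimics that round trip in plain Python with no App Engine involved. The names fake_defer, fake_task_handler, register, queue and registry are all made up for illustration, and looking the function up by name in a registry is my simplification to keep the sketch self-contained; it is not how the real library is written.

```python
import pickle

queue = []      # hypothetical stand-in for the task queue
registry = {}   # name -> function, so only plain data gets pickled


def register(func):
    """Remember a function by name so the handler can find it later."""
    registry[func.__name__] = func
    return func


def fake_defer(func, *args, **kwargs):
    """Serialize the call and enqueue it instead of running it now."""
    queue.append(pickle.dumps((func.__name__, args, kwargs)))


def fake_task_handler(payload):
    """What a generic task URL handler would do: unpickle, look up, call."""
    name, args, kwargs = pickle.loads(payload)
    return registry[name](*args, **kwargs)


@register
def do_something_expensive(a, b, c=None):
    return "%s %s %s" % (a, b, c)


fake_defer(do_something_expensive, "Hello, world!", 42, c=True)
results = [fake_task_handler(payload) for payload in queue]
print(results)  # ['Hello, world! 42 True']
```

The point is that the caller never sees a URL or a handler; it just hands over a function and its arguments.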

What's also cool about tasks is that you can delay them, by specifying either a countdown or an eta. Using a countdown (number of seconds before execution) is interesting, because you can delay tasks, i.e. spread the work out a bit. But using an eta is really fascinating, because it lets you schedule work for specific times. So if you need to schedule an email to go out at midnight, a task with an eta will do that for you, with no real plumbing required on your part. (Can you do this with a pull queue? You may be able to use eta to stop tasks showing up through the lease system before a specified time, but I'm not sure about this.)
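For the midnight example, the only real work is computing the eta. Here's a small plain-Python sketch of that; next_midnight_utc is a helper name I've made up, and it assumes you're working in naive UTC datetimes.

```python
from datetime import datetime, timedelta


def next_midnight_utc(now):
    """Return the first midnight strictly after `now` (naive UTC datetime)."""
    start_of_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return start_of_today + timedelta(days=1)


now = datetime(2011, 10, 1, 15, 30)
eta = next_midnight_utc(now)
print(eta)  # 2011-10-02 00:00:00

# The result would then be handed to the task as its eta, something like
#     deferred.defer(send_report, _eta=eta)
# where send_report is a hypothetical function of your own.
```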

This is all great for performing scheduled background tasks. Except, what if they fail? Or take a long time to complete? In fact, how can you report on the status of these tasks? Well, you can't. There's no way to go in and find out much about a task through any API. Even if there were, you'd probably need custom information suited to the job at hand anyway.

What I need is an object in the datastore that maps to the task. I personally prefer an object oriented approach (ok, I'm an old man set in my ways, yes I know). So, what I'd like is a base object which lets me set up a task, kick it off, record its progress, and lets me see afterwards how it went.

So I created the Worker. The Worker is a PolyModel base class that you can use to do background jobs. You need to subclass it and give it a job to do (override doExecute()) and, if you want a repeating job, a method for calculating the next time to run (override doCalculateNextRun()). You can also provide a specific queue name (override GetQueue()) and specify whether or not it should run immediately (override ExecuteImmediately()). If ExecuteImmediately() returns False, then on the first, immediate run it won't call doExecute(), but will instead call doCalculateNextRun() and reschedule itself.

So for instance, if you want to run a background job immediately (say send an email), you make this class:

    # assumes: import logging; from google.appengine.api import mail
    class SendAnEmailImmediately(Worker):

        def doExecute(self):
            lemailStr = "Betty@example.com"
            logging.info("Sending emails to %s" % lemailStr)
            lmessage = mail.EmailMessage(
                sender="Anne@example.com",
                to=lemailStr,
                subject="Hi Betty",
                body="I know you love email!"
            )
            lmessage.send()

        def doCalculateNextRun(self, aUtcNow, alastDue):
            return None  # never reschedule

To kick it off, do this:

 
    lsender = SendAnEmailImmediately()
    lsender.status = 0
    lsender.enabled = True
    lsender.put()


And what do you get out of that? Well, not only does the email get sent from a background task, but afterward you'll have a SendAnEmailImmediately object in the datastore, with these properties:

    lastRunSucceeded = db.BooleanProperty()
    lastRunMessage = db.StringProperty()
    lastRunStartTime = db.DateTimeProperty()
    lastRunFinishTime = db.DateTimeProperty()
which give you information on when it ran and how the worker actually went; did it fail? If so, what errors occurred? 

How about a recurring task? Try this one, which sends an email once per hour:

    # assumes: import logging; from datetime import timedelta;
    # from google.appengine.api import mail
    class SendAnEmailEveryHour(Worker):

        def doExecute(self):
            lemailStr = "Betty@example.com"
            logging.info("Sending emails to %s" % lemailStr)
            lmessage = mail.EmailMessage(
                sender="Anne@example.com",
                to=lemailStr,
                subject="Hi again Betty",
                body="Are you feeling loved yet?"
            )
            lmessage.send()

        def doCalculateNextRun(self, aUtcNow, alastDue):
            # base the next run on the last due time, if there is one
            if alastDue:
                lbaseDate = alastDue
            else:
                lbaseDate = aUtcNow
            return lbaseDate + timedelta(minutes=60)

and again, kick it off like this:

    lsender = SendAnEmailEveryHour()
    lsender.status = 0
    lsender.enabled = True
    lsender.put()
Ok, that'll work. However, what if we want a record of each run? Then do it like this instead:

    class SendAnEmailEveryHour2(Worker):
        def doExecute(self):
            lsender = SendAnEmailImmediately()
            lsender.status = 0
            lsender.enabled = True
            lsender.put()

        def doCalculateNextRun(self, aUtcNow, alastDue):
            if alastDue:
                lbaseDate = alastDue
            else:
                lbaseDate = aUtcNow
            return lbaseDate + timedelta(minutes=60)



So now you get a recurring worker kicking off other workers, one per job.



You can see how powerful this is as a simple method of structuring background jobs!
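One small thing worth noticing in the hourly workers: they base the next run on alastDue when there is one, rather than on the current time. Here's a plain-Python sketch of why that matters. It assumes, purely for illustration, that every run starts 90 seconds late; simulate, next_run and next_run_from_now are names made up for this example.

```python
from datetime import datetime, timedelta

HOUR = timedelta(minutes=60)
LATENESS = timedelta(seconds=90)  # assumed delay before each run actually starts


def next_run(utc_now, last_due):
    """Same rule as doCalculateNextRun in the hourly examples."""
    base = last_due if last_due else utc_now
    return base + HOUR


def next_run_from_now(utc_now, last_due):
    """Naive alternative: always count from the moment the task ran."""
    return utc_now + HOUR


def simulate(rule, runs):
    """Apply a scheduling rule `runs` times and return the final due time."""
    due = datetime(2011, 10, 1, 0, 0)
    for _ in range(runs):
        actual_start = due + LATENESS
        due = rule(actual_start, due)
    return due


print(simulate(next_run, 24))           # 2011-10-02 00:00:00
print(simulate(next_run_from_now, 24))  # 2011-10-02 00:36:00
```

Counting from the last due time keeps the schedule on the hour; counting from "now" slips by the lateness on every run, which adds up to 36 minutes after a day in this made-up scenario.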



Ok, hold onto your hats, excuse my n00bish python, and get ready for a slab of code. Here's the implementation of Worker:


##################################################################
from google.appengine.ext import db
from google.appengine.ext.db import polymodel
import logging
from datetime import datetime
from datetime import timedelta
from google.appengine.ext import deferred
from lib.pytz.gae import pytz
import uuid

class Worker(polymodel.PolyModel):
    nextDue = db.DateTimeProperty()
    enabled = db.BooleanProperty()
    status = db.IntegerProperty() # 0 = ready, 1 = running, 2 = stopped
    lastRunSucceeded = db.BooleanProperty()
    lastRunMessage = db.StringProperty() # only if
    lastRunStartTime = db.DateTimeProperty()
    lastRunFinishTime = db.DateTimeProperty()
    createTime = db.DateTimeProperty(auto_now_add = True)
    taskid = db.StringProperty()

    # override to change queues
    def GetQueue(self):
        return "default"

    # override to do first run in the future    
    def ExecuteImmediately(self):
        return True
    
    # must override to perform work 
    def doExecute(self):
        raise NotImplementedError

    # override to tell us when next to run
    def doCalculateNextRun(self, aUtcNow, alastDue):
        raise NotImplementedError

    def Execute(self, aTaskID, aIsFirstRun, **kwargs):
        try:
            # Don't trust depickled self, go reload self
            # Nick Johnson told me not to do this - needs to be fixed
            self = db.get(self.key())
        except db.NotSavedError, ex:
            self = None
        
        lutcNow = datetime.utcnow()
        
        if not self:
            logging.warning("eek I am gone! (disappears in a puff of logic)")
        elif not aTaskID:
            logging.warning("No aTaskID, skipping")
        elif aTaskID != self.taskid:
            logging.debug("TaskIDs do not match, skipping")
        elif not self.enabled:
            logging.warning("Disabled, skipping")
        elif self.status != 0:
            logging.warning("Wrong status to execute Worker, status = %s, skipping" % (self.status))
        elif self.nextDue and self.nextDue > lutcNow:
            logging.debug("Don't run till %s, reschedule..." % (self.nextDue))
            if (self.nextDue - lutcNow) > timedelta(1):
                # don't reschedule more than a day forward
                lresched = lutcNow + timedelta(1) # add a day
            else:
                lresched = self.nextDue

            lqueue = self.GetQueue()
            
            deferred.defer(
                self.Execute,
                _queue_name=lqueue,
                _eta=lresched,
                aTaskID=self.taskid,
                aIsFirstRun=aIsFirstRun,
            )
        else:
            if aIsFirstRun and not self.nextDue and not self.ExecuteImmediately():
                logging.debug("First run, don't execute")
            else:
                logging.debug("We can execute")
                try:
                    self.status = 1 # running
                    self.lastRunStartTime = datetime.utcnow()
                    self.put()
    
                    logging.debug("Before doExecute()")
                    self.doExecute()
                    logging.debug("After doExecute()")
                    
                    self.status = 0 # ready to run
                    self.lastRunSucceeded = True
                    self.lastRunMessage = None
                except Exception, ex:
                    self.status = 0
                    self.lastRunSucceeded = False
                    self.lastRunMessage = unicode(ex)
                    logging.error(ex)

                self.lastRunFinishTime = datetime.utcnow()

            logging.debug("calculate lnextRun")
            lnextRun = None
            try:
                lutcnow = datetime.utcnow()
                lnextRun = self.doCalculateNextRun(datetime.utcnow(), self.nextDue)
            except Exception, ex:
                logging.error(ex)
            
            if lnextRun:
                logging.debug("got lnextRun, need to reschedule")
                self.nextDue = lnextRun
                self.status = 0
                self.put()
            
                lqueue = self.GetQueue()
                
                if (lnextRun - lutcnow) > timedelta(1):
                    lresched = lutcnow + timedelta(1)
                else:
                    lresched = lnextRun
                    
                if lresched <= lutcnow:
                    # run immediately, no eta provided
                    deferred.defer(
                        self.Execute, 
                        _queue_name=lqueue, 
                        aTaskID=self.taskid, 
                        aIsFirstRun=False
                    )
                else:
                    # schedule future run
                    deferred.defer(
                        self.Execute, 
                        _queue_name=lqueue, 
                        _eta=lresched, 
                        aTaskID=self.taskid, 
                        aIsFirstRun=False
                    )
            else:
                logging.debug("no lnextRun, we are finished.")
                self.status = 2
                self.put()
    # Need to override put to kick off the task if enabled is 
    # set to True
    def put(self, **kwargs):
        lneedPut = True

        # first grab a copy of what's currently stored.
        logging.debug("Entered put, new self = %s" % (self))

        loldself = None
        if self.enabled:
            logging.debug("Need to find out if enabled has been newly set. Load old self from datastore")
    
            try:
                loldself = self.get(self.key())
            except Exception, ex:
                logging.error(ex)
                loldself = None
    
        logging.debug("See if newly enabled has changed")
        if self.enabled and (not loldself or not loldself.enabled):
            logging.debug("Newly enabled. Need to schedule self to run")

            self.taskid = unicode(uuid.uuid4())
            logging.debug("taskid == %s" % (self.taskid))

            logging.debug("Now schedule to run immediately")
    
            #self.nextDue = None
            self.status = 0
            
            logging.debug("Pre-save")
            super(Worker, self).put(**kwargs)
            lneedPut = False
        
            lqueue = self.GetQueue()

            # run immediately                
            logging.debug("call deferred.defer")
            deferred.defer(
                self.Execute, 
                _queue_name=lqueue, 
                aTaskID=self.taskid, 
                aIsFirstRun=True
            )
        else:
            logging.debug("not newly enabled")

        if lneedPut:
            logging.debug("Do the actual put")
            super(Worker, self).put(**kwargs)
##################################################################    
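One detail that's easy to miss in that slab is the rescheduling rule: a task is never deferred more than a day ahead. If the real due time is further off, the task wakes up after a day, sees that nextDue is still in the future, and defers itself again. Here's that rule pulled out as a standalone plain-Python sketch; capped_eta is a name I've made up for illustration and is not part of Worker.

```python
from datetime import datetime, timedelta

ONE_DAY = timedelta(1)


def capped_eta(utc_now, next_due):
    """Return the eta to use for the next task, or None to run right away."""
    if next_due - utc_now > ONE_DAY:
        eta = utc_now + ONE_DAY   # too far off: check back in a day
    else:
        eta = next_due
    # a due time that has already passed means "no eta, run immediately"
    return eta if eta > utc_now else None


now = datetime(2011, 10, 1, 12, 0)
print(capped_eta(now, now + timedelta(hours=1)))    # 2011-10-01 13:00:00
print(capped_eta(now, now + timedelta(days=3)))     # 2011-10-02 12:00:00
print(capped_eta(now, now - timedelta(minutes=5)))  # None
```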

A couple of footnotes before I leave you with this:

1: I've used deferred.defer on an instance method, which has an issue. Specifically, it has to pickle the whole object, then unpickle it when the task is run. That's a little expensive, and it leaves the running task with an old version of the object. So, I have to do this:

    self = db.get(self.key())


to replace the passed in version of the object with the object from the datastore.

What would be better is if Execute were a class method, and I passed self.key() to it as a parameter, then loaded the full instance using the key on entry to Execute. It's a simple change, but I want to test it before I change it here. I'm sure people will point out all kinds of issues, so I'll wait to change it until then.
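Here's a rough sketch of the shape of that change, in plain Python so it runs anywhere. FAKE_DATASTORE, KeyedWorker and the integer keys are stand-ins I've invented for the datastore, Worker and db.Key; only the key travels with the task, and the instance is loaded fresh on entry.

```python
FAKE_DATASTORE = {}  # hypothetical stand-in: key -> stored instance


class KeyedWorker(object):
    def __init__(self, key, taskid):
        self.key = key
        self.taskid = taskid
        self.runs = 0

    def put(self):
        FAKE_DATASTORE[self.key] = self

    @classmethod
    def Execute(cls, aKey, aTaskID):
        # Load the current instance instead of trusting a pickled copy.
        worker = FAKE_DATASTORE.get(aKey)
        if worker is None:
            return "gone"
        if aTaskID != worker.taskid:
            return "stale task"
        worker.runs += 1
        worker.put()
        return "ran"


KeyedWorker(key=1, taskid="abc").put()
print(KeyedWorker.Execute(1, "abc"))  # ran
print(KeyedWorker.Execute(1, "old"))  # stale task
print(KeyedWorker.Execute(2, "abc"))  # gone

# With the real library the call would presumably look something like
#     deferred.defer(Worker.Execute, self.key(), aTaskID=self.taskid)
# which is untested here.
```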

2: You'd think I'd have some kind of "Go()" method to kick things off, instead of using an override on put() to detect a change in the "enabled" property. However, I've been specifically using this in the context of a REST API, where I don't want to be calling methods. So this approach of overriding put() has been just the ticket. To complete the implementation I should also do some monkey patching of db.put(), but I haven't needed that yet and it's a minor PITA to do, so it's left to the reader for now. Actually, this approach of overriding put() to do work in a REST context is a paradigm I'll explore in detail in a subsequent post; it goes really well with the REST library appengine-rest-server.
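The put() trick boils down to comparing what's about to be saved with what's already stored, and only scheduling when "enabled" flips to True. Here's a minimal plain-Python sketch of just that idea. STORE, SCHEDULED and ToggleWorker are made-up stand-ins for the datastore, the task queue and Worker, so treat it as an illustration rather than a drop-in.

```python
import uuid

STORE = {}       # hypothetical stand-in for the datastore
SCHEDULED = []   # hypothetical stand-in for the task queue


class ToggleWorker(object):
    def __init__(self, key):
        self.key = key
        self.enabled = False
        self.taskid = None

    def put(self):
        old = STORE.get(self.key)  # what was stored before, if anything
        newly_enabled = self.enabled and not (old and old["enabled"])
        if newly_enabled:
            # a fresh id means tasks carrying an older id no longer match
            self.taskid = str(uuid.uuid4())
        STORE[self.key] = {"enabled": self.enabled, "taskid": self.taskid}
        if newly_enabled:
            SCHEDULED.append(self.taskid)


w = ToggleWorker("w1")
w.enabled = True
w.put()   # first save with enabled=True: schedules a task
w.put()   # already enabled: nothing new scheduled
w.enabled = False
w.put()
w.enabled = True
w.put()   # re-enabled: schedules again with a new taskid
print(len(SCHEDULED))  # 2
```

A REST client only ever has to write enabled=True to the object; the scheduling falls out of the save.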
