Sunday, January 6, 2013

gaedocstore: JSON Document Database Layer for ndb

In my professional life I'm working on a server side appengine based system whose next iteration needs to be really good at dealing with schema-less data; JSON objects, in practical terms. To that end I've thrown together a simple document database layer to sit on top of appengine's ndb, in python.

Here's the github repo:

And here's the doco as it currently exists in the repo, it should explain what I'm up to.

This library will no doubt change as begins to be used in earnest.


gaedocstore is MIT licensed
gaedocstore is a lightweight document database implementation that sits on top of ndb in google appengine.


If you are using appengine for your platform, but you need to store arbitrary (data defined) entities, rather than pre-defined schema based entities, then gaedocstore can help.
gaedocstore takes arbitrary JSON object structures, and stores them to a single ndb datastore object called GDSDocument.
In ndb, JSON can simply be stored in a JSON property. Unfortunately that is a blob, and so unindexed. This library stores the bulk of the document in first class expando properties, which are indexed, and only resorts to JSON blobs where it can't be helped (and where you are unlikely to want to search anyway).
gaedocstore also provides a method for denormalised linking of objects; that is, inserting one document into another based on a reference key, and keeping the inserted, denormalised copy up to date as the source document changes. Amongst other uses, this allows you to provide performant REST apis in which objects are decorated with related information, without the penalty of secondary lookups.

Simple Put

When JSON is stored to the document store, it is converted to a GDSDocument object (an Expando model subclass) as follows:
  • Say we are storing an object called Input.
  • Input must be a dictionary.
  • Input must include a key at minimum. If no key is provided, the put is rejected.
    • If the key already exists for a GDSDocument, then that object is updated using the new JSON.
    • With an update, you can indicate "Replace" or "Update" (default is Replace). Replace entirely replaces the existing entity. "Update" merges the entity with the existing stored entity, preferentially including information from the new JSON.
    • If the key doesn't already exist, then a new GDSDocument is created for that key.
  • The top level dict is mapped to the GDSDocument (which is an expando).
  • The GDSDocument property structure is built recursively to match the JSON object structure.
    • Simple values become simple property values
    • Arrays of simple values become a repeated GenericProperty. ie: you can search on the contents.
    • Arrays which include dicts or arrays become JSON in a GDSJson object, which just hold "json", a JsonProperty (nothing inside is indexed, or searchable)
    • Dictionaries become another GDSDocument
    • So nested dictionary fields are fully indexed and searchable, including where their values are lists of simple types, but anything inside a complex array is not.
ldictPerson = {
    "key": "897654",
    "type": "Person",
    "name": "Fred",
        "addr1": "1 thing st",
        "city": "stuffville",
        "zipcode": 54321,
        "tags": ['some', 'tags']

lperson = GDSDocument.ConstructFromDict(ldictPerson)
This will create a new person. If a GDSDocument with key "897654" already existed then this will overwrite it. If you'd like to instead merge over the top of an existing GDSDocument, you can use aReplace = False, eg:
    lperson = GDSDocument.ConstructFromDict(lperson, aReplace = False)

Simple Get

All GDSDocument objects have a top level key. Normal ndb.get is used to get objects by their key.


Normal ndb querying can be used on the GDSDocument entities. It is recommended that different types of data (eg Person, Address) are denoted using a top level attribute "type". This is only a recommended convention however, and is in no way required.
You can query on properties in the GDSDocument, ie: properties from the original JSON.
Querying based on properties in nested dictionaries is fully supported.
eg: Say I store the following JSON:
    "key": "897654",
    "type": "Person",
    "name": "Fred",
        "key": "1234567",
        "type": "Address",
        "addr1": "1 thing st",
        "city": "stuffville",
        "zipcode": 54321
A query that would return potentially multiple objects including this one is:
GDSDocument.gql("WHERE address.zipcode = 54321").fetch()
s = GenericProperty()
s._name = 'address.zipcode'
GDSDocument.query(s == 54321).fetch()
Note that if you are querying on properties below the top level, you cannot do the more standard
GDSDocument.query(GenericProperty('address.zipcode') == 54321).fetch()  # fails
If you need to get the json back from a GDSDocument, just do this:
json = lgdsDocument.to_dict()

Denormalized Object Linking

You can directly support denormalized object linking.
Say you have two entities, an Address:
    "key": "1234567",
    "type": "Address",
    "addr1": "1 thing st",
    "city": "stuffville",
    "zipcode": 54321
and a Person:
    "key": "897654",
    "type": "Person",
    "name": "Fred"
    "address": // put the address with key "1234567" here
You'd like to store the Person so the correct linked address is there; not just the key, but the values (type, addr1, city, zipcode).
If you store the Person as:
    "key": "897654",
    "type": "Person",
    "name": "Fred",
    "address": {"key": "1234567"}
then this will automatically be expanded to
    "key": "897654",
    "type": "Person",
    "name": "Fred",
        "key": "1234567",
        "type": "Address",
        "addr1": "1 thing st",
        "city": "stuffville",
        "zipcode": 54321
Furthermore, gaedocstore will update these values if you change address. So if address changes to:
    "key": "1234567",
    "type": "Address",
    "addr1": "2 thing st",
    "city": "somewheretown",
    "zipcode": 12345
then the person will automatically update to
    "key": "897654",
    "type": "Person",
    "name": "Fred",
        "key": "1234567",
        "addr1": "2 thing st",
        "city": "somewheretown",
        "zipcode": 12345
Denormalized Object Linking also supports pybOTL transform templates. gaedocstore can take a list of "name", "transform" pairs. When a key appears like
    "something": { key: XXX },
then gaedocstore loads the key referenced. If found, it looks in its list of transform names. If it finds one, it applies that transform to the loaded object, and puts the output into the stored GDSDocument. If no transform was found, then the entire object is put into the stored GDSDocument as described above.
Say we have the transform "address" as follows:
ltransform = {
    "fulladdr": "{{.addr1}}, {{.city}} {{.zipcode}}"
You can store this transform against the name "address" for gaedocstore to find as follows:
GDSDocument.StorebOTLTransform("address", ltransform)
Then when Person above is stored, it'll have its address placed inline as follows:
    "key": "897654",
    "type": "Person",
    "name": "Fred",
        "key": "1234567",
        "fulladdr": "2 thing st, somewheretown 12345"
An analogous process happens to embedded addresses whenever the Address object is updated.
You can lookup the bOTL Transform with:
ltransform = GDSDocument.GetbOTLTransform("address")
and delete it with
Desired feature (not yet implemented): If the template itself is updated, then all objects affected by that template are also updated.


If an object is deleted, then all denormalized links will be updated with a special key "link_missing": True. For example, say we delete address "1234567" . Then Person will become:
    "key": "897654",
    "type": "Person",
    "name": "Fred",
        "key": "1234567",
        "link_missing": True
And if the object is recreated in the future, then that linked data will be reinstated as expected.
Similarly, if an object is saved with a link, but the linked object can't be found, "link_missing": True will be included as above.

updating denormalized linked data back to parents

The current version does not support this, but in a future version we may support the ability to change the denormalized information, and have it flow back to the original object. eg: you could change addr1 in address inside person, and it would fix the source address. Note this wont work when transforms are being used (you would need inverse transforms).

storing deltas

I've had a feature request from a friend, to have a mode that stores a version history of all changes to objects. I think it's a great idea. I'd like a strongly parsimonious feel for the library as a whole: it should just feel like "ndb with benefits").

Thursday, December 29, 2011

It's so, like, social!

Ok, so I know I'm supposed to do the second half of the post about the REST interface, but I got distracted.

You see, I'm on holiday.

On holiday, I like to do something just crazy and interesting. And not too much work ;-)

Anyway, what I've built is a very alpha-ish prototype of a concept for Social Coding.

The original idea for this came to me in September. I posted about it on Google+. Here's the content of the post:
Imagine a web app for social coding.

You get there via a link from somewhere else. At the top of the page in big typeface is the name of a function, the signature (parameters, return type) and a short description.

Then, side by side, are two columns.

On the left, there is a list of unit test implementations. Each is just a slab of code with asserts in it.

On the right there is a list of implementations.

Each unit test and each implementation can be commented on, and can be modded up or down.

Unit tests are listed in two sections; "Working" at the top and and "Broken" at the bottom. The "Broken" section lists tests that are syntactically busted.

Within the sections, the unit tests are listed in order of moderation score.

Implementations are listed in order of the number of unit tests passed. Within the same number, they are listed in moderation order.

You are free to
- Add a new unit test (possibly based on an existing test)- Add a new implementation (possibly based on an existing implementation)- Moderate other tests and implementations- Comment on anything

You can share the page on social networks and so forth with the usual buttons.

The owner of the page is the person who originally defined the function, including description. The owner can modify the definition, can mark any unit tests and implementations as not acceptable.

The function is considered implemented if there is an acceptable implementation that passes all acceptable, working unit tests. An implemented function may still be re-implemented at any time in the future, or become unimplemented by the addition of new tests which it fails.

When writing a test or an implementation, you may define new methods that don't yet exist. All user defined methods behave as links to their own page. Methods that don't have a page have one created for them when first clicked, wiki-like.

You can also define a new function directly, without actually using it in another function.

A method which relies on other methods which are not considered implemented is also not considered implemented.

A unit test which relies on methods which are not considered implemented is part of the "broken" group.

Analogs of this function definition mechanism should be created for object oriented Classes and for html Pages.

If, say, both Javascript and Python are allowed, then you could build a google AppEngine app this way.

A long chat ensued. A bit of enthusiasm flared up, and then went nowhere, because I did nothing with the idea. Yeah!

Anyway, now I have. I've built a prototype of this idea, in fact, called "Social Code". What a creative name!

It's hosted here:

The code is in github here:

Basically, it works like this:

  • Anyone can log in with a google account.
  • You can create functions. Functions are
    • A name (purely for descriptive purposes)
    • A slab of python code, hopefully containing a function implementation, but can be anything really.
    • Another slab of python code, Tests, which should be tests of the function implementation, but again can be anything. When the user hits "Save and Run!", this will be run and will be successful if no errors/exceptions are thrown, fails otherwise.
    • A list of past runs, with results & logs.
  • Functions can refer to each other
    • Say I've separately defined the function "Add"
    • If you want to call "Add" in your code, you need to include {{Add}} somewhere in your implementation code, probably in a comment. When you save it, it'll be parsed out, and appear in the "Imports" list.
    • You can call anything defined in "Add" in your code. This might include a python function called "Add", but only by convention.
For a really simple example of the way this hangs together, select the function Multiply, see what's going on, then click through to dependencies etc.

Now, anyone can change any of the tests or any of the implementations of any functions, delete any functions, add new ones. That's all so open that the whole thing's only usable as a curiosity at the moment.

In fact my current implementation is really daggy. Not a stick of javascript (or there is, there's an onclick confirm function for the delete button). So mondo page refreshes. Also no design, some css where necessary, inline (gasp!).

And OMG you'll hate editing code in textareas.

Also the "Save and Run!" button just cobbles all the code (tests + implementation + dependencys' implementations) into one big slab o' text and calls "exec" on it. Not terribly sophisticated. It'll need eventually to do stuff like run in an appengine task, probably work with code in a more sophisticated manner (should I be doing something with code objects?) etc. 

I've put the code in a serious sandbox (I think), by not allowing the __builtin__ collection to be in scope. You can't do a python import. So, you can only currently do simple things, intentionally. Opening this up in some way will be necessary, but it'll need some serious discussion, and care I think.

But it gives you something to play with, get a feel for the kind of thing I'm thinking about.

The intention is that, instead of one slab of code for "Implementation" and one for "Tests", we'll have many competing Implementations and many competing Tests. There'll be a concept of the current implementation for any function, which is one that passes all tests, which is voted up by the most users, and probably which is newest, in case of ties. That's the one that other dependent functions will import.

I think potential implementations need to be immutable. If you want to edit an implementation, you actually copy it and edit the copy. Might need a draft/published mechanism to help with this. So you see all the published implementations, and all your own drafts. Published implementations are immutable, drafts are not.

I don't know if/how this applies to tests. 

Also, I think functions should be able to define acceptance tests on other functions. If B depends on A (imports it) then B should be able to define acceptance tests on A. If A changes such that it has new implementation and tests, and passes those tests, but no longer is suitable for B, then B's acceptance tests will fail and flag that B needs fixing.

Of course the current giant-slab-o-tests approach could accommodate this; in the tests for B, just define some tests of A.

Anyway, have a play, read the old google+ post and comments, and come chat on the new google+ post where hopefully people will be yammering about this.

Sunday, December 11, 2011

Todo: Rest

First of all, by way of apology for the chirping crickets on this blog, let me say I've been otherwise occupied.

Not busy. I don't have 7 different things to get done by lunch time. I keep my schedule as simple as possible; usually it's get up, get it together, go to work, do cool stuff, come home, do family stuff, crash. Basically my policy is, if I have two things scheduled in a day, that's one too many. My brain just doesn't like that.

But occupied, yep. And that's because I changed jobs, threw out pretty much my entire existing skillset, and started from scratch.

Well, not the whole lot. It's still code. Of course. The world is basically composed of stuff that is code, and stuff that is muttering, and I try to steer clear of muttering. But still, if you're a professional senior C# dev, and have been doing windows coding for over a decade, then it turns out in retrospect to be a thing of some gravity to chuck all that in and jump into the world of LAMP, eschew that and recommend AppEngine, then go hell for leather on rich modern javascript apps on this platform, with all open source tools.

A great and terrible thing.

(and of course some obligatory PHP support is involved, which is pure masochism).

Luckily my new job is awesome, for a little company with brilliant people and the desire to build a really first class dev team, building outstanding stuff. And a healthy hatred for Microsoft (well, you know, we'll integrate with their stuff and all that, but we wouldn't use it ourselves, bleh). The very fact that I can blog about the details of what I've been building commercially, and even release some of it, should say something positive about it.

Also luckily, I'm working with a first rate front end developer, who's all about UX, who is all over javascript and jquery and underscore and backbone and etc etc etc. And that's because I'm mediocre at best at UI dev. I can never take it seriously; making computers easier for the illiterate feels somehow like enabling an alcoholic, and that's just not really in my nature. Figure it out you useless bastards, really.

Anyway, long story stupidly too long.

The crux of it is, I've been trying to learn to do AppEngine well, to do Python well, to understand the crazy (and rather spectacular) world of kludge that is modern javascript programming; more like Escher than Kafka I think, but I'm not sure. Javascript is oddly beautiful actually, and it's lovely to see functional programming burst onto the mainstream(ish) scene finally, as confused as it currently is with OO.

So my mind has been almost entirely occupied with this. I've been frenetic, I've been focused. Not in the "now I'm really paying attention" way, but more in the Vernor Vinge's "A Deepness In The Sky" way. Drooling and a bit broken, looked after by my darling wife.

So just as I'm making some breakthroughs, starting to Get It and feel a bit more comfortable, I got sick for a couple of days. Flu/Cold/Virusy thing. Bedridden, looked after by my darling wife.

All that was on my todo list was to rest.

And, I'm not making this up, what came to my mind was "Todo, rest, hey I could get that todo list sample app and make it talk to AppEngine via a RESTful interface".

 So I built a little REST library, called, while lying in my sick bed.

As I said, drooling and broken.


I've not come at Sleepy cold. Sleepy is actually a rewrite of a rewrite.

What I've been mostly doing commercially in the last few weeks has been to build a decent REST interface for AppEngine. Which of course I shouldn't be doing; just grab one that's already there. The problem is, I haven't been able to find anything serviceable. I looked, believe me, and even tried some out, but they just felt clunky; heavyweight, XML oriented (we want to use JSON), not really welcoming customization, a square peg for a round hole.

I thought we should stick with the existing options, because look how big and complex they are, we don't want to do all that work ourselves! But my good colleague protested, went away one weekend and came back with his own simple REST interface library, which was cleaner, much shorter, and really a bit of a revelation. It needed work, but that opened my eyes to his central observation, which was that this doesn't need to be that hard.

And it doesn't, of course. I mean, what is REST really?

What is REST?

Let me google that for you:

That answer sucks

Ok, then what REST means to me, in a practical sense, is an HTTP based API, using the big four requests (GET, PUT, POST, DELETE), which suits heavy javascript clients (ie: talks JSON), and which lets the client side talk to my server side datamodel in some straightforward way.

Now I know that's hopelessly non-purist, but what I'm trying to achieve is this:

- We've got a rough idea for an app to build.
- I build a server side, which is largely data model.
- Someone who is good at it builds a client side javascript app, replete with its own local data model, which must be synced somehow with the server side. That somehow is something very like backbone.js's expectation of a RESTful web api to talk to.
- We'd also like the possibility of other stuff talking to the same api (iPad apps, third party code, martian mind control beams, whatever).
- I must be able to set up a RESTful web api to let the client talk to the server. It needs to give a good balance of simplicity and customizable behaviour, be quick to put in place, and just be clean and easy to work with.

Anyway enough jibber jabber, where's some code?

Yeah yeah ok.

What I decided to do here to demonstrate my approach is to take a simple sample app that demonstrates the use of backbone.js , and make it work in the context of AppEngine. The sample app, by Jérôme Gravel-Niquet, is a todo list which lets the user make a simple but nice to use todo list, and stores it in local storage in the browser (so it really has no back end).

You can check out the sample app here:

Have a play with it here:

Starting off

So I started by downloading all the static files for the todo list app, sticking them in a basic appengine app, and popping that in git on github.

The git repo is here:

The first version that just hosts the static app, still backed by localstorage, is available at the tag v1:

You can run that in your local appengine devserver, or upload it to appengine proper, and it'll work. But it's not touching the datastore.


The app is a simple todo list. There's no concept of users, of different lists, nothing. There's just one list, which is a list of Todo items. A todo item is composed of three elements:

  • text - string, the text that the user enters describing what is to be done 
  • order - int. the numerical order in the list of this item; lower number comes first.
  • done - bool, whether the item is done or not
The first thing we need is a datastore model to describe this. This is a simple one:
from google.appengine.ext import db

class ToDo(db.Model):
    text = db.StringProperty()
    order = db.IntegerProperty()
    done = db.BooleanProperty()
    created = db.DateTimeProperty(auto_now_add = True)
    modified = db.DateTimeProperty(auto_now = True)

    def __init__(self, *args, **kwargs):
        db.Model.__init__(self, *args, **kwargs)
        if self.done is None:
            self.done = False

Just one class, ToDo. It's got text, order and done (and I've put a default value on "done" of false in the constructor). I've also added "created" and "modified" fields just for a bit of fun. These are managed by the server (using auto_now_add and auto_now), and wont be exposed to the client.

And now the REST!

Ok, now we want to expose this datamodel class through a REST api. For the ToDo class, we'll use a resource name of /todos. Locally this might be http://localhost:8080/todos. My version on appengine has the url .

The first thing to think about is the routing (kicking off the right handlers for urls) and basic infrastructure. How will we get /todos to be handled the way we want to handle it, and how would that handling be done in any case?

I figured that since the RESTful web api will be just like a normal web page, but returning JSON, why not use the standard webapp web handler mechanism already available in appengine, and the same routing mechanism? Now we'll want a little help with the routing, because we want to be able to manage /todos, but also calls like /todos/32 (where 32 is a valid id for a model instance) to address particular todo items.

So, you define the routes by providing a list of resourcename / handler pairs, which a helper function then turns into url / handler pairs as required by the wsgi application.

Here's the, showing a standard route for '/', then including the calculated restRoutes.

import webapp2

import htmlui
import restapi

# basic route for bringing up the app
lroutes = [ ('/', htmlui.ToDoHandler) ]

# add api routes, see restapi/

# create the application with these routes
app = webapp2.WSGIApplication(lroutes, debug=True)

restRoutes comes from the restapi module for this app, whose looks like this:

from todoresthandler import *
from sleepy import *

restRoutes = [
  ('todos', ToDoRestHandler)

restRoutes = Sleepy.FixRoutes(restRoutes)

ToDoRestHandler is a webapp webhandler for the rest api for ToDo. Sleepy.FixRoutes turns restRoutes from a list of (resourcename, handler) pairs to a list of (url, handler) pairs.

    def FixRoutes(cls, aRoutes, aRouteBase = None):
        Modifies routes to allow specification of id
        aRoutes should be pairs of resourcename, resource handler.
        This is modified to become pairs of route source, resource handler.
        aRouteBase is anything you want to prepend all route sources with. 
        eg: if you want all your route sources to begin with /api, 
        use aRouteBase="/api" 
        Don't include a trailing slash in aRouteBase.
        retval = []
        for lroute in aRoutes:
            lfixedRouteSource = '/(%s)(?:/(.*))?' % lroute[0]
            if aRouteBase:
                lfixedRouteSource = aRouteBase + lfixedRouteSource
            lfixedRoute = (lfixedRouteSource, lroute[1])
        return retval

We've got the /todos route wired up to ToDoRestHandler. But what's in that? Here it is:

from google.appengine.ext import webapp
from sleepy import Sleepy
from datamodel import ToDo

class ToDoRestHandler(webapp.RequestHandler):
    def get(self, aResource, aResourceArg, *args, **kwargs):
        Sleepy.GetHandler(self, aResource, aResourceArg, *args, **kwargs)

    def put(self, aResource, aResourceArg, *args, **kwargs):
        Sleepy.PutHandler(self, aResource, aResourceArg, *args, **kwargs)

    def post(self, aResource, aResourceArg, *args, **kwargs):
        Sleepy.PostHandler(self, aResource, aResourceArg, *args, **kwargs)
    def delete(self, aResource, aResourceArg, *args, **kwargs):
        Sleepy.DeleteHandler(self, aResource, aResourceArg, *args, **kwargs)
    def GetModelClass(self):
        return ToDo

What's going on here? Well, I've delegated all the functionality of GET, PUT, POST and DELETE to the Sleepy library. Plus, there's an extra method GetModelClass which tells us (actually Sleepy) which model class we're working with.

So basically you've shown us nothing. What's in this Sleepy?

No I haven't, have I? I'll get onto what's in Sleepy in the next post. For now, if you want to skip ahead, just check out the git repo. Otherwise, you can wait for the next installment, lazy person!

Sunday, October 23, 2011

The Dining Philosophers

Five silent philosophers sit at a table around a bowl of spaghetti. A fork is placed between each pair of adjacent philosophers.
Each philosopher must alternately think and eat. Eating is not limited by the amount of spaghetti left: assume an infinite supply. However, a philosopher can only eat while holding both the fork to the left and the fork to the right (an alternative problem formulation uses rice and chopsticks instead of spaghetti and forks).
Each philosopher can pick up an adjacent fork, when available, and put it down, when holding it. These are separate actions: forks must be picked up and put down one by one.
The problem is how to design a discipline of behavior (a concurrent algorithm) such that each philosopher won't starve, i.e. can forever continue to alternate between eating and thinking.

Ok, so I said in the previous post that AppEngine is a massive distributed machine. So on such a machine, we should be able to implement a solution to The Dining Philosophers Problem. How would we go about it?

Implementation of Semaphores

Firstly, we need a working Semaphore implementation. In the previous post I sketched out an approach, but since then I've built a proper working version.

Find the full Semaphore implementation here: on github

class Semaphore(polymodel.PolyModel):
    _counter = db.IntegerProperty()
    _suspendList = db.StringListProperty()

    def ConstructSemaphore(cls, aCounter):
        retval = cls()
        retval._counter = aCounter
        retval._suspendList = []
        return retval
    ConstructSemaphore = classmethod(ConstructSemaphore)
    def Wait(self, obj, *args, **kwargs):
        while True:
                lneedsRun = db.run_in_transaction(
                        obj, *args, **kwargs
                if lneedsRun:
                        obj(*args, **kwargs)
                    except Exception, ex:
            except TransactionFailedError:
                # do nothing
                logging.warning("TransactionFailedError in Wait, try again")

    def Signal(self):
        while True:
                db.run_in_transaction(_doSignal, self.key())
            except TransactionFailedError:
                # do nothing
                logging.warning("TransactionFailedError in Signal, try again")
def _doWait(aKey, aObj, *args, **kwargs):
    lneedsRun = False
    lsem = db.get(aKey)
    if not lsem:
        raise Exception("Internal: failed to retrieve semaphore in _doWait")
    if lsem._counter > 0:
        lsem._counter -= 1
        logging.debug("counter: %s" % lsem._counter)
        lneedsRun = True
        logging.debug("about to defer")
        pickled = deferred.serialize(aObj, *args, **kwargs)
        pickled = base64.encodestring(pickled)
        logging.debug("after defer, pickled=%s" % pickled)
    return lneedsRun

def _doSignal(aKey):
    lsem = db.get(aKey)
    if not lsem:
        raise Exception("Internal: failed to retrieve semaphore in _doSignal")

    if len(lsem._suspendList) > 0:
        logging.debug("about to unsuspend")
        pickled = lsem._suspendList.pop()
        pickled = base64.decodestring(pickled)
        logging.debug("pickled=%s" % pickled)

        # here I depickle the pickled call information, only to repickle inside 
        # deferred.defer. Not ideal, but cleaner given the calls available 
        # inside deferred.
            obj, args, kwds = pickle.loads(pickled)
        except Exception, e:
            raise deferred.PermanentTaskFailure(e)
        logging.debug("about to defer")
        deferred.defer(obj, _transactional=True, *args, **kwds)
        logging.debug("after defer")
        lsem._counter += 1

What I wrote last time was basically correct. I've using Nick Johnson's deferred.defer library to make the adding and running of tasks smoother (ie: hide the use of web handlers).

Some points of interest:

The _suspendList of Semaphore is a stringlist, which stores the same pickled call information that deferred.defer uses. I've made liberal use of internal functions of deferred to make this work, basically because that library is written by someone far more competent with Python than me, so why not?

Wait() takes obj, *args, **kwargs and passes them to the call of _doWait() in a transaction. obj, *args and **kwargs define the call to make once we have successfully acquired the semaphore. So, inside _doWait(), we'll either need to add obj (et al) to the _suspendList, or, if we can get the semaphore immediately, we need to call obj(). However, we can't call it inside _doWait() because it's inside a transaction - we don't want or need to be inside the context of that transaction for the call to obj().  So instead, _doWait returns a bool, which is True in the case that we've aquired the semaphore, and you'll see that Wait checks this result, and calls obj() immediately if it's True. Thus, obj() is called if necessary, but outside the transaction.

The pickled call information, created by deferred.serialize(), isn't safe to save in a stringlist (illegal characters, throws errors). So, I base64 encode it before saving it in _doWait(). You'll see in _doSignal that the pickled call information is base64 decoded.

You'll notice that inside _doSignal(), when there are waiting tasks on the _suspendList, that I dequeue one (the last one, but order is irrelevant in a Semaphore implementation), unpickle it, but I don't call it. Instead, I add a task for it using deferred.defer(). I don't call it because the task we are in has just finished doing work while holding the semaphore, which might have been lengthy. These are short lived tasks, we shouldn't do more than one user-defined thing in one task. So, instead of running this second thing (the dequeued suspended task), I reschedule it to run immediately in another task. Note also that I mark that defer as transactional; it means that if the signal transaction fails, the task wont be enqueued, which is what we want.

One last note: In case it's not obvious, the transactions combined with a reload of the Semaphore ensure that we can safely use Semaphore objects even if they are stale. So don't worry about stale Semaphores causing contention issues. This is explained more in the previous post.

Oh, and I think if you pass a queue name into Wait() in the same way you would to a regular call to deferred.defer() (ie: a parameter _queue="somequeuename"), the semaphore will use that queue instead of default, which might be handy.

Testing the Semaphores

I've got some simple test code in for giving Semaphores a run.

def SemaphoreTest1():"*****************************")"**   BEGIN SEMAPHORETEST1  **")"*****************************")
    lsem = Semaphore.ConstructSemaphore(2)
    lcount = 0
    while lcount < 20:
        deferred.defer(SemaphoreTest1EntryPoint, lsem.key(), lcount, True)
        lcount += 1
def SemaphoreTest1EntryPoint(aKey, aNum, aFirst):
    lsem = db.get(aKey)
    if not lsem:
        raise Exception("Failed to retrieve semaphore in EntryPoint1")

    if aFirst:
        # this is before we've got the semaphore"Before Wait for %s" % aNum)
        lsem.Wait(SemaphoreTest1EntryPoint, aKey, aNum, False)
        # we now have the semaphore"Begin Critsec for %s" % aNum)
        sleep(2) # stay inside critsec for 2 seconds"End Critsec for %s" % aNum)
        lsem.Signal() ("After Signal for %s" % aNum)

SemaphoreTest1() creates a new Semaphore with counter=2 (ie: allow max 2 tasks to be inside the Semaphore at any time), and schedules 20 tasks to run immediately, which all run SemaphoreTestEntryPoint() with the new Semaphore (passed by key in the aKey parameter).

SemaphoreTestEntryPoint() loads the Semaphore, then takes one of two paths. The first time through aFirst is True (we don't hold the semaphore yet); it waits on the semaphore (switching aFirst to False). The second time through, with aFirst as False, where we hold the Semaphore, it sleeps for a couple of seconds, then signals the Semaphore and exits.

The upshot of this is that the 20 tasks, all run at the same time, will contend for the Semaphore. The first two to get it will sit inside it for 2 seconds (a long time, while the other tasks keep waiting on it and suspending). Eventually these tasks holding it will signal it and exit. Each time a task signals, it'll kick off a waiting one, which in turn will get it, again taking two seconds, then exit. And do on until there are no more tasks left.

Try running this code, and fiddling with the parameters a bit. Note that you'll find the whole project on GitHub, here.

Back to Dinner

So we've got a working Semaphore. So how do we implement a solution to the dining philosophers?

Let's implement the classic deadlock algorithm first. It goes like this:

  • think until the left fork is available; when it is, pick it up;
  • think until the right fork is available; when it is, pick it up
  • eat
  • put the left fork down
  • put the right fork down
  • repeat from the start
First we need to represent a Fork. I'll use a Semaphore with counter=1, ie: while someone holds the fork, no one else may hold the fork:

class Fork(Semaphore.Semaphore):
    def ConstructFork(cls):
        lfork = cls.ConstructSemaphore(1)
        return lfork
    ConstructFork = classmethod(ConstructFork)

I used a polymodel for Semaphore, so we can override it as above.

Next is an implementation of the faulty algorithm above. ThinkAndEatByKey is a wrapper of ThinkAndEat, which takes Forks by key rather than by object reference, loads them, and calls through. The real work happens in ThinkAndEat.

def ThinkAndEatByKey(aFirstForkKey, aSecondForkKey, aIndex, aNumLoops, aHasFirst, aHasSecond):
    lFirstFork = db.get(aFirstForkKey)
    if not lFirstFork:
        raise Exception("Failed to retrieve Left Fork")
    lSecondFork = db.get(aSecondForkKey)
    if not lSecondFork:
        raise Exception("Failed to retrieve Right Fork")
    ThinkAndEat(lFirstFork, lSecondFork, aIndex, aNumLoops, aHasFirst, aHasSecond)
def ThinkAndEat(aFirstFork, aSecondFork, aIndex, aNumLoops, aHasFirst=False, aHasSecond=False):
    if not aHasFirst:
        # this is before we've got the semaphore"Wait on first for %s" % aIndex)
        aFirstFork.Wait(ThinkAndEatByKey, aFirstFork.key(), aSecondFork.key(), aIndex, aNumLoops, True, False)
    elif not aHasSecond:
        sleep(10) # takes a while to pick up the second fork!"Wait on second for %s" % aIndex)
        aSecondFork.Wait(ThinkAndEatByKey, aFirstFork.key(), aSecondFork.key(), aIndex, aNumLoops, True, True)
    else:"EAT for %s" % aIndex)"Dropping second fork for %s" % aIndex)
        aSecondFork.Signal()"Dropping first fork for %s" % aIndex)
        if aNumLoops == 1:
  "Finished looping, done.")
  "Ready to think again, deferring")

ThinkAndEat has three states. First, if we have neither Semaphore then aHasFirst and aHasSecond are false (I use First and Second instead of Left and Right for a bit of leeway later on). In this case, we wait on the first fork, and aHasFirst, aHasSecond will be True/False on the next call. This is the next case, where we then wait on the the second fork, and aHasFirst, aHasSecond will both be True on the third call. Finally, when they are both true, we have both forks. So we Eat (just log something, but this could be a lengthy op of some kind), then Signal, ie: drop, both forks.

Finally, we reschedule ThinkAndEat again to complete the loop.

You'll note in the second state I've added a sleep for 10 seconds. That is, between picking up the first fork and picking up the second, our philosophers think for a really long time. This doesn't change the theoretical behaviour of the algorithm, but in practice it makes it very easy to deadlock especially on the first iteration; everyone will pick up the first fork, there'll be a pause, then everyone will try to pick up the second fork and have to wait indefinitely.

Note that I use a countdown, aNumLoops, to stop this running forever. Eventually, in my house, even the Philosophers need to finish and go home!

Now, to finish implementing the algorithm above, we need to create all the forks, then call ThinkAndEat for each philosopher, passing in the correct forks.

def DiningPhilosphersFailTest():
    lnumPhilosophers = 5
    lnumLoops = 5 # number of think/eat loops
    leta = datetime.datetime.utcnow() + datetime.timedelta(seconds=20)
    lforks = []
    lforkIndex = 0
    while lforkIndex < lnumPhilosophers:
        lfork = Fork.ConstructFork()
        lforkIndex += 1
    lphilosopherIndex = 0
    while lphilosopherIndex < lnumPhilosophers:
            lforks[(lphilosopherIndex+1) % lnumPhilosophers],
            _eta = leta
        lphilosopherIndex += 1

This method sets up 5 philosophers with 5 forks, who will perform the ThinkAndEat loop 5 times. Philosopher i (i from 0 to 4) gets left fork i, right fork i+1, except for the philosopher 4, who gets left fork 4, right fork 0 (ie: a round table).

When you run this method, it tends to deadlock every time. You can watch the default task list until the queued tasks goes to zero, then go to the datastore and run this query:

select * from Fork where _counter = 0

You should get a bunch of results; each one is a Fork (ie: Semaphore) which has had a successful Wait(), but no comparable Signal(). For this to be the case when no tasks are running requires that a set of philosophers (all of them in fact) have one fork and are waiting for another. Deadlock.

Now to fix this, Dijkstra (who originally posed this problem) proposed a strict ordering of resources (forks) along with a rule that philosophers acquire forks only in resource order. So, if we use the fork numbering above, and say we must acquire lower numbered forks before higher numbered, then we have the same algorithm *except* that philosopher 4 must now change to acquire his right fork first (fork 0) followed by his left fork (fork 4).

We can achieve this simply by swapping forks, passing fork 0 as FirstFork and fork 4 as SecondFork. This is of course why I used First and Second rather than Left and Right.

So here's the non-deadlocking solution:

def DiningPhilosphersSucceedTest():
    lnumPhilosophers = 5
    lnumLoops = 5 # number of think/eat loops
    leta = datetime.datetime.utcnow() + datetime.timedelta(seconds=20)
    lforks = []
    lforkIndex = 0
    while lforkIndex < lnumPhilosophers:
        lfork = Fork.ConstructFork()
        lforkIndex += 1
    lphilosopherIndex = 0
    while lphilosopherIndex < lnumPhilosophers:
        if lphilosopherIndex < lnumPhilosophers-1:
            # not the last one
                _eta = leta
            # the last one
                _eta = leta
        lphilosopherIndex += 1

If you run this you wont get deadlock. You should find that when all tasks complete (the task queue is empty), "select * from Fork where _counter = 0" will give you zero objects.

You can safely run either of these solutions with fewer or much larger numbers of Philosophers, and more or less loops. To get deadlock with the first solution, you'll need to ensure tasks can run together. I find this takes two things; you need a large bucket size and replenishment rate on your default task queue, and you need to give it a decent number of loops, to give AppEngine time to spin up enough instances to run a lot of parallel tasks. To deadlock, you'll need to be able to run all Philosopher tasks concurrently; if they run sequentially they'll resolve out and fail to deadlock. Remember that the deadlocking algorithm isn't guaranteed to deadlock, that's only a possibility. The more philosophers, the harder it'll be to see it in practice (although you should see a lot of contention in the logs).

One more note: You can't see this deadlock with the development appserver. Why not? Because the development server runs tasks sequentially. The Semaphores and Tasks will work, but in a very uninteresting one-task-at-a-time way.

Lifetime issues with Semaphores

You'll notice that all these tests leave Semaphore objects lying around afterwards.

Classically, semaphores are in-memory structures, and require memory management techniques for lifetime management. We have an analogous cleanup requirement for these Datastore backed Semaphores.

You'll need some way to know when you are finished with them, so you can then delete them. For some uses it may be clear (eg: if they are always associated with another resource, then you create them when that resource is created and delete them when that resource is deleted). Other times, you may know that they should only live a certain length of time, so for example you could add a time-to-live or a delete-me-after field to them, and have a background process cleaning up every so often.

Very interesting, but why do I care? 

AppEngine lends itself to all kinds of uses where we are managing access to resources. Either we are managing something limited (with a semaphore with a counter equal to the number of resources available), or we are managing contention over something which can only be touched by one task (with a semaphore with counter=1, otherwise known as a Mutex).

In my app Syyncc, I used a dumb cron loop to move changes from one social network to another. Because changes move in multiple directions, and the central mechanism isn't concurrency safe, I needed to use a single processing loop to ensure I was only working on one thing at a time. It was a brain dead approach to controlling a concurrency issue.

But it doesn't scale. It'll process 50 items at a time, once every 2 minutes. That's severely limited.

Instead, I intend to model each user's combination of connected networks (fb, G+, etc) as a "hub", which is protected by a Mutex (Semaphore with Count=1). I can have separate tasks (Workers) monitoring each network, and driving changes to the others, which don't communicate except that they share the hub Mutex, so only one can run at once. So, if there are changes in a user's G+ and Facebook at the same time, processing those changes will be concurrency safe, with the critical shared pieces protected.

Why not just use transactions?

Because transactions only protect the datastore, and enqueuing of tasks.

Resources under contention can be all kinds of things. In the case of Syyncc, they involve reading and writing the datastore *and* reading and writing social networks. A model of managing contention is better here than transactions (simply because the latter aren't available!)

And, I want to make a State Machine

A project I've got coming up involves starting and stopping virtual machines in AWS. To do that well turns out to be involved, and really I want a full State Machine, where I can define a state graph, with nodes, conditions, transitions, ooh. And to build that, I'll need Semaphores.

Expect to see an article on implementation of a Semaphore-based State Machine in the near future.

Sunday, October 16, 2011

Not Your Mama's Web Server

It's a distributed machine, turkey. 

AppEngine is a funny kind of beast. It's packaged up and presented more or less as a weird kind of futuristic virtual web host; sort of like a normal web host, but the plumbing's taken care of for you. Higher level than normal web hosts, kind of annoyingly proprietary, and it works sort of weirdly when you scratch the surface; the datastore isn't quite like a sql database, the web server's not quite like a proper web server, things are just a little queer.

They're queer because it's not actually a webserver. Or, well, it is a webserver, but that's like calling a Tesla Roadster a horseless carriage. And then complaining that the reins are weird. (And making the SQL service is kind of like putting the reins in.)

What it really is, is a new kind of computer (and by new I mean 40 year old computer science, but that's in no way a criticism). A distributed one. It's made out of lots of little physical ones, but they're mostly abstracted away. What we users get to touch is a high level machine where we can run some code in some well defined and interesting ways, and where that code can talk to some very interesting global resources (memcache, datastore, the webby bits of the internet, etc).

What's really interesting to me is what is always interesting about distributed systems; massive concurrency. *Massive* concurrency. Concurrency on a scale where there aren't really specific limitations on how much stuff we can throw in in parallel; we seem to be limited largely by our wallets. And if you're a bit careful and pay a bit of attention, it's pretty cheap and that limitation is pretty minor. So that's fun.

But this concurrency isn't the within-one-process, multi-threaded computing type of concurrency. This is old school, it feels a lot more like multi-processing, processes running in their own address spaces, maybe on separate physical hardware, at a great distance from one another. By distance here I mean that the inter-process communication mechanisms are many orders of magnitude slower than the code. These are not threads, that behave like people hanging out, chatting with one another. These are more like characters in a Dickens novel, on separate continents, sending letters by sea. Charming and old world, if a little inconvenient. Again, not a criticism, this is how distributed computing is.

(Incidentally, I know that you can do multi-threaded programming on AppEngine, but I just think that's a bit boring and misses the point of the platform. Threads live inside one request, ie: process, and only live as long as that process lives. You can do some optimisation stuff with them, but they are at the wrong level of abstraction to be the real game here.)

So what are the elements of this distributed machine?

Looking at AppEngine as a Distributed Machine

Well, each user account has a bunch of program slots (10, or as many as you like for people with big wallets). Each slot, each "app", is the single-distributed-machine abstraction level. The slot/app/machine has its own resources, and lets you run one program.

Each program has a bunch of potential entry points (web request handlers). They can be used to kick off copies of the program. That can happen because of user requests, or due to scheduled task calls, or cron jobs, or backend jobs (other?). In practice, the program, having multiple entry points, is functionally a set of cooperating programs, more or less divided up by entry point (ie: web request handler).

Each time (a piece of) the program is kicked off, it gets its own address space. (Well, if you mark your programs as "threadsafe", then it might share address space with other copies of itself, but that's again an optimisation that can be ignored here.) But in any case, it's a program, loaded into an address space and being executed. We might call it a "process".

The process can do things. One special purpose thing it might do, in response to a user request, is to construct and return a web page. But it can do all kinds of other things.

Simplest among them is do to some calculation of something. Maybe you want to figure out a metric tonne of prime numbers? That sort of thing. In this case, you just do your calc and finish. Only thing to consider here is that there can be a limitation on the lifespan of a process (ie: 60 seconds, if you're using a frontend).

Anything more complex than simple calculations will need to talk to things outside of the process. Even simple calcs presumably need to go somewhere when they are complete, so you can read the results. So we need interprocess communications mechanisms. What's available?

The most obvious IPC mechanism is the datastore. We can put to the datastore, and get from the datastore. The datastore is persistent, so processes can talk to each other across time and space. The datastore is pretty slow compared to intra-process work (sea mail), although we can ameliorate that a little with memcache.

Interestingly, we probably can't reliably use memcache for this on its own, because it can expire without warning. Actually, hold that thought, there might be interprocess comms jobs to which it is suited, I'll keep an eye out for them.

The channels API could also potentially be used for interprocess communication. It's supposed to be for pushing event information to web clients, but a client could just as easily be a process inside this virtual machine. Then the channels would look like some pretty sweet message queues. So, keep that in mind too.

But the datastore is the bread and butter here.

Now there are roughly two ways of thinking about processes in this machine, two models; roughly, they are Long Lived and Short Lived.

In the Long Lived model, processes live indefinitely, so they need to communicate directly with each other. This is where the channels API could be interesting, acting like one-way pipes between unix processes. Old school message passing. They might also communicate by polling datastore objects, waiting for things to happen. This will require using backends to accomplish, so is probably a bit pricey. It also feels a little clunky; lots of sitting in loops monitoring things.

The Short Lived model is a bit more interesting, mainly because it feels more like the way the platform, the virtual distributed machine, wants to behave. In this model, the processes are short lived, and crucially never talk directly to each other. Instead, they all interact with the datastore. They are kicked off from somewhere (usually as Tasks, I'll get to that), and have arguments which, along with the entry point used, define what they are supposed to do. They read from the datastore to figure out more context, access shared resources, whatever. They perform calculations. They write back to the datastore. And they might queue up more processes (tasks) to run, as appropriate. So the processes behave as individual workers chipping away at the datastore, a shared medium, which is the coordinating point for all their work. Rather than talking directly to each other, the processes lay down cues, and notice cues layed down by others, doing their work based on those cues. It's a bit stigmergic.

Using short lived processes means we can be event driven. Programs can be designed to only be doing work when there's something which needs doing; no processes are running if there is no work to do. Combining lots of event driven pieces together mean that, at a larger scale, we can approximate the process model of traditional computers, performing work when needed and blocking when waiting for results from external sources.

To accomplish this, we first need an implementation of invokable, short lived processes. AppEngine Tasks fit this requirement. If we schedule a task to run now, we can give it arguments/context, it can run when there are resources available, it can do a short job and finish, possibly spawning more tasks. If you squint a little, this is making AppEngine's task scheduler look a little like an operating system's process scheduler.

If we use multiple tasks in concert with one another, we can emulate blocking and waiting. If a task gets to a point where it needs to wait indefinitely on an external event, we can let it quite, and arrange it so that another task can pick up where it left off when the event occurs. So our Task/Process is a bit more limited than a normal machine process, but in aggregate tasks are as powerful as we need them to be.

So in this context, we've got Processes which can do work, and which can potentially block & wait. The next thing we need is some communication primitives to let them coordinate effectively.


Ok, now I make an admission: I still own books. Not for any real information value, mind you, but for sentimental reasons. I don't own many, but I have a little bookshelf in my study, and some of my textbooks from my computer science degree (long ago and far away) sit yellowing, waiting to be called back in to service. This is as good a day as any.

Flicking through them, I've found "Principles of Concurrent and Distributed Programming", by Ben-Ari (edited by no less than C.A.R. Hoare). Flip, flip, ah here we are, Semaphores.

So can we implement a Semaphore appropriate to AppEngine?

If we model the Semaphore as a datastore object, then it needs two methods, Wait and Signal. It also needs an internal integer count (the actual semaphore value), and a queue of waiting processes.

First of all, Wait and Signal need to be atomic. How do we do that in the context of Datastore? With a transaction.

Code inside a normal transaction can work with objects in one entity group. In this case we're inside the limitations, because we only need to work with the object itself in order to implement atomicity of Wait and Signal.

The one difference between a transaction and a traditional critical section (as required by Wait and Signal) is that the transaction can fail. The simplest way to make a transaction into a critical section is to put it inside a loop. Python-like pseudocode:

  while true:
          perform transaction
      catch transaction-fails:
          # do nothing, we'll head around the loop again

  #here we succeeded

Now that could potentially starve. If this is in a front-end task, it'll time out eventually. We'll have to deal with timing out anyway, so let's leave it for that. If this is in a backend, under extreme contention it could just fail repeatedly forever. OTOH if we get into that situation, we're probably doing something spectacularly wrongly.

Now the next difficult thing is that we don't have a way to suspend a task and wake it again, continuing where it left off, when done. But, we could make something a bit crappier that'd do the job.

We could say that if you have a process that wants to wait on a semaphore, then it needs to be broken into two pieces. The first piece works up to where it waits on the semaphore. The second piece is run when it gets through wait, and must signal the semaphore at some point.

So say this is our desired process code:

  semaphore = GetTheSemaphoreFromSomewhere # I'll get to this later

Then that needs to be split to become

  semaphore = GetTheSemaphoreFromSomewhere
  semaphore.wait(EntryPoint2) <- we tell the semaphore where we need to go next

  semaphore = GetTheSemaphoreFromSomewhere

Ok. Then how do we implement the semaphore? Something like this. Pythonish pseudocode again.

class Semaphore(datastore object):
  _counter = int property
  _suspendList = entry point list property

  def Wait(self, NextEntryPoint)
    while true:
          db.run_in_transaction(_doWait, self, NextEntryPoint)
      catch transaction-fails:
          # do nothing

  def Signal(self)
    while true:
          db.run_in_transaction(_doSignal, self)
      catch transaction-fails:
          # do nothing

def _doWait(key, NextEntryPoint)
  self = db.get(key)
  if self._counter > 0:
    self._counter -= 1

def _doSignal(key)
  self = db.get_by_key(key)
  if len(self._suspendList) > 0:
    NextEntryPoint = self._suspendList.remove()
    self._counter += 1

That looks easy, doesn't it?

I haven't explained what entry points are; they could just be the relative url for a request handler. Then, the entry point list property could just be a string list property, and call(NextEntryPoint) could simply do a task.add() for that entry point.

Alternatively, entry points could be pickled function objects, kind of like what's used in deferred.defer. Actually that could be pretty sweet!

Also, where did we get the semaphore? Well, we probably want to be able to create them by name, and retrieve them by name. We could use that name as the entity key, or part of the entity key. Then you create a semaphore somewhere, and later retrieve it by looking it up by key (nice and quick).


oh man I have to stop.

In the next post, I'll include a proper semaphore implementation in python. 

I'd better get it done soon though; I'm having some philosophers to dinner.