http://en.wikipedia.org/wiki/Dining_philosophers_problem
Five silent philosophers sit at a table around a bowl of spaghetti. A fork is placed between each pair of adjacent philosophers.
Each philosopher must alternately think and eat. Eating is not limited by the amount of spaghetti left: assume an infinite supply. However, a philosopher can only eat while holding both the fork to the left and the fork to the right (an alternative problem formulation uses rice and chopsticks instead of spaghetti and forks).
Each philosopher can pick up an adjacent fork, when available, and put it down, when holding it. These are separate actions: forks must be picked up and put down one by one.
The problem is how to design a discipline of behavior (a concurrent algorithm) such that each philosopher won't starve, i.e. can forever continue to alternate between eating and thinking.
Ok, so I said in the previous post that AppEngine is a massive distributed machine. So on such a machine, we should be able to implement a solution to The Dining Philosophers Problem. How would we go about it?
Implementation of Semaphores
Firstly, we need a working Semaphore implementation. In the previous post I sketched out an approach, but since then I've built a proper working version.
Find the full Semaphore implementation here: Semaphore.py on github
class Semaphore(polymodel.PolyModel): _counter = db.IntegerProperty() _suspendList = db.StringListProperty() def ConstructSemaphore(cls, aCounter): retval = cls() retval._counter = aCounter retval._suspendList = [] return retval ConstructSemaphore = classmethod(ConstructSemaphore) def Wait(self, obj, *args, **kwargs): while True: try: lneedsRun = db.run_in_transaction( _doWait, self.key(), obj, *args, **kwargs ) if lneedsRun: try: obj(*args, **kwargs) except Exception, ex: logging.error(ex) break except TransactionFailedError: # do nothing logging.warning("TransactionFailedError in Wait, try again") def Signal(self): while True: try: db.run_in_transaction(_doSignal, self.key()) break except TransactionFailedError: # do nothing logging.warning("TransactionFailedError in Signal, try again") def _doWait(aKey, aObj, *args, **kwargs): lneedsRun = False lsem = db.get(aKey) if not lsem: raise Exception("Internal: failed to retrieve semaphore in _doWait") if lsem._counter > 0: lsem._counter -= 1 logging.debug("counter: %s" % lsem._counter) lneedsRun = True else: logging.debug("about to defer") pickled = deferred.serialize(aObj, *args, **kwargs) pickled = base64.encodestring(pickled) logging.debug("after defer, pickled=%s" % pickled) lsem._suspendList.append(pickled) lsem.put() return lneedsRun def _doSignal(aKey): lsem = db.get(aKey) if not lsem: raise Exception("Internal: failed to retrieve semaphore in _doSignal") if len(lsem._suspendList) > 0: logging.debug("about to unsuspend") pickled = lsem._suspendList.pop() pickled = base64.decodestring(pickled) logging.debug("pickled=%s" % pickled) # here I depickle the pickled call information, only to repickle inside # deferred.defer. Not ideal, but cleaner given the calls available # inside deferred. try: obj, args, kwds = pickle.loads(pickled) except Exception, e: raise deferred.PermanentTaskFailure(e) logging.debug("about to defer") deferred.defer(obj, _transactional=True, *args, **kwds) #deferred.run(pickled) logging.debug("after defer") else: lsem._counter += 1 lsem.put()
What I wrote last time was basically correct. I've using Nick Johnson's deferred.defer library to make the adding and running of tasks smoother (ie: hide the use of web handlers).
Some points of interest:
The _suspendList of Semaphore is a stringlist, which stores the same pickled call information that deferred.defer uses. I've made liberal use of internal functions of deferred to make this work, basically because that library is written by someone far more competent with Python than me, so why not?
The _suspendList of Semaphore is a stringlist, which stores the same pickled call information that deferred.defer uses. I've made liberal use of internal functions of deferred to make this work, basically because that library is written by someone far more competent with Python than me, so why not?
Wait() takes obj, *args, **kwargs and passes them to the call of _doWait() in a transaction. obj, *args and **kwargs define the call to make once we have successfully acquired the semaphore. So, inside _doWait(), we'll either need to add obj (et al) to the _suspendList, or, if we can get the semaphore immediately, we need to call obj(). However, we can't call it inside _doWait() because it's inside a transaction - we don't want or need to be inside the context of that transaction for the call to obj(). So instead, _doWait returns a bool, which is True in the case that we've aquired the semaphore, and you'll see that Wait checks this result, and calls obj() immediately if it's True. Thus, obj() is called if necessary, but outside the transaction.
The pickled call information, created by deferred.serialize(), isn't safe to save in a stringlist (illegal characters, throws errors). So, I base64 encode it before saving it in _doWait(). You'll see in _doSignal that the pickled call information is base64 decoded.
You'll notice that inside _doSignal(), when there are waiting tasks on the _suspendList, that I dequeue one (the last one, but order is irrelevant in a Semaphore implementation), unpickle it, but I don't call it. Instead, I add a task for it using deferred.defer(). I don't call it because the task we are in has just finished doing work while holding the semaphore, which might have been lengthy. These are short lived tasks, we shouldn't do more than one user-defined thing in one task. So, instead of running this second thing (the dequeued suspended task), I reschedule it to run immediately in another task. Note also that I mark that defer as transactional; it means that if the signal transaction fails, the task wont be enqueued, which is what we want.
One last note: In case it's not obvious, the transactions combined with a reload of the Semaphore ensure that we can safely use Semaphore objects even if they are stale. So don't worry about stale Semaphores causing contention issues. This is explained more in the previous post.
Oh, and I think if you pass a queue name into Wait() in the same way you would to a regular call to deferred.defer() (ie: a parameter _queue="somequeuename"), the semaphore will use that queue instead of default, which might be handy.
Testing the Semaphores
I've got some simple test code in Semaphore.py for giving Semaphores a run.
def SemaphoreTest1():
The pickled call information, created by deferred.serialize(), isn't safe to save in a stringlist (illegal characters, throws errors). So, I base64 encode it before saving it in _doWait(). You'll see in _doSignal that the pickled call information is base64 decoded.
You'll notice that inside _doSignal(), when there are waiting tasks on the _suspendList, that I dequeue one (the last one, but order is irrelevant in a Semaphore implementation), unpickle it, but I don't call it. Instead, I add a task for it using deferred.defer(). I don't call it because the task we are in has just finished doing work while holding the semaphore, which might have been lengthy. These are short lived tasks, we shouldn't do more than one user-defined thing in one task. So, instead of running this second thing (the dequeued suspended task), I reschedule it to run immediately in another task. Note also that I mark that defer as transactional; it means that if the signal transaction fails, the task wont be enqueued, which is what we want.
One last note: In case it's not obvious, the transactions combined with a reload of the Semaphore ensure that we can safely use Semaphore objects even if they are stale. So don't worry about stale Semaphores causing contention issues. This is explained more in the previous post.
Oh, and I think if you pass a queue name into Wait() in the same way you would to a regular call to deferred.defer() (ie: a parameter _queue="somequeuename"), the semaphore will use that queue instead of default, which might be handy.
Testing the Semaphores
I've got some simple test code in Semaphore.py for giving Semaphores a run.
def SemaphoreTest1():
logging.info("*****************************") logging.info("** BEGIN SEMAPHORETEST1 **") logging.info("*****************************") lsem = Semaphore.ConstructSemaphore(2) lsem.put() lcount = 0 while lcount < 20: deferred.defer(SemaphoreTest1EntryPoint, lsem.key(), lcount, True) lcount += 1 def SemaphoreTest1EntryPoint(aKey, aNum, aFirst): lsem = db.get(aKey) if not lsem: raise Exception("Failed to retrieve semaphore in EntryPoint1") if aFirst: # this is before we've got the semaphore logging.info("Before Wait for %s" % aNum) lsem.Wait(SemaphoreTest1EntryPoint, aKey, aNum, False) else: # we now have the semaphore logging.info("Begin Critsec for %s" % aNum) sleep(2) # stay inside critsec for 2 seconds logging.info("End Critsec for %s" % aNum) lsem.Signal() logging.info ("After Signal for %s" % aNum)
SemaphoreTest1() creates a new Semaphore with counter=2 (ie: allow max 2 tasks to be inside the Semaphore at any time), and schedules 20 tasks to run immediately, which all run SemaphoreTestEntryPoint() with the new Semaphore (passed by key in the aKey parameter).
SemaphoreTestEntryPoint() loads the Semaphore, then takes one of two paths. The first time through aFirst is True (we don't hold the semaphore yet); it waits on the semaphore (switching aFirst to False). The second time through, with aFirst as False, where we hold the Semaphore, it sleeps for a couple of seconds, then signals the Semaphore and exits.
The upshot of this is that the 20 tasks, all run at the same time, will contend for the Semaphore. The first two to get it will sit inside it for 2 seconds (a long time, while the other tasks keep waiting on it and suspending). Eventually these tasks holding it will signal it and exit. Each time a task signals, it'll kick off a waiting one, which in turn will get it, again taking two seconds, then exit. And do on until there are no more tasks left.
Try running this code, and fiddling with the parameters a bit. Note that you'll find the whole project on GitHub, here.
Back to Dinner
So we've got a working Semaphore. So how do we implement a solution to the dining philosophers?
Let's implement the classic deadlock algorithm first. It goes like this:
- think until the left fork is available; when it is, pick it up;
- think until the right fork is available; when it is, pick it up
- eat
- put the left fork down
- put the right fork down
- repeat from the start
class Fork(Semaphore.Semaphore): def ConstructFork(cls): lfork = cls.ConstructSemaphore(1) return lfork ConstructFork = classmethod(ConstructFork)
I used a polymodel for Semaphore, so we can override it as above.
Next is an implementation of the faulty algorithm above. ThinkAndEatByKey is a wrapper of ThinkAndEat, which takes Forks by key rather than by object reference, loads them, and calls through. The real work happens in ThinkAndEat.
def ThinkAndEatByKey(aFirstForkKey, aSecondForkKey, aIndex, aNumLoops, aHasFirst, aHasSecond): lFirstFork = db.get(aFirstForkKey) if not lFirstFork: raise Exception("Failed to retrieve Left Fork") lSecondFork = db.get(aSecondForkKey) if not lSecondFork: raise Exception("Failed to retrieve Right Fork") ThinkAndEat(lFirstFork, lSecondFork, aIndex, aNumLoops, aHasFirst, aHasSecond) def ThinkAndEat(aFirstFork, aSecondFork, aIndex, aNumLoops, aHasFirst=False, aHasSecond=False): if not aHasFirst: # this is before we've got the semaphore logging.info("Wait on first for %s" % aIndex) aFirstFork.Wait(ThinkAndEatByKey, aFirstFork.key(), aSecondFork.key(), aIndex, aNumLoops, True, False) elif not aHasSecond: sleep(10) # takes a while to pick up the second fork! logging.info("Wait on second for %s" % aIndex) aSecondFork.Wait(ThinkAndEatByKey, aFirstFork.key(), aSecondFork.key(), aIndex, aNumLoops, True, True) else: logging.info("EAT for %s" % aIndex) logging.info("Dropping second fork for %s" % aIndex) aSecondFork.Signal() logging.info("Dropping first fork for %s" % aIndex) aFirstFork.Signal() if aNumLoops == 1: logging.info("Finished looping, done.") else: logging.info("Ready to think again, deferring") deferred.defer( ThinkAndEat, aFirstFork, aSecondFork, aIndex, aNumLoops-1 )
ThinkAndEat has three states. First, if we have neither Semaphore then aHasFirst and aHasSecond are false (I use First and Second instead of Left and Right for a bit of leeway later on). In this case, we wait on the first fork, and aHasFirst, aHasSecond will be True/False on the next call. This is the next case, where we then wait on the the second fork, and aHasFirst, aHasSecond will both be True on the third call. Finally, when they are both true, we have both forks. So we Eat (just log something, but this could be a lengthy op of some kind), then Signal, ie: drop, both forks.
Finally, we reschedule ThinkAndEat again to complete the loop.
You'll note in the second state I've added a sleep for 10 seconds. That is, between picking up the first fork and picking up the second, our philosophers think for a really long time. This doesn't change the theoretical behaviour of the algorithm, but in practice it makes it very easy to deadlock especially on the first iteration; everyone will pick up the first fork, there'll be a pause, then everyone will try to pick up the second fork and have to wait indefinitely.
Note that I use a countdown, aNumLoops, to stop this running forever. Eventually, in my house, even the Philosophers need to finish and go home!
Now, to finish implementing the algorithm above, we need to create all the forks, then call ThinkAndEat for each philosopher, passing in the correct forks.
def DiningPhilosphersFailTest(): lnumPhilosophers = 5 lnumLoops = 5 # number of think/eat loops
leta = datetime.datetime.utcnow() + datetime.timedelta(seconds=20) lforks = [] lforkIndex = 0 while lforkIndex < lnumPhilosophers: lfork = Fork.ConstructFork() lfork.put() lforks.append(lfork) lforkIndex += 1 lphilosopherIndex = 0 while lphilosopherIndex < lnumPhilosophers: deferred.defer( ThinkAndEat, lforks[lphilosopherIndex], lforks[(lphilosopherIndex+1) % lnumPhilosophers], lphilosopherIndex, lnumLoops, _eta = leta ) lphilosopherIndex += 1
This method sets up 5 philosophers with 5 forks, who will perform the ThinkAndEat loop 5 times. Philosopher i (i from 0 to 4) gets left fork i, right fork i+1, except for the philosopher 4, who gets left fork 4, right fork 0 (ie: a round table).
When you run this method, it tends to deadlock every time. You can watch the default task list until the queued tasks goes to zero, then go to the datastore and run this query:
select * from Fork where _counter = 0
You should get a bunch of results; each one is a Fork (ie: Semaphore) which has had a successful Wait(), but no comparable Signal(). For this to be the case when no tasks are running requires that a set of philosophers (all of them in fact) have one fork and are waiting for another. Deadlock.
Now to fix this, Dijkstra (who originally posed this problem) proposed a strict ordering of resources (forks) along with a rule that philosophers acquire forks only in resource order. So, if we use the fork numbering above, and say we must acquire lower numbered forks before higher numbered, then we have the same algorithm *except* that philosopher 4 must now change to acquire his right fork first (fork 0) followed by his left fork (fork 4).
We can achieve this simply by swapping forks, passing fork 0 as FirstFork and fork 4 as SecondFork. This is of course why I used First and Second rather than Left and Right.
So here's the non-deadlocking solution:
def DiningPhilosphersSucceedTest(): lnumPhilosophers = 5 lnumLoops = 5 # number of think/eat loops leta = datetime.datetime.utcnow() + datetime.timedelta(seconds=20) lforks = [] lforkIndex = 0 while lforkIndex < lnumPhilosophers: lfork = Fork.ConstructFork() lfork.put() lforks.append(lfork) lforkIndex += 1 lphilosopherIndex = 0 while lphilosopherIndex < lnumPhilosophers: if lphilosopherIndex < lnumPhilosophers-1: # not the last one deferred.defer( ThinkAndEat, lforks[lphilosopherIndex], lforks[lphilosopherIndex+1], lphilosopherIndex, lnumLoops, _eta = leta ) else: # the last one deferred.defer( ThinkAndEat, lforks[0], lforks[lphilosopherIndex], lphilosopherIndex, lnumLoops, _eta = leta ); lphilosopherIndex += 1
If you run this you wont get deadlock. You should find that when all tasks complete (the task queue is empty), "select * from Fork where _counter = 0" will give you zero objects.
One more note: You can't see this deadlock with the development appserver. Why not? Because the development server runs tasks sequentially. The Semaphores and Tasks will work, but in a very uninteresting one-task-at-a-time way.
Lifetime issues with Semaphores
You'll notice that all these tests leave Semaphore objects lying around afterwards.
Classically, semaphores are in-memory structures, and require memory management techniques for lifetime management. We have an analogous cleanup requirement for these Datastore backed Semaphores.
You'll need some way to know when you are finished with them, so you can then delete them. For some uses it may be clear (eg: if they are always associated with another resource, then you create them when that resource is created and delete them when that resource is deleted). Other times, you may know that they should only live a certain length of time, so for example you could add a time-to-live or a delete-me-after field to them, and have a background process cleaning up every so often.
Very interesting, but why do I care?
AppEngine lends itself to all kinds of uses where we are managing access to resources. Either we are managing something limited (with a semaphore with a counter equal to the number of resources available), or we are managing contention over something which can only be touched by one task (with a semaphore with counter=1, otherwise known as a Mutex).
In my app Syyncc, I used a dumb cron loop to move changes from one social network to another. Because changes move in multiple directions, and the central mechanism isn't concurrency safe, I needed to use a single processing loop to ensure I was only working on one thing at a time. It was a brain dead approach to controlling a concurrency issue.
But it doesn't scale. It'll process 50 items at a time, once every 2 minutes. That's severely limited.
Instead, I intend to model each user's combination of connected networks (fb, G+, etc) as a "hub", which is protected by a Mutex (Semaphore with Count=1). I can have separate tasks (Workers) monitoring each network, and driving changes to the others, which don't communicate except that they share the hub Mutex, so only one can run at once. So, if there are changes in a user's G+ and Facebook at the same time, processing those changes will be concurrency safe, with the critical shared pieces protected.
Why not just use transactions?
Because transactions only protect the datastore, and enqueuing of tasks.
Resources under contention can be all kinds of things. In the case of Syyncc, they involve reading and writing the datastore *and* reading and writing social networks. A model of managing contention is better here than transactions (simply because the latter aren't available!)
And, I want to make a State Machine
A project I've got coming up involves starting and stopping virtual machines in AWS. To do that well turns out to be involved, and really I want a full State Machine, where I can define a state graph, with nodes, conditions, transitions, ooh. And to build that, I'll need Semaphores.
Expect to see an article on implementation of a Semaphore-based State Machine in the near future.