http://en.wikipedia.org/wiki/Dining_philosophers_problem
Five silent philosophers sit at a table around a bowl of spaghetti. A fork is placed between each pair of adjacent philosophers.
Each philosopher must alternately think and eat. Eating is not limited by the amount of spaghetti left: assume an infinite supply. However, a philosopher can only eat while holding both the fork to the left and the fork to the right (an alternative problem formulation uses rice and chopsticks instead of spaghetti and forks).
Each philosopher can pick up an adjacent fork, when available, and put it down, when holding it. These are separate actions: forks must be picked up and put down one by one.
The problem is how to design a discipline of behavior (a concurrent algorithm) such that each philosopher won't starve, i.e. can forever continue to alternate between eating and thinking.
Ok, so I said in the previous post that AppEngine is a massive distributed machine. So on such a machine, we should be able to implement a solution to The Dining Philosophers Problem. How would we go about it?
Implementation of Semaphores
Firstly, we need a working Semaphore implementation. In the previous post I sketched out an approach, but since then I've built a proper working version.
Find the full Semaphore implementation here: Semaphore.py on github
# imports this excerpt relies on
import base64
import logging
import pickle

from google.appengine.ext import db, deferred
from google.appengine.ext.db import polymodel, TransactionFailedError


class Semaphore(polymodel.PolyModel):
    _counter = db.IntegerProperty()
    _suspendList = db.StringListProperty()

    def ConstructSemaphore(cls, aCounter):
        retval = cls()
        retval._counter = aCounter
        retval._suspendList = []
        return retval
    ConstructSemaphore = classmethod(ConstructSemaphore)

    def Wait(self, obj, *args, **kwargs):
        while True:
            try:
                lneedsRun = db.run_in_transaction(
                    _doWait,
                    self.key(),
                    obj, *args, **kwargs
                )
                if lneedsRun:
                    # we got the semaphore; run the call outside the transaction
                    try:
                        obj(*args, **kwargs)
                    except Exception, ex:
                        logging.error(ex)
                break
            except TransactionFailedError:
                # do nothing
                logging.warning("TransactionFailedError in Wait, try again")

    def Signal(self):
        while True:
            try:
                db.run_in_transaction(_doSignal, self.key())
                break
            except TransactionFailedError:
                # do nothing
                logging.warning("TransactionFailedError in Signal, try again")


def _doWait(aKey, aObj, *args, **kwargs):
    lneedsRun = False

    lsem = db.get(aKey)
    if not lsem:
        raise Exception("Internal: failed to retrieve semaphore in _doWait")

    if lsem._counter > 0:
        lsem._counter -= 1
        logging.debug("counter: %s" % lsem._counter)
        lneedsRun = True
    else:
        logging.debug("about to defer")
        pickled = deferred.serialize(aObj, *args, **kwargs)
        pickled = base64.encodestring(pickled)
        logging.debug("after defer, pickled=%s" % pickled)
        lsem._suspendList.append(pickled)
    lsem.put()
    return lneedsRun


def _doSignal(aKey):
    lsem = db.get(aKey)
    if not lsem:
        raise Exception("Internal: failed to retrieve semaphore in _doSignal")

    if len(lsem._suspendList) > 0:
        logging.debug("about to unsuspend")
        pickled = lsem._suspendList.pop()
        pickled = base64.decodestring(pickled)
        logging.debug("pickled=%s" % pickled)

        # here I depickle the pickled call information, only to repickle inside
        # deferred.defer. Not ideal, but cleaner given the calls available
        # inside deferred.
        try:
            obj, args, kwds = pickle.loads(pickled)
        except Exception, e:
            raise deferred.PermanentTaskFailure(e)

        logging.debug("about to defer")
        deferred.defer(obj, _transactional=True, *args, **kwds)
        #deferred.run(pickled)
        logging.debug("after defer")
    else:
        lsem._counter += 1
    lsem.put()
What I wrote last time was basically correct. I'm using Nick Johnson's deferred.defer library to make adding and running tasks smoother (ie: it hides the use of web handlers).
Some points of interest:
The _suspendList of Semaphore is a stringlist, which stores the same pickled call information that deferred.defer uses. I've made liberal use of internal functions of deferred to make this work, basically because that library is written by someone far more competent with Python than me, so why not?
Wait() takes obj, *args, **kwargs and passes them to the call of _doWait() in a transaction. obj, *args and **kwargs define the call to make once we have successfully acquired the semaphore. So, inside _doWait(), we'll either need to add obj (et al) to the _suspendList, or, if we can get the semaphore immediately, we need to call obj(). However, we can't call it inside _doWait() because it's inside a transaction, and we don't want or need to be inside the context of that transaction for the call to obj(). So instead, _doWait() returns a bool, which is True in the case that we've acquired the semaphore, and you'll see that Wait() checks this result and calls obj() immediately if it's True. Thus, obj() is called if necessary, but outside the transaction.
The pickled call information, created by deferred.serialize(), isn't safe to save in a stringlist (illegal characters, throws errors). So, I base64 encode it before saving it in _doWait(). You'll see in _doSignal that the pickled call information is base64 decoded.
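To make the encode/decode round trip concrete, here's a minimal standalone sketch in plain Python 3 (not App Engine code). It assumes a call is described as a (function, args, kwargs) tuple, which is roughly what deferred.serialize() produces; encode_call and decode_call are hypothetical helper names, and base64.b64encode stands in for the older encodestring used above.

```python
import base64
import pickle


def encode_call(func, *args, **kwargs):
    """Pickle a call description, then base64 it so it is plain ASCII text."""
    raw = pickle.dumps((func, args, kwargs))
    return base64.b64encode(raw).decode("ascii")


def decode_call(text):
    """Reverse encode_call: base64 decode, then unpickle."""
    return pickle.loads(base64.b64decode(text))


# Store a call to the builtin pow(2, 10) as a string, then revive and run it.
stored = encode_call(pow, 2, 10)
func, args, kwargs = decode_call(stored)
result = func(*args, **kwargs)
```

The stored value contains only base64 characters, so it is safe to keep in a list of strings; the raw pickle bytes are not.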
You'll notice that inside _doSignal(), when there are waiting tasks on the _suspendList, I dequeue one (the last one, but order is irrelevant in a Semaphore implementation) and unpickle it, but I don't call it. Instead, I add a task for it using deferred.defer(). I don't call it because the task we are in has just finished doing work while holding the semaphore, which might have been lengthy. These are short-lived tasks; we shouldn't do more than one user-defined thing in one task. So, instead of running this second thing (the dequeued suspended task), I reschedule it to run immediately in another task. Note also that I mark that defer as transactional; it means that if the signal transaction fails, the task won't be enqueued, which is what we want.
One last note: In case it's not obvious, the transactions combined with a reload of the Semaphore ensure that we can safely use Semaphore objects even if they are stale. So don't worry about stale Semaphores causing contention issues. This is explained more in the previous post.
Oh, and I think if you pass a queue name into Wait() in the same way you would to a regular call to deferred.defer() (ie: a parameter _queue="somequeuename"), the semaphore will use that queue instead of default, which might be handy.
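If the moving parts are hard to picture, here's a rough in-memory sketch of the same control flow in plain Python 3. It is not the real implementation: SketchSemaphore and run_all are made-up names, a deque stands in for the task queue, and there are no transactions or pickling. It only shows the shape: Wait either runs the call or suspends it, and Signal either reschedules a waiter or bumps the counter.

```python
from collections import deque


class SketchSemaphore:
    """In-memory stand-in for the datastore-backed Semaphore (hypothetical)."""

    def __init__(self, counter, task_queue):
        self.counter = counter
        self.suspended = []           # calls waiting for the semaphore
        self.task_queue = task_queue  # stand-in for the App Engine task queue

    def _do_wait(self, call):
        # The part that would run inside the transaction: bookkeeping only.
        if self.counter > 0:
            self.counter -= 1
            return True               # caller should run the call itself
        self.suspended.append(call)
        return False

    def wait(self, func, *args):
        if self._do_wait((func, args)):
            func(*args)               # runs outside the "transaction"

    def signal(self):
        if self.suspended:
            # Hand the semaphore to a waiter, scheduled as a separate task.
            self.task_queue.append(self.suspended.pop())
        else:
            self.counter += 1


def run_all(task_queue):
    """Drain the queue one task at a time, like a single worker."""
    while task_queue:
        func, args = task_queue.popleft()
        func(*args)


queue = deque()
sem = SketchSemaphore(1, queue)
log = []

sem.wait(log.append, "a")  # counter 1 -> 0, runs immediately
sem.wait(log.append, "b")  # suspended
sem.wait(log.append, "c")  # suspended
sem.signal()               # "a" is done: reschedules "c" (last in, first out)
run_all(queue)
sem.signal()               # "c" is done: reschedules "b"
run_all(queue)
sem.signal()               # "b" is done: nobody waiting, counter back to 1
```

Note that the counter stays at zero while waiters are being handed the semaphore directly; it only goes back up once the suspend list is empty.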
Testing the Semaphores
I've got some simple test code in Semaphore.py for giving Semaphores a run.
from time import sleep  # needed for the sleep() call below


def SemaphoreTest1():
    logging.info("*****************************")
    logging.info("** BEGIN SEMAPHORETEST1 **")
    logging.info("*****************************")
    lsem = Semaphore.ConstructSemaphore(2)
    lsem.put()

    lcount = 0

    while lcount < 20:
        deferred.defer(SemaphoreTest1EntryPoint, lsem.key(), lcount, True)
        lcount += 1


def SemaphoreTest1EntryPoint(aKey, aNum, aFirst):
    lsem = db.get(aKey)
    if not lsem:
        raise Exception("Failed to retrieve semaphore in EntryPoint1")

    if aFirst:
        # this is before we've got the semaphore
        logging.info("Before Wait for %s" % aNum)
        lsem.Wait(SemaphoreTest1EntryPoint, aKey, aNum, False)
    else:
        # we now have the semaphore
        logging.info("Begin Critsec for %s" % aNum)
        sleep(2) # stay inside critsec for 2 seconds
        logging.info("End Critsec for %s" % aNum)
        lsem.Signal()
        logging.info("After Signal for %s" % aNum)
SemaphoreTest1() creates a new Semaphore with counter=2 (ie: allow max 2 tasks to be inside the Semaphore at any time), and schedules 20 tasks to run immediately, which all run SemaphoreTest1EntryPoint() with the new Semaphore (passed by key in the aKey parameter).
SemaphoreTest1EntryPoint() loads the Semaphore, then takes one of two paths. The first time through, aFirst is True (we don't hold the semaphore yet); it waits on the semaphore (switching aFirst to False). The second time through, with aFirst as False, where we hold the Semaphore, it sleeps for a couple of seconds, then signals the Semaphore and exits.
The upshot of this is that the 20 tasks, all run at the same time, will contend for the Semaphore. The first two to get it will sit inside it for 2 seconds (a long time, while the other tasks keep waiting on it and suspending). Eventually these tasks holding it will signal it and exit. Each time a task signals, it'll kick off a waiting one, which in turn will get it, again taking two seconds, then exit. And so on until there are no more tasks left.
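You can see the same "never more than two inside" behaviour on a single machine. This is only an analogy, in plain Python 3: it uses the standard library's blocking threading.BoundedSemaphore and threads rather than the datastore Semaphore and tasks, and the stats dictionary is just bookkeeping I've made up for the demo.

```python
import threading
import time

LIMIT = 2   # same as counter=2 in SemaphoreTest1
TASKS = 20  # same number of contending tasks

sem = threading.BoundedSemaphore(LIMIT)
stats_lock = threading.Lock()
stats = {"inside": 0, "max_inside": 0, "finished": 0}


def task():
    with sem:  # blocks here until one of the two slots is free
        with stats_lock:
            stats["inside"] += 1
            stats["max_inside"] = max(stats["max_inside"], stats["inside"])
        time.sleep(0.01)  # stay inside the critical section briefly
        with stats_lock:
            stats["inside"] -= 1
            stats["finished"] += 1


threads = [threading.Thread(target=task) for _ in range(TASKS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

All 20 tasks finish, and the high-water mark of tasks inside the critical section never exceeds 2.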
Try running this code, and fiddling with the parameters a bit. Note that you'll find the whole project on GitHub, here.
Back to Dinner
So we've got a working Semaphore. So how do we implement a solution to the dining philosophers?
Let's implement the classic deadlock algorithm first. It goes like this:
- think until the left fork is available; when it is, pick it up;
- think until the right fork is available; when it is, pick it up
- eat
- put the left fork down
- put the right fork down
- repeat from the start
class Fork(Semaphore.Semaphore):
    def ConstructFork(cls):
        lfork = cls.ConstructSemaphore(1)
        return lfork
    ConstructFork = classmethod(ConstructFork)
I used a polymodel for Semaphore, so we can override it as above.
Next is an implementation of the faulty algorithm above. ThinkAndEatByKey is a wrapper of ThinkAndEat, which takes Forks by key rather than by object reference, loads them, and calls through. The real work happens in ThinkAndEat.
def ThinkAndEatByKey(aFirstForkKey, aSecondForkKey, aIndex, aNumLoops, aHasFirst, aHasSecond):
    lFirstFork = db.get(aFirstForkKey)
    if not lFirstFork:
        raise Exception("Failed to retrieve Left Fork")
    lSecondFork = db.get(aSecondForkKey)
    if not lSecondFork:
        raise Exception("Failed to retrieve Right Fork")
    ThinkAndEat(lFirstFork, lSecondFork, aIndex, aNumLoops, aHasFirst, aHasSecond)


def ThinkAndEat(aFirstFork, aSecondFork, aIndex, aNumLoops, aHasFirst=False, aHasSecond=False):
    if not aHasFirst:
        # this is before we've got the semaphore
        logging.info("Wait on first for %s" % aIndex)
        aFirstFork.Wait(ThinkAndEatByKey, aFirstFork.key(), aSecondFork.key(), aIndex, aNumLoops, True, False)
    elif not aHasSecond:
        sleep(10) # takes a while to pick up the second fork!
        logging.info("Wait on second for %s" % aIndex)
        aSecondFork.Wait(ThinkAndEatByKey, aFirstFork.key(), aSecondFork.key(), aIndex, aNumLoops, True, True)
    else:
        logging.info("EAT for %s" % aIndex)
        logging.info("Dropping second fork for %s" % aIndex)
        aSecondFork.Signal()
        logging.info("Dropping first fork for %s" % aIndex)
        aFirstFork.Signal()
        if aNumLoops == 1:
            logging.info("Finished looping, done.")
        else:
            logging.info("Ready to think again, deferring")
            deferred.defer(
                ThinkAndEat,
                aFirstFork,
                aSecondFork,
                aIndex,
                aNumLoops-1
            )
ThinkAndEat has three states. First, if we have neither Semaphore then aHasFirst and aHasSecond are False (I use First and Second instead of Left and Right for a bit of leeway later on). In this case, we wait on the first fork, and aHasFirst, aHasSecond will be True/False on the next call. That is the second state, where we wait on the second fork, and aHasFirst, aHasSecond will both be True on the third call. Finally, when they are both True, we have both forks. So we Eat (just log something, but this could be a lengthy op of some kind), then Signal, ie: drop, both forks.
Finally, we reschedule ThinkAndEat again to complete the loop.
You'll note in the second state I've added a sleep for 10 seconds. That is, between picking up the first fork and picking up the second, our philosophers think for a really long time. This doesn't change the theoretical behaviour of the algorithm, but in practice it makes it very easy to deadlock especially on the first iteration; everyone will pick up the first fork, there'll be a pause, then everyone will try to pick up the second fork and have to wait indefinitely.
Note that I use a countdown, aNumLoops, to stop this running forever. Eventually, in my house, even the Philosophers need to finish and go home!
Now, to finish implementing the algorithm above, we need to create all the forks, then call ThinkAndEat for each philosopher, passing in the correct forks.
def DiningPhilosphersFailTest():
    lnumPhilosophers = 5
    lnumLoops = 5 # number of think/eat loops

    leta = datetime.datetime.utcnow() + datetime.timedelta(seconds=20)

    lforks = []
    lforkIndex = 0
    while lforkIndex < lnumPhilosophers:
        lfork = Fork.ConstructFork()
        lfork.put()
        lforks.append(lfork)
        lforkIndex += 1

    lphilosopherIndex = 0
    while lphilosopherIndex < lnumPhilosophers:
        deferred.defer(
            ThinkAndEat,
            lforks[lphilosopherIndex],
            lforks[(lphilosopherIndex+1) % lnumPhilosophers],
            lphilosopherIndex,
            lnumLoops,
            _eta = leta
        )
        lphilosopherIndex += 1
This method sets up 5 philosophers with 5 forks, who will perform the ThinkAndEat loop 5 times. Philosopher i (i from 0 to 4) gets left fork i and right fork i+1, except for philosopher 4, who gets left fork 4 and right fork 0 (ie: a round table).
When you run this method, it tends to deadlock every time. You can watch the default task queue until the number of queued tasks goes to zero, then go to the datastore and run this query:
select * from Fork where _counter = 0
You should get a bunch of results; each one is a Fork (ie: Semaphore) which has had a successful Wait(), but no comparable Signal(). For this to be the case when no tasks are running requires that a set of philosophers (all of them in fact) have one fork and are waiting for another. Deadlock.
Now to fix this, Dijkstra (who originally posed this problem) proposed a strict ordering of resources (forks) along with a rule that philosophers acquire forks only in resource order. So, if we use the fork numbering above, and say we must acquire lower numbered forks before higher numbered, then we have the same algorithm *except* that philosopher 4 must now change to acquire his right fork first (fork 0) followed by his left fork (fork 4).
We can achieve this simply by swapping forks, passing fork 0 as FirstFork and fork 4 as SecondFork. This is of course why I used First and Second rather than Left and Right.
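Here's a small standalone sketch, in plain Python 3, of why the swap matters. It assumes the usual simplification: draw an edge from each philosopher's first fork to their second fork, and treat a cycle in that graph as the circular wait that makes deadlock possible. The names fork_order, has_cycle and can_deadlock are made up for illustration.

```python
def fork_order(philosopher, count, ordered):
    """Return (first, second) fork indexes for one philosopher."""
    left, right = philosopher, (philosopher + 1) % count
    if ordered:
        # Resource ordering: always take the lower numbered fork first.
        return min(left, right), max(left, right)
    return left, right


def has_cycle(graph):
    """Depth-first search for a directed cycle in {node: [successors]}."""
    visiting, done = set(), set()

    def visit(node):
        if node in done:
            return False
        if node in visiting:
            return True  # came back to a node on the current path
        visiting.add(node)
        found = any(visit(nxt) for nxt in graph.get(node, []))
        visiting.discard(node)
        done.add(node)
        return found

    return any(visit(node) for node in list(graph))


def can_deadlock(count, ordered):
    """True if 'hold first fork, wait for second' can form a circular wait."""
    graph = {}
    for p in range(count):
        first, second = fork_order(p, count, ordered)
        graph.setdefault(first, []).append(second)
    return has_cycle(graph)


naive = can_deadlock(5, ordered=False)
fixed = can_deadlock(5, ordered=True)
last_philosopher = fork_order(4, 5, ordered=True)
```

With left-then-right, the edges run 0→1→2→3→4→0, which is a cycle. With ordering, philosopher 4's edge becomes 0→4, every edge points from a lower to a higher number, and no cycle is possible.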
So here's the non-deadlocking solution:
def DiningPhilosphersSucceedTest():
    lnumPhilosophers = 5
    lnumLoops = 5 # number of think/eat loops

    leta = datetime.datetime.utcnow() + datetime.timedelta(seconds=20)

    lforks = []
    lforkIndex = 0
    while lforkIndex < lnumPhilosophers:
        lfork = Fork.ConstructFork()
        lfork.put()
        lforks.append(lfork)
        lforkIndex += 1

    lphilosopherIndex = 0
    while lphilosopherIndex < lnumPhilosophers:
        if lphilosopherIndex < lnumPhilosophers-1:
            # not the last one
            deferred.defer(
                ThinkAndEat,
                lforks[lphilosopherIndex],
                lforks[lphilosopherIndex+1],
                lphilosopherIndex,
                lnumLoops,
                _eta = leta
            )
        else:
            # the last one
            deferred.defer(
                ThinkAndEat,
                lforks[0],
                lforks[lphilosopherIndex],
                lphilosopherIndex,
                lnumLoops,
                _eta = leta
            )
        lphilosopherIndex += 1
If you run this you won't get deadlock. You should find that when all tasks complete (the task queue is empty), "select * from Fork where _counter = 0" will give you zero objects.
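If you want to try the ordered version without deploying anything, here's a rough local analogue in plain Python 3. It is an assumption-laden stand-in: ordinary blocking threading.Lock objects play the forks and threads play the philosophers, so it shows the ordering rule working, not the task-based Semaphore itself.

```python
import threading

NUM_PHILOSOPHERS = 5
NUM_LOOPS = 5  # number of think/eat loops, as in the tests above

forks = [threading.Lock() for _ in range(NUM_PHILOSOPHERS)]
meals = [0] * NUM_PHILOSOPHERS


def philosopher(index):
    left, right = index, (index + 1) % NUM_PHILOSOPHERS
    # Always take the lower numbered fork first (philosopher 4 takes fork 0).
    first, second = min(left, right), max(left, right)
    for _ in range(NUM_LOOPS):
        with forks[first]:
            with forks[second]:
                meals[index] += 1  # eat; only this thread writes this slot


threads = [threading.Thread(target=philosopher, args=(i,))
           for i in range(NUM_PHILOSOPHERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Every philosopher eats five times and every fork ends up free, which is the local equivalent of the _counter = 0 query returning nothing.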
One more note: You can't see this deadlock with the development appserver. Why not? Because the development server runs tasks sequentially. The Semaphores and Tasks will work, but in a very uninteresting one-task-at-a-time way.
Lifetime issues with Semaphores
You'll notice that all these tests leave Semaphore objects lying around afterwards.
Classically, semaphores are in-memory structures, and require memory management techniques for lifetime management. We have an analogous cleanup requirement for these Datastore backed Semaphores.
You'll need some way to know when you are finished with them, so you can then delete them. For some uses it may be clear (eg: if they are always associated with another resource, then you create them when that resource is created and delete them when that resource is deleted). Other times, you may know that they should only live a certain length of time, so for example you could add a time-to-live or a delete-me-after field to them, and have a background process cleaning up every so often.
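As a sketch of the delete-me-after idea, in plain Python 3 with dictionaries standing in for datastore entities: the delete_after field name and the make_record and sweep helpers are hypothetical, and a real version would query the datastore and delete entities from a cron job or task rather than filter a list.

```python
import datetime


def make_record(name, now, ttl_seconds):
    """Create a stand-in semaphore record that expires ttl_seconds from now."""
    return {
        "name": name,
        "delete_after": now + datetime.timedelta(seconds=ttl_seconds),
    }


def sweep(records, now):
    """Split records into (kept, expired) based on their delete_after time."""
    kept = [r for r in records if r["delete_after"] > now]
    expired = [r for r in records if r["delete_after"] <= now]
    return kept, expired


start = datetime.datetime(2011, 1, 1, 12, 0, 0)
records = [
    make_record("short-lived", start, 60),
    make_record("long-lived", start, 3600),
]

# A background sweep ten minutes later removes only the short-lived record.
later = start + datetime.timedelta(minutes=10)
kept, expired = sweep(records, later)
```

The catch with any time-based scheme is picking a lifetime longer than the longest legitimate use, otherwise the sweep deletes a semaphore that tasks are still waiting on.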
Very interesting, but why do I care?
AppEngine lends itself to all kinds of uses where we are managing access to resources. Either we are managing something limited (with a semaphore with a counter equal to the number of resources available), or we are managing contention over something which can only be touched by one task (with a semaphore with counter=1, otherwise known as a Mutex).
In my app Syyncc, I used a dumb cron loop to move changes from one social network to another. Because changes move in multiple directions, and the central mechanism isn't concurrency safe, I needed to use a single processing loop to ensure I was only working on one thing at a time. It was a brain dead approach to controlling a concurrency issue.
But it doesn't scale. It'll process 50 items at a time, once every 2 minutes. That's severely limited.
Instead, I intend to model each user's combination of connected networks (fb, G+, etc) as a "hub", which is protected by a Mutex (Semaphore with Count=1). I can have separate tasks (Workers) monitoring each network, and driving changes to the others, which don't communicate except that they share the hub Mutex, so only one can run at once. So, if there are changes in a user's G+ and Facebook at the same time, processing those changes will be concurrency safe, with the critical shared pieces protected.
Why not just use transactions?
Because transactions only protect the datastore, and enqueuing of tasks.
Resources under contention can be all kinds of things. In the case of Syyncc, they involve reading and writing the datastore *and* reading and writing social networks. A model of managing contention is better here than transactions (simply because the latter aren't available!)
And, I want to make a State Machine
A project I've got coming up involves starting and stopping virtual machines in AWS. To do that well turns out to be involved, and really I want a full State Machine, where I can define a state graph, with nodes, conditions, transitions, ooh. And to build that, I'll need Semaphores.
Expect to see an article on implementation of a Semaphore-based State Machine in the near future.
Clever, but I think the applications for this are extremely limited. In a distributed system like App Engine, aside from datastore access, there are very few shared resources for which this sort of synchronization is necessary - and introducing it limits your concurrency severely. I'm not sure what you're using it for in this case, but it seems likely it could be made to either not care about concurrency, or use some of the built in primitives like datastore transactions and task names.
By the way, using a db.ListProperty(db.Blob) will allow you to store byte arrays without base64 encoding them. You didn't really want your pickled functions indexed anyway, did you?
> Clever, but I think the applications for this are extremely limited. In a
> distributed system like App Engine, aside from datastore access, there
> are very few shared resources for which this sort of synchronization is
> necessary - and introducing it limits your concurrency severely. I'm not
> sure what you're using it for in this case, but it seems likely it could
> be made to either not care about concurrency, or use some of the built in
> primitives like datastore transactions and task names.
I agree the applications are limited, but I'm finding that I do nevertheless have applications. The one standing out for me at the moment is a State Machine. I'm going to build a class (datastore entity) which runs a state machine, where you can define nodes, transitions, and conditions, and have the state machine take care of managing a current state, a queue of conditions which have occurred, and the processing of each condition from that queue to move from state to state. Then descendants can provide a list of methods for what to do on entering a new state.
With Semaphores (because obviously this article is about the Semaphores, not really the Dining Philosophers), I can have conditions being raised by tasks, web hooks, input from users, whatever. Any of these can load the state machine, signal the condition (ie: push it onto a queue) and the internal state transition processing of the state machine can run inline, in separate tasks, whatever. It can be kept safe by using semaphores to protect critical internal variables (including current state, condition queue, probably other things)
The first application for State Machine that I have in mind is to control machines running in AWS. One statemachine per AWS machine, with conditions like "Start, Stop, Terminate" as well as any corresponding to any webhooks AWS supports to tell me about things happening at their end (need to read up more to see what's possible), plus some for webhooks implemented by my own code inside those machines, calling back to tell me important things. States would be something like "stopped, started, terminated" ... ?
If this doesn't make total sense, don't worry, I'll probably write more than one tl;dr article on this subject :-)
> By the way, using a db.ListProperty(db.Blob) will allow you to store
> byte arrays without base64 encoding them. You didn't really want your
> pickled functions indexed anyway, did you?
You're absolutely right, and that makes total sense. In a less lazy moment I'll fix it and post an update. Thanks!
Hey, and I learned a lot reading Deferred, thanks.
For the sort of state machine implementation you're talking about, you'd be much better off running it on a backend, or in tasks with task names to prevent concurrent execution. There's really no need to have multiple processes waiting on each other as you do with a semaphore. For AWS, though, it seems to me that all you need is a state value in your model, and to do your updates in a transaction, possibly in a task to ensure retry semantics.
Actually, after reading your code in more detail, I see that no actual waiting is going on - you're using the semaphore as a way to schedule chunks of work that need to be done on a shared resource serially. This isn't a bad approach, but I think it's misleading to call it a semaphore.
"but I think it's misleading to call it a semaphore."
I agree that, out of context, it's a bit misleading.
But the context (developed in "Not your mama's webserver") is that AppEngine, as a distributed machine with a shared datastore, can be thought of as roughly analogous to a single parallel machine with shared memory (or any single machine with a multitasking OS). So just as you can use concurrency primitives (like Semaphores) to protect critical sections in a normal multiprocessing environment, you can use them to protect critical sections between tasks in AppEngine.
The implementation of semaphore that I've presented is fairly directly taken from the definition in Ben Ari's "Principles of Concurrent and Distributed Programming", ie: it's the classical definition of a Semaphore, modified to use AppEngine tasks and the datastore instead of processes and shared memory. So in that way it definitely *is* a Semaphore. You've just got to squint a bit.
I've had a few comments elsewhere of the form "why don't you use memcache for this". I can't think of how I could, do you have any ideas?
"For AWS, though, it seems to me that all you need is a state value in your model, and to do your updates in a transaction, possibly in a task to ensure retry semantics."
This is true, there is definitely a specific solution to that job (AWS control) which could be done that way. And if I think about the implementation of the Semaphore, it occurs to me that the state machine will be exactly this, but nicely generalised.
What I want to develop here is an acceptable general solution to the problem of stateful behaviour with non-idempotent operations. I keep coming up with uses for it; eg: complex email notification (where lots of things can fail while trying to compile information for a notification, plus you really need to send emails once and only once), updating external services (coordinated complex datastore operations with gets and posts on external services), and of course AWS management.
Once I've got this state machine, it'll be really simple to put together complex engines for coordinating actions on the world outside AppEngine. I'm hoping it'll be a massively practical and useful library!
The reason I'm saying it's not a semaphore is not because of the nature of the system, but because it's not a locking primitive - it's a scheduling primitive. A standard semaphore makes a process wait until the resource is available, but yours schedules a chunk of code for later execution, most likely on a different machine. It can be used to achieve similar goals as a semaphore, but it's not the same thing.
Regarding using memcache, you're right that it's not a good choice. You can implement a sort-of-lock in memcache using its atomic operations, but it's unreliable by nature.
"The reason I'm saying it's not a semaphore is not because of the nature of the system, but because it's not a locking primitive - it's a scheduling primitive. A standard semaphore makes a process wait until the resource is available, but yours schedules a chunk of code for later execution, most likely on a different machine. It can be used to achieve similar goals as a semaphore, but it's not the same thing."
I see what you're saying here. The way I'm thinking about it is that I've got a model in my head of a level above tasks, which is roughly a Process (not to be confused with literal processes running on the actual AppEngine instances). A Process is a set of sequentially running tasks which, taken together, perform a process level operation (eg: the implementation of The Dining Philosophers).
These Semaphores do indeed schedule code for later execution, but if you think of that code as a continuation of the previous bit of code that Waited, then together they form a process that has indeed been blocked on a resource.
There are definitely problems with that model. Instead of being able to block inline at the Wait() statement, I've got to carve up the code into separate chunks. State isn't maintained between them in the same way that it would be if you were using a normal semaphore (because I don't have an OS layer to suspend the process). It's imperfect.
re: Locking vs Scheduling primitive, there's not really much difference I think. After all, in a traditional machine, what's the piece of the OS that deals with suspending and waking processes? The scheduler. What's really happening? The semaphores are coordinating process scheduling so as to protect some resource. Which is exactly what's going on here.
However, I agree it's confusing. These constructs could do with a different name to differentiate them from regular semaphores. Got any ideas?
I'm also playing with the idea of implementing a language on top of one of the existing ones in AppEngine (maybe on top of Go?), which hides tasks and instead presents Processes implemented in terms of tasks exactly as I'm using them here. That language could present a true Semaphore; you needn't see the switching from task to task as you have to see here.
There are definitely major differences between locking and scheduling. How would you hold two semaphores at the same time in your system?
"How would you hold two semaphores at the same time in your system?"
Just construct two Semaphores and use them both. You could aggregate them into another object.
In fact, ThinkAndEat() above does exactly that.