GIF89a; EcchiShell v1.0
//usr/lib64/lib64/lib64/lib64/python2.7/= 1' % count self.count = count self.maxcount = count self.nonzero = condition() def p(self): self.nonzero.acquire() while self.count == 0: self.nonzero.wait() self.count = self.count - 1 self.nonzero.release() def v(self): self.nonzero.acquire() if self.count == self.maxcount: raise ValueError, '.v() tried to raise semaphore count above ' \ 'initial value %r' % self.maxcount self.count = self.count + 1 self.nonzero.signal() self.nonzero.release() class mrsw: def __init__(self): # critical-section lock & the data it protects self.rwOK = thread.allocate_lock() self.nr = 0 # number readers actively reading (not just waiting) self.nw = 0 # number writers either waiting to write or writing self.writing = 0 # 1 iff some thread is writing # conditions self.readOK = condition(self.rwOK) # OK to unblock readers self.writeOK = condition(self.rwOK) # OK to unblock writers def read_in(self): self.rwOK.acquire() while self.nw: self.readOK.wait() self.nr = self.nr + 1 self.rwOK.release() def read_out(self): self.rwOK.acquire() if self.nr <= 0: raise ValueError, \ '.read_out() invoked without an active reader' self.nr = self.nr - 1 if self.nr == 0: self.writeOK.signal() self.rwOK.release() def write_in(self): self.rwOK.acquire() self.nw = self.nw + 1 while self.writing or self.nr: self.writeOK.wait() self.writing = 1 self.rwOK.release() def write_out(self): self.rwOK.acquire() if not self.writing: raise ValueError, \ '.write_out() invoked without an active writer' self.writing = 0 self.nw = self.nw - 1 if self.nw: self.writeOK.signal() else: self.readOK.broadcast() self.rwOK.release() def write_to_read(self): self.rwOK.acquire() if not self.writing: raise ValueError, \ '.write_to_read() invoked without an active writer' self.writing = 0 self.nw = self.nw - 1 self.nr = self.nr + 1 if not self.nw: self.readOK.broadcast() self.rwOK.release() # The rest of the file is a test case, that runs a number of parallelized # quicksorts in parallel. If it works, you'll get about 600 lines of # tracing output, with a line like # test passed! 209 threads created in all # as the last line. The content and order of preceding lines will # vary across runs. def _new_thread(func, *args): global TID tid.acquire(); id = TID = TID+1; tid.release() io.acquire(); alive.append(id); \ print 'starting thread', id, '--', len(alive), 'alive'; \ io.release() thread.start_new_thread( func, (id,) + args ) def _qsort(tid, a, l, r, finished): # sort a[l:r]; post finished when done io.acquire(); print 'thread', tid, 'qsort', l, r; io.release() if r-l > 1: pivot = a[l] j = l+1 # make a[l:j] <= pivot, and a[j:r] > pivot for i in range(j, r): if a[i] <= pivot: a[j], a[i] = a[i], a[j] j = j + 1 a[l], a[j-1] = a[j-1], pivot l_subarray_sorted = event() r_subarray_sorted = event() _new_thread(_qsort, a, l, j-1, l_subarray_sorted) _new_thread(_qsort, a, j, r, r_subarray_sorted) l_subarray_sorted.wait() r_subarray_sorted.wait() io.acquire(); print 'thread', tid, 'qsort done'; \ alive.remove(tid); io.release() finished.post() def _randarray(tid, a, finished): io.acquire(); print 'thread', tid, 'randomizing array'; \ io.release() for i in range(1, len(a)): wh.acquire(); j = randint(0,i); wh.release() a[i], a[j] = a[j], a[i] io.acquire(); print 'thread', tid, 'randomizing done'; \ alive.remove(tid); io.release() finished.post() def _check_sort(a): if a != range(len(a)): raise ValueError, ('a not sorted', a) def _run_one_sort(tid, a, bar, done): # randomize a, and quicksort it # for variety, all the threads running this enter a barrier # at the end, and post `done' after the barrier exits io.acquire(); print 'thread', tid, 'randomizing', a; \ io.release() finished = event() _new_thread(_randarray, a, finished) finished.wait() io.acquire(); print 'thread', tid, 'sorting', a; io.release() finished.clear() _new_thread(_qsort, a, 0, len(a), finished) finished.wait() _check_sort(a) io.acquire(); print 'thread', tid, 'entering barrier'; \ io.release() bar.enter() io.acquire(); print 'thread', tid, 'leaving barrier'; \ io.release() io.acquire(); alive.remove(tid); io.release() bar.enter() # make sure they've all removed themselves from alive ## before 'done' is posted bar.enter() # just to be cruel done.post() def test(): global TID, tid, io, wh, randint, alive import random randint = random.randint TID = 0 # thread ID (1, 2, ...) tid = thread.allocate_lock() # for changing TID io = thread.allocate_lock() # for printing, and 'alive' wh = thread.allocate_lock() # for calls to random alive = [] # IDs of active threads NSORTS = 5 arrays = [] for i in range(NSORTS): arrays.append( range( (i+1)*10 ) ) bar = barrier(NSORTS) finished = event() for i in range(NSORTS): _new_thread(_run_one_sort, arrays[i], bar, finished) finished.wait() print 'all threads done, and checking results ...' if alive: raise ValueError, ('threads still alive at end', alive) for i in range(NSORTS): a = arrays[i] if len(a) != (i+1)*10: raise ValueError, ('length of array', i, 'screwed up') _check_sort(a) print 'test passed!', TID, 'threads created in all' if __name__ == '__main__': test() # end of module