GIF89a; EcchiShell v1.0
//lib64/python2.7/

Mass Deface = 0: head, dir, tail = seq[:i], '', seq[i+1:] if tail[:1] in '-+': dir, tail = tail[:1], tail[1:] if not isnumeric(tail): raise Error, "bad message list %s" % seq try: count = int(tail) except (ValueError, OverflowError): # Can't use sys.maxint because of i+count below count = len(all) try: anchor = self._parseindex(head, all) except Error, msg: seqs = self.getsequences() if not head in seqs: if not msg: msg = "bad message list %s" % seq raise Error, msg, sys.exc_info()[2] msgs = seqs[head] if not msgs: raise Error, "sequence %s empty" % head if dir == '-': return msgs[-count:] else: return msgs[:count] else: if not dir: if head in ('prev', 'last'): dir = '-' if dir == '-': i = bisect(all, anchor) return all[max(0, i-count):i] else: i = bisect(all, anchor-1) return all[i:i+count] # Test for X-Y next i = seq.find('-') if i >= 0: begin = self._parseindex(seq[:i], all) end = self._parseindex(seq[i+1:], all) i = bisect(all, begin-1) j = bisect(all, end) r = all[i:j] if not r: raise Error, "bad message list %s" % seq return r # Neither X:Y nor X-Y; must be a number or a (pseudo-)sequence try: n = self._parseindex(seq, all) except Error, msg: seqs = self.getsequences() if not seq in seqs: if not msg: msg = "bad message list %s" % seq raise Error, msg return seqs[seq] else: if n not in all: if isnumeric(seq): raise Error, "message %d doesn't exist" % n else: raise Error, "no %s message" % seq else: return [n] def _parseindex(self, seq, all): """Internal: parse a message number (or cur, first, etc.).""" if isnumeric(seq): try: return int(seq) except (OverflowError, ValueError): return sys.maxint if seq in ('cur', '.'): return self.getcurrent() if seq == 'first': return all[0] if seq == 'last': return all[-1] if seq == 'next': n = self.getcurrent() i = bisect(all, n) try: return all[i] except IndexError: raise Error, "no next message" if seq == 'prev': n = self.getcurrent() i = bisect(all, n-1) if i == 0: raise Error, "no prev message" try: return all[i-1] except IndexError: raise Error, "no prev message" raise Error, None def openmessage(self, n): """Open a message -- returns a Message object.""" return Message(self, n) def removemessages(self, list): """Remove one or more messages -- may raise os.error.""" errors = [] deleted = [] for n in list: path = self.getmessagefilename(n) commapath = self.getmessagefilename(',' + str(n)) try: os.unlink(commapath) except os.error: pass try: os.rename(path, commapath) except os.error, msg: errors.append(msg) else: deleted.append(n) if deleted: self.removefromallsequences(deleted) if errors: if len(errors) == 1: raise os.error, errors[0] else: raise os.error, ('multiple errors:', errors) def refilemessages(self, list, tofolder, keepsequences=0): """Refile one or more messages -- may raise os.error. 'tofolder' is an open folder object.""" errors = [] refiled = {} for n in list: ton = tofolder.getlast() + 1 path = self.getmessagefilename(n) topath = tofolder.getmessagefilename(ton) try: os.rename(path, topath) except os.error: # Try copying try: shutil.copy2(path, topath) os.unlink(path) except (IOError, os.error), msg: errors.append(msg) try: os.unlink(topath) except os.error: pass continue tofolder.setlast(ton) refiled[n] = ton if refiled: if keepsequences: tofolder._copysequences(self, refiled.items()) self.removefromallsequences(refiled.keys()) if errors: if len(errors) == 1: raise os.error, errors[0] else: raise os.error, ('multiple errors:', errors) def _copysequences(self, fromfolder, refileditems): """Helper for refilemessages() to copy sequences.""" fromsequences = fromfolder.getsequences() tosequences = self.getsequences() changed = 0 for name, seq in fromsequences.items(): try: toseq = tosequences[name] new = 0 except KeyError: toseq = [] new = 1 for fromn, ton in refileditems: if fromn in seq: toseq.append(ton) changed = 1 if new and toseq: tosequences[name] = toseq if changed: self.putsequences(tosequences) def movemessage(self, n, tofolder, ton): """Move one message over a specific destination message, which may or may not already exist.""" path = self.getmessagefilename(n) # Open it to check that it exists f = open(path) f.close() del f topath = tofolder.getmessagefilename(ton) backuptopath = tofolder.getmessagefilename(',%d' % ton) try: os.rename(topath, backuptopath) except os.error: pass try: os.rename(path, topath) except os.error: # Try copying ok = 0 try: tofolder.setlast(None) shutil.copy2(path, topath) ok = 1 finally: if not ok: try: os.unlink(topath) except os.error: pass os.unlink(path) self.removefromallsequences([n]) def copymessage(self, n, tofolder, ton): """Copy one message over a specific destination message, which may or may not already exist.""" path = self.getmessagefilename(n) # Open it to check that it exists f = open(path) f.close() del f topath = tofolder.getmessagefilename(ton) backuptopath = tofolder.getmessagefilename(',%d' % ton) try: os.rename(topath, backuptopath) except os.error: pass ok = 0 try: tofolder.setlast(None) shutil.copy2(path, topath) ok = 1 finally: if not ok: try: os.unlink(topath) except os.error: pass def createmessage(self, n, txt): """Create a message, with text from the open file txt.""" path = self.getmessagefilename(n) backuppath = self.getmessagefilename(',%d' % n) try: os.rename(path, backuppath) except os.error: pass ok = 0 BUFSIZE = 16*1024 try: f = open(path, "w") while 1: buf = txt.read(BUFSIZE) if not buf: break f.write(buf) f.close() ok = 1 finally: if not ok: try: os.unlink(path) except os.error: pass def removefromallsequences(self, list): """Remove one or more messages from all sequences (including last) -- but not from 'cur'!!!""" if hasattr(self, 'last') and self.last in list: del self.last sequences = self.getsequences() changed = 0 for name, seq in sequences.items(): if name == 'cur': continue for n in list: if n in seq: seq.remove(n) changed = 1 if not seq: del sequences[name] if changed: self.putsequences(sequences) def getlast(self): """Return the last message number.""" if not hasattr(self, 'last'): self.listmessages() # Set self.last return self.last def setlast(self, last): """Set the last message number.""" if last is None: if hasattr(self, 'last'): del self.last else: self.last = last class Message(mimetools.Message): def __init__(self, f, n, fp = None): """Constructor.""" self.folder = f self.number = n if fp is None: path = f.getmessagefilename(n) fp = open(path, 'r') mimetools.Message.__init__(self, fp) def __repr__(self): """String representation.""" return 'Message(%s, %s)' % (repr(self.folder), self.number) def getheadertext(self, pred = None): """Return the message's header text as a string. If an argument is specified, it is used as a filter predicate to decide which headers to return (its argument is the header name converted to lower case).""" if pred is None: return ''.join(self.headers) headers = [] hit = 0 for line in self.headers: if not line[0].isspace(): i = line.find(':') if i > 0: hit = pred(line[:i].lower()) if hit: headers.append(line) return ''.join(headers) def getbodytext(self, decode = 1): """Return the message's body text as string. This undoes a Content-Transfer-Encoding, but does not interpret other MIME features (e.g. multipart messages). To suppress decoding, pass 0 as an argument.""" self.fp.seek(self.startofbody) encoding = self.getencoding() if not decode or encoding in ('', '7bit', '8bit', 'binary'): return self.fp.read() try: from cStringIO import StringIO except ImportError: from StringIO import StringIO output = StringIO() mimetools.decode(self.fp, output, encoding) return output.getvalue() def getbodyparts(self): """Only for multipart messages: return the message's body as a list of SubMessage objects. Each submessage object behaves (almost) as a Message object.""" if self.getmaintype() != 'multipart': raise Error, 'Content-Type is not multipart/*' bdry = self.getparam('boundary') if not bdry: raise Error, 'multipart/* without boundary param' self.fp.seek(self.startofbody) mf = multifile.MultiFile(self.fp) mf.push(bdry) parts = [] while mf.next(): n = "%s.%r" % (self.number, 1 + len(parts)) part = SubMessage(self.folder, n, mf) parts.append(part) mf.pop() return parts def getbody(self): """Return body, either a string or a list of messages.""" if self.getmaintype() == 'multipart': return self.getbodyparts() else: return self.getbodytext() class SubMessage(Message): def __init__(self, f, n, fp): """Constructor.""" Message.__init__(self, f, n, fp) if self.getmaintype() == 'multipart': self.body = Message.getbodyparts(self) else: self.body = Message.getbodytext(self) self.bodyencoded = Message.getbodytext(self, decode=0) # XXX If this is big, should remember file pointers def __repr__(self): """String representation.""" f, n, fp = self.folder, self.number, self.fp return 'SubMessage(%s, %s, %s)' % (f, n, fp) def getbodytext(self, decode = 1): if not decode: return self.bodyencoded if type(self.body) == type(''): return self.body def getbodyparts(self): if type(self.body) == type([]): return self.body def getbody(self): return self.body class IntSet: """Class implementing sets of integers. This is an efficient representation for sets consisting of several continuous ranges, e.g. 1-100,200-400,402-1000 is represented internally as a list of three pairs: [(1,100), (200,400), (402,1000)]. The internal representation is always kept normalized. The constructor has up to three arguments: - the string used to initialize the set (default ''), - the separator between ranges (default ',') - the separator between begin and end of a range (default '-') The separators must be strings (not regexprs) and should be different. The tostring() function yields a string that can be passed to another IntSet constructor; __repr__() is a valid IntSet constructor itself. """ # XXX The default begin/end separator means that negative numbers are # not supported very well. # # XXX There are currently no operations to remove set elements. def __init__(self, data = None, sep = ',', rng = '-'): self.pairs = [] self.sep = sep self.rng = rng if data: self.fromstring(data) def reset(self): self.pairs = [] def __cmp__(self, other): return cmp(self.pairs, other.pairs) def __hash__(self): return hash(self.pairs) def __repr__(self): return 'IntSet(%r, %r, %r)' % (self.tostring(), self.sep, self.rng) def normalize(self): self.pairs.sort() i = 1 while i < len(self.pairs): alo, ahi = self.pairs[i-1] blo, bhi = self.pairs[i] if ahi >= blo-1: self.pairs[i-1:i+1] = [(alo, max(ahi, bhi))] else: i = i+1 def tostring(self): s = '' for lo, hi in self.pairs: if lo == hi: t = repr(lo) else: t = repr(lo) + self.rng + repr(hi) if s: s = s + (self.sep + t) else: s = t return s def tolist(self): l = [] for lo, hi in self.pairs: m = range(lo, hi+1) l = l + m return l def fromlist(self, list): for i in list: self.append(i) def clone(self): new = IntSet() new.pairs = self.pairs[:] return new def min(self): return self.pairs[0][0] def max(self): return self.pairs[-1][-1] def contains(self, x): for lo, hi in self.pairs: if lo <= x <= hi: return True return False def append(self, x): for i in range(len(self.pairs)): lo, hi = self.pairs[i] if x < lo: # Need to insert before if x+1 == lo: self.pairs[i] = (x, hi) else: self.pairs.insert(i, (x, x)) if i > 0 and x-1 == self.pairs[i-1][1]: # Merge with previous self.pairs[i-1:i+1] = [ (self.pairs[i-1][0], self.pairs[i][1]) ] return if x <= hi: # Already in set return i = len(self.pairs) - 1 if i >= 0: lo, hi = self.pairs[i] if x-1 == hi: self.pairs[i] = lo, x return self.pairs.append((x, x)) def addpair(self, xlo, xhi): if xlo > xhi: return self.pairs.append((xlo, xhi)) self.normalize() def fromstring(self, data): new = [] for part in data.split(self.sep): list = [] for subp in part.split(self.rng): s = subp.strip() list.append(int(s)) if len(list) == 1: new.append((list[0], list[0])) elif len(list) == 2 and list[0] <= list[1]: new.append((list[0], list[1])) else: raise ValueError, 'bad data passed to IntSet' self.pairs = self.pairs + new self.normalize() # Subroutines to read/write entries in .mh_profile and .mh_sequences def pickline(file, key, casefold = 1): try: f = open(file, 'r') except IOError: return None pat = re.escape(key) + ':' prog = re.compile(pat, casefold and re.IGNORECASE) while 1: line = f.readline() if not line: break if prog.match(line): text = line[len(key)+1:] while 1: line = f.readline() if not line or not line[0].isspace(): break text = text + line return text.strip() return None def updateline(file, key, value, casefold = 1): try: f = open(file, 'r') lines = f.readlines() f.close() except IOError: lines = [] pat = re.escape(key) + ':(.*)\n' prog = re.compile(pat, casefold and re.IGNORECASE) if value is None: newline = None else: newline = '%s: %s\n' % (key, value) for i in range(len(lines)): line = lines[i] if prog.match(line): if newline is None: del lines[i] else: lines[i] = newline break else: if newline is not None: lines.append(newline) tempfile = file + "~" f = open(tempfile, 'w') for line in lines: f.write(line) f.close() os.rename(tempfile, file) # Test program def test(): global mh, f os.system('rm -rf $HOME/Mail/@test') mh = MH() def do(s): print s; print eval(s) do('mh.listfolders()') do('mh.listallfolders()') testfolders = ['@test', '@test/test1', '@test/test2', '@test/test1/test11', '@test/test1/test12', '@test/test1/test11/test111'] for t in testfolders: do('mh.makefolder(%r)' % (t,)) do('mh.listsubfolders(\'@test\')') do('mh.listallsubfolders(\'@test\')') f = mh.openfolder('@test') do('f.listsubfolders()') do('f.listallsubfolders()') do('f.getsequences()') seqs = f.getsequences() seqs['foo'] = IntSet('1-10 12-20', ' ').tolist() print seqs f.putsequences(seqs) do('f.getsequences()') for t in reversed(testfolders): do('mh.deletefolder(%r)' % (t,)) do('mh.getcontext()') context = mh.getcontext() f = mh.openfolder(context) do('f.getcurrent()') for seq in ('first', 'last', 'cur', '.', 'prev', 'next', 'first:3', 'last:3', 'cur:3', 'cur:-3', 'prev:3', 'next:3', '1:3', '1:-3', '100:3', '100:-3', '10000:3', '10000:-3', 'all'): try: do('f.parsesequence(%r)' % (seq,)) except Error, msg: print "Error:", msg stuff = os.popen("pick %r 2>/dev/null" % (seq,)).read() list = map(int, stuff.split()) print list, "<-- pick" do('f.listmessages()') if __name__ == '__main__': test()