# Source code listing: abakus package (page generated by Doxygen)
#
# dblite.py

import cPickle
import time
import shutil
import os
import types
import __builtin__
# This module defines its own open() further down, so keep a private
# reference to the builtin one.
_open = __builtin__.open

# When true, dblite.sync() also stores a timestamped copy of the file.
keep_all_files = 0

# Handling of database files that cannot be unpickled:
# 0 re-raises the error, 1 prints a warning and discards the content,
# any other value discards the content silently.
ignore_corrupt_dbfiles = 0
if hasattr(types, 'UnicodeType'):
    def is_string(s):
        t=type(s)
        return t is types.StringType or t is types.UnicodeType
else:
    def is_string(s):
        return type(s) is types.StringType
try:
    unicode('a')
except NameError:
    # No unicode builtin in this interpreter: provide an identity
    # stand-in so callers can wrap values unconditionally.
    def unicode(s):
        """Return s unchanged."""
        return s
class dblite:
  """Dictionary-like string store persisted as a single pickle file.

  The whole mapping is held in memory; it is written to the database
  file only by sync(), or by __del__ while unsynced changes are pending.
  Keys and values must satisfy is_string().

  Flags:
    "r" (also used when None is given) -- open an existing file read-only
    "w" -- open an existing file for reading and writing
    "c" -- like "w", but create the file if it cannot be opened
    "n" -- always start from a new, empty file
  """

  def __init__(self, file_base_name, flag, mode):
    """Open or create the database file and load its content.

    file_base_name -- path of the database; ".dblite" is appended unless
                      the name already ends with it.
    flag -- None, "r", "w", "c" or "n" (see the class docstring).
    mode -- passed as the third positional argument to the builtin open().

    Raises IOError when the file cannot be opened and flag is not "c".
    While ignore_corrupt_dbfiles is 0, an error raised while unpickling
    the file content is propagated to the caller.
    """
    assert flag in (None, "r", "w", "c", "n")
    if flag is None: flag="r"
    if file_base_name[-7:] != '.dblite':
      file_base_name=file_base_name + '.dblite'
    self._file_name=file_base_name
    self._flag=flag
    self._mode=mode
    self._dict={}
    self._needs_sync=0
    if self._flag=="n":
      self._create_empty()
    else:
      try:
        f=_open(self._file_name, "rb")
      except IOError:
        if self._flag != "c":
          raise  # bare raise keeps the original traceback
        self._create_empty()
      else:
        try:
          p=f.read()
        finally:
          f.close()
        if len(p) > 0:
          # SECURITY: unpickling can execute arbitrary code; only open
          # database files that come from a trusted source.
          try:
            self._dict=cPickle.loads(p)
          except Exception:
            # Exception rather than a bare except, so that interrupts and
            # interpreter exit are never swallowed by the "ignore" modes.
            if ignore_corrupt_dbfiles==0: raise
            if ignore_corrupt_dbfiles==1:
              print("Warning: Discarding corrupt database: %s"
                    % (self._file_name,))

  def _create_empty(self):
    """Create the database file, or truncate it to zero length."""
    # NOTE(review): the builtin open() interprets its third positional
    # argument as a buffer size, so self._mode is not applied as a
    # permission mask here -- confirm whether that is intended.
    _open(self._file_name, "wb", self._mode).close()

  def __del__(self):
    """Write pending changes when the object is finalized."""
    # getattr: __init__ may have failed before _needs_sync was assigned.
    if getattr(self, "_needs_sync", 0):
      self.sync()

  def sync(self):
    """Write the in-memory mapping to the database file.

    When keep_all_files is true, a copy of the written file is stored
    next to it with "_<unix time>" appended to the name.

    Raises IOError if the database was opened read-only.
    """
    self._check_writable()
    f=_open(self._file_name, "wb", self._mode)
    try:
      cPickle.dump(self._dict, f, 1)
    finally:
      # Close even when pickling fails so the handle is not leaked.
      f.close()
    self._needs_sync=0
    if keep_all_files:
      shutil.copyfile(
        self._file_name,
        self._file_name + "_" + str(int(time.time())))

  def _check_writable(self):
    """Raise IOError if the database was opened with flag "r"."""
    if self._flag=="r":
      raise IOError("Read-only database: %s" % self._file_name)

  def __getitem__(self, key):
    """Return the value stored under key; raises KeyError if absent."""
    return self._dict[key]

  def __setitem__(self, key, value):
    """Store value under key in memory and mark the database as unsynced.

    Raises IOError for a read-only database and TypeError when key or
    value is not a string according to is_string().
    """
    self._check_writable()
    if not is_string(key):
      raise TypeError(
        "key `%s' must be a string but is %s" % (key, type(key)))
    if not is_string(value):
      raise TypeError(
        "value `%s' must be a string but is %s" % (value, type(value)))
    self._dict[key]=value
    self._needs_sync=1

  def keys(self):
    """Return the keys of the in-memory mapping."""
    return self._dict.keys()

  def has_key(self, key):
    """Return true if key is present."""
    return key in self._dict

  def __contains__(self, key):
    """Support the "key in db" test."""
    return key in self._dict

  def iterkeys(self):
    """Return an iterator over the keys."""
    return self._dict.iterkeys()

  __iter__=iterkeys

  def __len__(self):
    """Return the number of stored items."""
    return len(self._dict)
def open(file, flag=None, mode=0666):
  return dblite(file, flag, mode)
def _exercise():
  db=open("tmp", "n")
  assert len(db)==0
  db["foo"]="bar"
  assert db["foo"]=="bar"
  db[unicode("ufoo")]=unicode("ubar")
  assert db[unicode("ufoo")]==unicode("ubar")
  db.sync()
  db=open("tmp", "c")
  assert len(db)==2, len(db)
  assert db["foo"]=="bar"
  db["bar"]="foo"
  assert db["bar"]=="foo"
  db[unicode("ubar")]=unicode("ufoo")
  assert db[unicode("ubar")]==unicode("ufoo")
  db.sync()
  db=open("tmp", "r")
  assert len(db)==4, len(db)
  assert db["foo"]=="bar"
  assert db["bar"]=="foo"
  assert db[unicode("ufoo")]==unicode("ubar")
  assert db[unicode("ubar")]==unicode("ufoo")
  try:
    db.sync()
  except IOError, e:
    assert str(e)=="Read-only database: tmp.dblite"
  else:
    raise RuntimeError, "IOError expected."
  db=open("tmp", "w")
  assert len(db)==4
  db["ping"]="pong"
  db.sync()
  try:
    db[(1,2)]="tuple"
  except TypeError, e:
    assert str(e)=="key `(1, 2)' must be a string but is <type 'tuple'>", str(e)
  else:
    raise RuntimeError, "TypeError exception expected"
  try:
    db["list"]=[1,2]
  except TypeError, e:
    assert str(e)=="value `[1, 2]' must be a string but is <type 'list'>", str(e)
  else:
    raise RuntimeError, "TypeError exception expected"
  db=open("tmp", "r")
  assert len(db)==5
  db=open("tmp", "n")
  assert len(db)==0
  _open("tmp.dblite", "w")
  db=open("tmp", "r")
  _open("tmp.dblite", "w").write("x")
  try:
    db=open("tmp", "r")
  except cPickle.UnpicklingError:
    pass
  else:
    raise RuntimeError, "cPickle exception expected."
  global ignore_corrupt_dbfiles
  ignore_corrupt_dbfiles=2
  db=open("tmp", "r")
  assert len(db)==0
  os.unlink("tmp.dblite")
  try:
    db=open("tmp", "w")
  except IOError, e:
    assert str(e)=="[Errno 2] No such file or directory: 'tmp.dblite'", str(e)
  else:
    raise RuntimeError, "IOError expected."
  print "OK"
# Run the self-test when the module is executed as a script.
if __name__ == "__main__":
  _exercise()

# Generated by Doxygen 1.6.0