Squashed commit of the following:

commit cb509e8f75e3dcdbc66327be4bfbf6661aa084b5
Author: David Snopek <dsnopek@gmail.com>
Date:   Fri Jul 12 22:06:28 2013 +0100

    Cut down 'import' statements to only modules actually used.

commit 0ea255115e095e31af5a991e9cce2b5b15cb496d
Author: David Snopek <dsnopek@gmail.com>
Date:   Fri Jul 12 22:00:06 2013 +0100

     * Add getCollectionManager() so that the whole process can share the same ThreadingCollectionManager object.

     * Got the RestApp actually working!

commit 00997bab600b13d4b430ed2c2839b1d2232f55ed
Author: David Snopek <dsnopek@gmail.com>
Date:   Fri Jul 12 21:04:58 2013 +0100

    Got the sync_app working again (more or less)

commit 459c69566bb92d2c0195a384e067d98c059bdea7
Author: David Snopek <dsnopek@gmail.com>
Date:   Fri Jul 12 19:47:40 2013 +0100

    Started implementing test for the RESTful callbacks that PrepECN is going to need.

commit 7ffbac793f9bf45ab9056c1de475422b8742e107
Author: David Snopek <dsnopek@gmail.com>
Date:   Fri Jul 12 17:19:06 2013 +0100

    Started work on a WSGI app for RESTful access to Anki based on Bibliobird code here:

      https://raw.github.com/dsnopek/bbcom/master/AnkiServer/AnkiServer/deck.py

commit 8820411388ce0c2b7b14769c614c22c675d2dbdd
Author: David Snopek <dsnopek@gmail.com>
Date:   Fri Jul 12 15:03:56 2013 +0100

     * Seperated the collection and threading code.

     * Implemented a new interface to interact with the collections, which will hopefully be more transparent and testable.
This commit is contained in:
David Snopek
2013-07-12 22:08:16 +01:00
parent 661662400f
commit e25cf25684
12 changed files with 597 additions and 202 deletions

View File

@@ -2,28 +2,49 @@
import anki
import anki.storage
from threading import Thread
from Queue import Queue
import os, errno
try:
import simplejson as json
except ImportError:
import json
__all__ = ['CollectionWrapper', 'CollectionManager']
import os, errno, time, logging
__all__ = ['CollectionThread']
# TODO: I feel like we shouldn't need this wrapper...
class CollectionWrapper(object):
"""A simple wrapper around a collection for the purpose of opening and closing on demand
as well as doing special initialization."""
"""A simple wrapper around an anki.storage.Collection object.
def __init__(self, path):
This allows us to manage and refer to the collection, whether it's open or not. It
also provides a special "continuation passing" interface for executing functions
on the collection, which makes it easy to switch to a threading mode.
See ThreadingCollectionWrapper for a version that maintains a seperate thread for
interacting with the collection.
"""
def __init__(self, path, setup_new_collection=None):
self.path = os.path.realpath(path)
self._col = None
self.setup_new_collection = setup_new_collection
self.__col = None
def execute(self, func, args=[], kw={}, waitForReturn=True):
""" Executes the given function with the underlying anki.storage.Collection
object as the first argument and any additional arguments specified by *args
and **kw.
If 'waitForReturn' is True, then it will block until the function has
executed and return its return value. If False, the function MAY be
executed some time later and None will be returned.
"""
# Open the collection and execute the function
self.open()
args = [self.__col] + args
ret = func(*args, **kw)
# Only return the value if they requested it, so the interface remains
# identical between this class and ThreadingCollectionWrapper
if waitForReturn:
return ret
def __create_collection(self):
"""Creates a new collection and runs any special setup."""
def _create_colection(self):
# mkdir -p the path, because it might not exist
dirname = os.path.dirname(self.path)
try:
@@ -34,175 +55,55 @@ class CollectionWrapper(object):
else:
raise
col = anki.Storage.Collection(self.path)
self.__col = ank.storage.Collection(self.path)
# Do any special setup
self.setup_new_collection(col)
return col
def setup_new_collection(self, col):
"""Override this function to initial collections in some special way."""
pass
if self.setup_new_collection is not None:
self.setup_new_collection(self.__col)
def open(self):
if self._col is None:
"""Open the collection, or create it if it doesn't exist."""
if self.__col is None:
if os.path.exists(self.path):
self._col = anki.storage.Collection(self.path)
self.__col = anki.storage.Collection(self.path)
else:
self._col = self._create_collection()
return self._col
self.__col = self.__create_collection()
def close(self):
if self._col is None:
"""Close the collection if opened."""
if not self.opened():
return
self._col.close()
self._col = None
self.__col.close()
self.__col = None
def opened(self):
return self._col is not None
"""Returns True if the collection is open, False otherwise."""
return self.__col is not None
class CollectionThread(object):
def __init__(self, path, wrapper_class=CollectionWrapper):
self.path = os.path.realpath(path)
self.wrapper = wrapper_class(path)
class CollectionManager(object):
"""Manages a set of CollectionWrapper objects."""
self._queue = Queue()
self._thread = None
self._running = False
self.last_timestamp = time.time()
collection_wrapper = CollectionWrapper
@property
def running(self):
return self._running
def __init__(self):
self.collections = {}
def qempty(self):
return self._queue.empty()
def get_collection(self, path, setup_new_collection=None):
"""Gets a CollectionWrapper for the given path."""
def current(self):
from threading import current_thread
return current_thread() == self._thread
def execute(self, func, args=[], kw={}, waitForReturn=True):
""" Executes a given function on this thread with the *args and **kw.
If 'waitForReturn' is True, then it will block until the function has
executed and return its return value. If False, it will return None
immediately and the function will be executed sometime later.
"""
if waitForReturn:
return_queue = Queue()
else:
return_queue = None
self._queue.put((func, args, kw, return_queue))
if return_queue is not None:
ret = return_queue.get(True)
if isinstance(ret, Exception):
raise ret
return ret
def _run(self):
logging.info('CollectionThread[%s]: Starting...', self.path)
try:
while self._running:
func, args, kw, return_queue = self._queue.get(True)
logging.info('CollectionThread[%s]: Running %s(*%s, **%s)', self.path, func.func_name, repr(args), repr(kw))
self.last_timestamp = time.time()
try:
ret = func(*args, **kw)
except Exception, e:
logging.error('CollectionThread[%s]: Unable to %s(*%s, **%s): %s',
self.path, func.func_name, repr(args), repr(kw), e, exc_info=True)
# we return the Exception which will be raise'd on the other end
ret = e
if return_queue is not None:
return_queue.put(ret)
except Exception, e:
logging.error('CollectionThread[%s]: Thread crashed! Exception: %s', e, exc_info=True)
finally:
self.wrapper.close()
# clean out old thread object
self._thread = None
# in case we got here via an exception
self._running = False
logging.info('CollectionThread[%s]: Stopped!' % self.path)
def start(self):
if not self._running:
self._running = True
assert self._thread is None
self._thread = Thread(target=self._run)
self._thread.start()
def stop(self):
def _stop():
self._running = False
self.execute(_stop, waitForReturn=False)
def stop_and_wait(self):
""" Tell the thread to stop and wait for it to happen. """
self.stop()
if self._thread is not None:
self._thread.join()
class CollectionThreadPool(object):
def __init__(self, wrapper_class=CollectionWrapper):
self.wrapper_class = wrapper_class
self.threads = {}
self.monitor_frequency = 15
self.monitor_inactivity = 90
monitor = Thread(target=self._monitor_run)
monitor.daemon = True
monitor.start()
self._monitor_thread = monitor
# TODO: it would be awesome to have a safe way to stop inactive threads completely!
# TODO: we need a way to inform other code that the collection has been closed
def _monitor_run(self):
""" Monitors threads for inactivity and closes the collection on them
(leaves the thread itself running -- hopefully waiting peacefully with only a
small memory footprint!) """
while True:
cur = time.time()
for path, thread in self.threads.items():
if thread.running and thread.wrapper.opened() and thread.qempty() and cur - thread.last_timestamp >= self.monitor_inactivity:
logging.info('Monitor is closing collection on inactive CollectionThread[%s]' % thread.path)
def closeCollection(wrapper):
wrapper.close()
thread.execute(closeCollection, [thread.wrapper], waitForReturn=False)
time.sleep(self.monitor_frequency)
def create_thread(self, path):
return CollectionThread(path, wrapper_class=self.wrapper_class)
def start(self, path):
path = os.path.realpath(path)
try:
thread = self.threads[path]
col = self.collections[path]
except KeyError:
thread = self.threads[path] = self.create_thread(path)
col = self.collections[path] = self.collection_wrapper(path, setup_new_collection)
thread.start()
return thread
return col
def shutdown(self):
for thread in self.threads.values():
thread.stop()
self.threads = {}
# TODO: There's got to be a way to do this without initializing it ALWAYS!
thread_pool = CollectionThreadPool()
"""Close all CollectionWrappers managed by this object."""
for path, col in self.collections.items():
del self.collections[path]
col.close()