Add more test helpers and integration tests for SyncApp's media sync feature using WebTest's TestApp.

Add test helpers for creating, inspecting and manipulating instances of SyncApp and RestApp.
Add subclasses of Anki's RemoteServer and RemoteMediaServer for communicating with the wrapped SyncApp instance under test.
Add helpers for monkey patching Anki's MediaManager and DB for easier testing.
Add test assets directory.
This commit is contained in:
Christoph Mack
2016-02-29 09:56:28 +01:00
committed by flan
parent cb574aa0a7
commit 573aeece81
8 changed files with 692 additions and 0 deletions

View File

@@ -169,3 +169,19 @@ class FileUtils(object):
zip_file.close()
return file_buffer.getvalue()
def get_asset_path(self, relative_file_path):
"""
Retrieves the path of a file for testing from the "assets" directory.
:param relative_file_path: the name of the file to retrieve, relative
to the "assets" directory
:return: the absolute path to the file in the "assets" directory.
"""
join = os.path.join
script_dir = os.path.dirname(os.path.realpath(__file__))
support_dir = join(script_dir, os.pardir, "assets")
res = join(support_dir, relative_file_path)
return res

View File

@@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
import logging
from anki.sync import HttpSyncer, RemoteServer, RemoteMediaServer, FullSyncer
class MockServerConnection(object):
"""
Mock for HttpSyncer's con attribute, a httplib2 connection. All requests
that would normally got to the remote server will be redirected to our
server_app_to_test object.
"""
def __init__(self, server_app_to_test):
self.test_app = server_app_to_test
def request(self, uri, method='GET', headers=None, body=None):
if method == 'POST':
logging.debug("Posting to URI '{}'.".format(uri))
logging.info("Posting to URI '{}'.".format(uri))
test_response = self.test_app.post(uri,
params=body,
headers=headers,
status="*")
resp = test_response.headers
resp.update({
"status": str(test_response.status_int)
})
cont = test_response.body
return resp, cont
else:
raise Exception('Unexpected HttpSyncer.req() behavior.')
class MockRemoteServer(RemoteServer):
"""
Mock for RemoteServer. All communication to our remote counterpart is
routed to our TestApp object.
"""
def __init__(self, hkey, server_test_app):
# Create a custom connection object we will use to communicate with our
# 'remote' server counterpart.
connection = MockServerConnection(server_test_app)
HttpSyncer.__init__(self, hkey, connection)
def syncURL(self): # Overrides RemoteServer.syncURL().
return "/sync/"
class MockRemoteMediaServer(RemoteMediaServer):
"""
Mock for RemoteMediaServer. All communication to our remote counterpart is
routed to our TestApp object.
"""
def __init__(self, col, hkey, server_test_app):
# Create a custom connection object we will use to communicate with our
# 'remote' server counterpart.
connection = MockServerConnection(server_test_app)
HttpSyncer.__init__(self, hkey, connection)
def syncURL(self): # Overrides RemoteServer.syncURL().
return "/msync/"

View File

@@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
import os
import sqlite3 as sqlite
from anki.media import MediaManager
from anki.storage import DB
mediamanager_orig_funcs = {
"findChanges": None,
"mediaChangesZip": None,
"addFilesFromZip": None,
"syncDelete": None
}
db_orig_funcs = {
"__init__": None
}
def monkeypatch_mediamanager():
"""
Monkey patches anki.media.MediaManager's methods so they chdir to
self.dir() before acting on its media directory and chdir back to the
original cwd after finishing.
"""
def make_cwd_safe(original_func):
mediamanager_orig_funcs["findChanges"] = MediaManager.findChanges
mediamanager_orig_funcs["mediaChangesZip"] = MediaManager.mediaChangesZip
mediamanager_orig_funcs["addFilesFromZip"] = MediaManager.addFilesFromZip
mediamanager_orig_funcs["syncDelete"] = MediaManager.syncDelete
def wrapper(instance, *args):
old_cwd = os.getcwd()
os.chdir(instance.dir())
res = original_func(instance, *args)
os.chdir(old_cwd)
return res
return wrapper
MediaManager.findChanges = make_cwd_safe(MediaManager.findChanges)
MediaManager.mediaChangesZip = make_cwd_safe(MediaManager.mediaChangesZip)
MediaManager.addFilesFromZip = make_cwd_safe(MediaManager.addFilesFromZip)
MediaManager.syncDelete = make_cwd_safe(MediaManager.syncDelete)
def unpatch_mediamanager():
"""Undoes monkey patches to Anki's MediaManager."""
MediaManager.findChanges = mediamanager_orig_funcs["findChanges"]
MediaManager.mediaChangesZip = mediamanager_orig_funcs["mediaChangesZip"]
MediaManager.addFilesFromZip = mediamanager_orig_funcs["addFilesFromZip"]
MediaManager.syncDelete = mediamanager_orig_funcs["syncDelete"]
mediamanager_orig_funcs["findChanges"] = None
mediamanager_orig_funcs["mediaChangesZip"] = None
mediamanager_orig_funcs["mediaChangesZip"] = None
mediamanager_orig_funcs["mediaChangesZip"] = None
def monkeypatch_db():
"""
Monkey patches Anki's DB.__init__ to connect to allow access to the db
connection from more than one thread, so that we can inspect and modify
the db created in the app in our test code.
"""
db_orig_funcs["__init__"] = DB.__init__
def patched___init__(self, path, text=None, timeout=0):
# Code taken from Anki's DB.__init__()
encpath = path
if isinstance(encpath, unicode):
encpath = path.encode("utf-8")
# Allow more than one thread to use this connection.
self._db = sqlite.connect(encpath,
timeout=timeout,
check_same_thread=False)
if text:
self._db.text_factory = text
self._path = path
self.echo = os.environ.get("DBECHO") # echo db modifications
self.mod = False # flag that db has been modified?
DB.__init__ = patched___init__
def unpatch_db():
"""Undoes monkey patches to Anki's DB."""
DB.__init__ = db_orig_funcs["__init__"]
db_orig_funcs["__init__"] = None

View File

@@ -0,0 +1,108 @@
# -*- coding: utf-8 -*-
import filecmp
import logging
import os
import ConfigParser
import shutil
from ankisyncd.sync_app import SyncApp, SyncCollectionHandler, SyncMediaHandler
from helpers.file_utils import FileUtils
class ServerUtils(object):
def __init__(self):
self.fileutils = FileUtils()
def clean_up(self):
self.fileutils.clean_up()
def create_server_paths(self):
"""
Creates temporary files and dirs for our app to use during tests.
"""
auth = self.fileutils.create_file_path(suffix='.db',
prefix='ankiserver_auth_db_')
session = self.fileutils.create_file_path(suffix='.db',
prefix='ankiserver_session_db_')
data = self.fileutils.create_dir(suffix='',
prefix='ankiserver_data_root_')
return {
"auth_db": auth,
"session_db": session,
"data_root": data
}
@staticmethod
def create_sync_app(server_paths, config_path):
config = ConfigParser.SafeConfigParser()
config.read(config_path)
# Use custom files and dirs in settings.
config.set("sync_app", "auth_db_path", server_paths["auth_db"])
config.set("sync_app", "session_db_path", server_paths["session_db"])
config.set("sync_app", "data_root", server_paths["data_root"])
return SyncApp(config)
def get_session_for_hkey(self, server, hkey):
return server.session_manager.load(hkey)
def get_thread_for_hkey(self, server, hkey):
session = self.get_session_for_hkey(server, hkey)
thread = session.get_thread()
return thread
def get_col_wrapper_for_hkey(self, server, hkey):
print("getting col wrapper for hkey " + hkey)
print("all session keys: " + str(server.session_manager.sessions.keys()))
thread = self.get_thread_for_hkey(server, hkey)
col_wrapper = thread.wrapper
return col_wrapper
def get_col_for_hkey(self, server, hkey):
col_wrapper = self.get_col_wrapper_for_hkey(server, hkey)
col_wrapper.open() # Make sure the col is opened.
return col_wrapper._CollectionWrapper__col
def get_col_db_path_for_hkey(self, server, hkey):
col = self.get_col_for_hkey(server, hkey)
return col.db._path
def get_syncer_for_hkey(self, server, hkey, syncer_type='collection'):
col = self.get_col_for_hkey(server, hkey)
session = self.get_session_for_hkey(server, hkey)
syncer_type = syncer_type.lower()
if syncer_type == 'collection':
handler_method = SyncCollectionHandler.operations[0]
elif syncer_type == 'media':
handler_method = SyncMediaHandler.operations[0]
return session.get_handler_for_operation(handler_method, col)
def add_files_to_mediasyncer(self,
media_syncer,
filepaths,
update_db=False,
bump_last_usn=False):
"""
If bumpLastUsn is True, the media syncer's lastUsn will be incremented
once for each added file. Use this when adding files to the server.
"""
for filepath in filepaths:
logging.debug("Adding file '{}' to mediaSyncer".format(filepath))
# Import file into media dir.
media_syncer.col.media.addFile(filepath)
if bump_last_usn:
# Need to bump lastUsn once for each file.
media_manager = media_syncer.col.media
media_manager.setLastUsn(media_syncer.col.media.lastUsn() + 1)
if update_db:
media_syncer.col.media.findChanges() # Write changes to db.