source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/wsgi/__init__.py @ 5779

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/wsgi/__init__.py@5779
Revision 5779, 1.8 KB checked in by pjkersha, 11 years ago (diff)

Integrated automated start-up and shutdown of Paste http servers for unit tests.

RevLine 
[5015]1"""WSGI unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "23/02/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
[5774]11
12import paste.httpserver
13from threading import Thread
14from paste.deploy import loadapp
15from paste.script.util.logging_config import fileConfig
16
17import logging
18logging.basicConfig(level=logging.DEBUG)
19
class PasteDeployAppServer(object):
    """Wrapper around a ``paste.httpserver`` server object which allows the
    server to be run either in the calling thread or in a background thread.

    The server is created in the constructor without starting its request
    loop; call ``start`` to serve in the current thread or ``startThread``
    to serve from a new ``threading.Thread``.
    """

    def __init__(self, app=None, cfgFilePath=None, port=7443, host='0.0.0.0'):
        """Create the Paste server for a WSGI application.

        The application is taken either from a Paste Deploy ini file or from
        an application object passed in directly.  If both are given,
        ``cfgFilePath`` takes precedence and ``app`` is ignored.

        @param app: WSGI application to serve; only used when
        ``cfgFilePath`` is not set
        @param cfgFilePath: path to an ini file.  It is passed to
        ``fileConfig`` (Paste Script's logging configuration helper) and then
        to ``loadapp`` as ``config:<cfgFilePath>`` to create the application
        @param port: port number handed to ``paste.httpserver.serve``
        @param host: host name / address handed to ``paste.httpserver.serve``
        @raise KeyError: if neither ``cfgFilePath`` nor ``app`` is set
        """
        # No background thread exists until startThread() is called
        self.__thread = None

        if cfgFilePath:
            fileConfig(cfgFilePath)
            app = loadapp('config:%s' % cfgFilePath)

        elif app is None:
            raise KeyError('Either the "cfgFilePath" or "app" keyword must be '
                           'set')

        # start_loop=False: build the server object only; the request loop is
        # entered later from start()
        self.__pasteServer = paste.httpserver.serve(app, host=host, port=port,
                                                    start_loop=False)

    def _getPasteServer(self):
        """@return: the server object returned by ``paste.httpserver.serve``
        """
        return self.__pasteServer

    # Read-only access to the wrapped Paste server
    pasteServer = property(fget=_getPasteServer)

    def _getThread(self):
        """@return: the thread created by ``startThread`` or ``None`` if
        ``startThread`` has not been called
        """
        return self.__thread

    # Read-only access to the background thread (None until startThread())
    thread = property(fget=_getThread)

    def start(self):
        """Start server: run the Paste server's ``serve_forever`` loop in the
        calling thread
        """
        self.pasteServer.serve_forever()

    def startThread(self):
        """Start server in a separate thread.  The new thread is available
        from the ``thread`` property afterwards
        """
        # Bound method rather than PasteDeployAppServer.start so that a
        # subclass overriding start() has its override run in the thread
        self.__thread = Thread(target=self.start)
        self.__thread.start()

    def terminateThread(self):
        """Close the Paste server by calling its ``server_close`` method.

        The thread created by ``startThread`` is not joined here; callers
        needing to wait for it can use ``thread.join()``.
        """
        self.pasteServer.server_close()
61
Note: See TracBrowser for help on using the repository browser.