source: TI12-security/trunk/python/ndg.security.server/ndg/security/server/wsgi/__init__.py @ 5290

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.server/ndg/security/server/wsgi/__init__.py@5290
Revision 5290, 14.4 KB checked in by pjkersha, 10 years ago (diff)

Reworking unit tests

Line 
1"""WSGI Middleware components
2
3NERC Data Grid Project"""
4__author__ = "P J Kershaw"
5__date__ = "27/05/08"
6__copyright__ = "(C) 2009 Science and Technology Facilities Council"
7__license__ = "BSD - see LICENSE file in top-level directory"
8__contact__ = "Philip.Kershaw@stfc.ac.uk"
9__revision__ = '$Id$'
10import logging
11log = logging.getLogger(__name__)
12import httplib
13
14class NDGSecurityMiddlewareError(Exception):
15    '''Base exception class for NDG Security middleware'''
16   
17class NDGSecurityMiddlewareConfigError(NDGSecurityMiddlewareError):
18    '''NDG Security Middleware Configuration error'''
19   
20class NDGSecurityMiddlewareBase(object):
21    """Base class for NDG Security Middleware classes"""
22    propertyDefaults = {
23        'mountPath': '/',
24    }
25   
26    def __init__(self, app, app_conf, prefix='', **local_conf):
27        '''
28        @type app: callable following WSGI interface
29        @param app: next middleware application in the chain     
30        @type app_conf: dict       
31        @param app_conf: PasteDeploy global configuration dictionary
32        @type prefix: basestring
33        @param prefix: prefix for app_conf parameters e.g. 'ndgsecurity.' -
34        enables other global configuration parameters to be filtered out
35        @type local_conf: dict       
36        @param local_conf: PasteDeploy application specific configuration
37        dictionary
38        '''
39        self._app = app
40        self._environ = {}
41        self._start_response = None
42        self._pathInfo = None
43        self._path = None
44        self._mountPath = '/'
45       
46        opt = self.__class__.propertyDefaults.copy()
47       
48        # If no prefix is set, there is no way to distinguish options set for
49        # this app and those applying to other applications
50        if app_conf is not None and prefix:
51            # Update from application config dictionary - filter using prefix
52            self.__class__._filterOpts(opt, app_conf, prefix=prefix)
53                       
54        # Similarly, filter keyword input                 
55        self.__class__._filterOpts(opt, local_conf, prefix=prefix)
56       
57        # Set options as object attributes
58        for name, val in opt.items():
59            if not name.startswith('_'):
60                setattr(self, name, val)
61
62    def _initCall(self, environ, start_response):
63        """Call from derived class' __call__() to set environ and path
64        attributes"""
65        self.environ = environ
66        self.start_response = start_response
67        self.setPathInfo()
68        self.setPath()
69
70    @staticmethod
71    def initCall(__call__):
72        '''Decorator to __call__ to enable convenient attribute initialisation
73        '''
74        def __call__wrapper(self, environ, start_response):
75            self._initCall(environ, start_response)
76            return __call__(self, environ, start_response)
77
78        return __call__wrapper
79
80
81    def __call__(self, environ, start_response):
82        self._initCall(environ, start_response)
83        return self._setResponse(environ, start_response)
84   
85    def _setResponse(self, 
86                     environ, 
87                     start_response, 
88                     notFoundMsg=None,
89                     notFoundMsgContentType=None):
90        if self._app:
91            return self._app(environ, start_response)
92        else:
93            return self._setErrorResponse(start_response=start_response, 
94                                          msg=notFoundMsg,
95                                          code=404,
96                                          contentType=notFoundMsgContentType)
97           
98    def _setErrorResponse(self, start_response=None, msg=None, 
99                          code=500, contentType=None):
100        '''Convenience method to set a simple error response
101       
102        @type start_response: function
103        @param start_response: standard WSGI callable to set the HTTP header
104        @type msg: basestring
105        @param msg: optional error message
106        @type code: int
107        @param code: standard HTTP error response code
108        @type contentType: basestring
109        @param contentType: set 'Content-type' HTTP header field - defaults to
110        'text/plain'
111        '''           
112        if start_response is None:
113            start_response = self.start_response
114           
115        status = '%d %s' % (code, httplib.responses[code])
116        if msg is None:
117            response = status
118        else:
119            response = msg
120       
121        if contentType is None:
122            contentType = 'text/plain'
123               
124        start_response(status,
125                       [('Content-type', contentType),
126                        ('Content-Length', str(len(response)))])
127        return [response]
128       
129    @staticmethod
130    def getStatusMessage(statusCode):
131        '''Make a standard status message for use with start_response
132        @type statusCode: int
133        @param statusCode: HTTP status code
134        @rtype: str
135        @return: status code with standard message
136        @raise KeyError: for invalid status code
137        '''
138        return '%d %s' % (statusCode, httplib.responses[statusCode])
139   
140    # Utility functions to support Paste Deploy application and filter function
141    # signatures
142    @classmethod       
143    def filter_app_factory(cls, app, app_conf, **local_conf):
144        '''Function signature for Paste Deploy filter'''
145        return cls(app, app_conf, **local_conf)
146       
147    @classmethod
148    def app_factory(cls, app_conf, **local_conf):
149        '''Function Signature for Paste Deploy app'''
150        return cls(None, app_conf, **local_conf)
151   
152    @classmethod
153    def _filterOpts(cls, opt, newOpt, prefix='', propertyDefaults=None):
154        '''Convenience utility to filter input options set in __init__ via
155        app_conf or keywords
156       
157        @type opt: dict
158        @param opt: existing options set.  These will be updated by this
159        method based on the content of newOpt
160        @type newOpt: dict
161        @param newOpt: new options to update opt with
162        @type prefix: basestring
163        @param prefix: if set, remove the given prefix from the input options
164        @raise KeyError: if an option is set that is not in the classes
165        defOpt class variable
166        '''
167        if propertyDefaults is None:
168            propertyDefaults = cls.propertyDefaults
169           
170        badOpt = []
171        for k,v in newOpt.items():
172            if prefix and k.startswith(prefix):
173                subK = k.replace(prefix, '')                   
174                filtK = '_'.join(subK.split('.')) 
175            else:
176                #filtK = k
177                continue
178                   
179            if filtK not in propertyDefaults:
180                badOpt += [k]               
181            else:
182                opt[filtK] = v
183               
184        if len(badOpt) > 0:
185            raise TypeError("Invalid input option(s) set: %s" % 
186                            (", ".join(badOpt)))
187
188    def setMountPath(self, mountPath=None, environ=None):
189        if mountPath:
190            self._mountPath = mountPath
191        else:
192            if environ is None:
193                environ = self._environ
194           
195            self._mountPath = environ.get('SCRIPT_URL')
196            if self._mountPath is None:
197                raise AttributeError("SCRIPT_URL key not set in environ: "
198                                     "'mountPath' is set to None")
199           
200        if self._mountPath != '/':
201            self._mountPath = self._mountPath.rstrip('/')
202       
203    def _getMountPath(self):
204        return self._mountPath
205   
206    mountPath = property(fget=_getMountPath,
207                        fset=setMountPath,
208                        doc="URL path as assigned to SCRIPT_URL environ key")
209
210    def setPathInfo(self, pathInfo=None, environ=None):
211        if pathInfo:
212            self._pathInfo = pathInfo
213        else:
214            if environ is None:
215                environ = self._environ
216           
217            self._pathInfo = environ['PATH_INFO']
218           
219        if self._pathInfo != '/':
220            self._pathInfo = self._pathInfo.rstrip('/')
221       
222    def _getPathInfo(self):
223        return self._pathInfo
224   
225    pathInfo = property(fget=_getPathInfo,
226                        fset=setPathInfo,
227                        doc="URL path as assigned to PATH_INFO environ key")
228
229
230    def setPath(self, path=None):
231        if path:
232            self._path = path
233        else:
234            self._path = self.mountPath.rstrip('/') + self._pathInfo
235           
236        if self._path != '/':
237            self._path = self._path.rstrip('/')
238       
239    def _getPath(self):
240        return self._path
241   
242    path = property(fget=_getPath,
243                        fset=setPath,
244                        doc="Full URL path minus domain name - equivalent to "
245                            "self.mountPath PATH_INFO environ setting")
246
247    def _setEnviron(self, environ):
248        self._environ = environ
249       
250    def _getEnviron(self):
251        return self._environ
252   
253    environ = property(fget=_getEnviron,
254                       fset=_setEnviron,
255                       doc="Reference to WSGI environ dict")
256   
257    def _setStart_response(self, start_response):
258        self._start_response = start_response
259       
260    def _getStart_response(self):
261        return self._start_response
262   
263    start_response = property(fget=_getStart_response,
264                              fset=_setStart_response,
265                              doc="Reference to WSGI start_response function")
266       
267       
268    def _redirect(self, url, start_response=None):
269        """Do a HTTP 302 redirect
270       
271        @type start_response: callable following WSGI start_response convention
272        @param start_response: WSGI start response callable
273        @type url: basestring
274        @param url: URL to redirect to
275        @rtype: list
276        @return: empty HTML body
277        """
278        if start_response is None:
279            # self.start_response will be None if initCall decorator wasn't
280            # applied to __call__
281            if self.start_response is None:
282                raise NDGSecurityMiddlewareConfigError("No start_response "
283                                                       "function set.")
284            start_response = self.start_response
285           
286        start_response(NDGSecurityMiddlewareBase.getStatusMessage(302), 
287                       [('Content-type', 'text/html'),
288                        ('Content-length', '0'),
289                        ('Location', url)])
290        return []
291   
292   
293class NDGSecurityPathFilter(NDGSecurityMiddlewareBase):
294    """Specialisation of NDG Security Middleware to enable filtering based on
295    PATH_INFO
296   
297    B{This class must be run under Apache mod_wsgi}
298
299    - Apache SSLOptions directive StdEnvVars option must be set
300    """
301    propertyDefaults = {
302        'errorResponseCode': 401,
303        'serverName': None,
304        'pathMatchList': ['/']
305    }
306    propertyDefaults.update(NDGSecurityMiddlewareBase.propertyDefaults)
307   
308    _pathMatch = lambda self: self._pathInfo in self.pathMatchList
309    pathMatch = property(fget=_pathMatch,
310                         doc="Check for input path match to list of paths"
311                             "to which this middleware is to be applied")
312   
313    sslKeyName = 'HTTPS'
314
315    _isSSLRequest = lambda self: self.environ.get(
316                                    NDGSecurityPathFilter.sslKeyName) == '1'
317    isSSLRequest = property(fget=_isSSLRequest,
318                            doc="Is an SSL request boolean "
319                                "- depends on Apache config")
320   
321    def __init__(self, *arg, **kw):
322        '''See NDGSecurityMiddlewareBase for explanation of args
323        @type arg: tuple
324        @param arg: single element contains next middleware application in the
325        chain and app_conf dict     
326        @type kw: dict       
327        @param kw: prefix for app_conf parameters and local_conf dict       
328        '''
329        super(NDGSecurityPathFilter, self).__init__(*arg, **kw)
330       
331    def _getPathMatchList(self):
332        return self._pathMatchList
333   
334    def _setPathMatchList(self, pathList):
335        '''
336        @type pathList: list or tuple
337        @param pathList: list of URL paths to apply this middleware
338        to. Paths are relative to the point at which this middleware is mounted
339        as set in environ['PATH_INFO']
340        '''
341        # TODO: refactor to:
342        # * enable reading of path list from a database or some other
343        # configuration source.
344        # * enable some kind of pattern matching for paths
345       
346        if isinstance(pathList, basestring):
347            # Try parsing a space separated list of file paths
348             self._pathMatchList=[path.strip() for path in pathList.split(',')]
349           
350        elif not isinstance(pathList, (list, tuple)):
351            raise TypeError('Expecting a list or tuple for "pathMatchList"')
352        else:
353            self._pathMatchList = pathList
354           
355    pathMatchList = property(fget=_getPathMatchList,
356                             fset=_setPathMatchList,
357                             doc='List of URL paths to which to apply SSL '
358                                 'client authentication')
359       
360    def _getErrorResponseCode(self):
361        """Error response code getter
362        @rtype: int
363        @return: HTTP error code set by this middleware on client cert.
364        verification error
365        """
366        return self._errorResponseCode
367           
368    def _setErrorResponseCode(self, code):
369        """Error response code setter
370        @type code: int or basestring
371        @param code: error response code set if client cert. verification
372        fails"""
373        if isinstance(code, int):
374            self._errorResponseCode = code
375        elif isinstance(code, basestring):
376            self._errorResponseCode = int(code)
377        else:
378            raise TypeError('Expecting int or string type for '
379                            '"errorResponseCode" attribute')
380           
381        if self._errorResponseCode not in httplib.responses: 
382            raise ValueError("Error response code [%d] is not recognised "
383                             "standard HTTP response code" % 
384                             self._errorResponseCode) 
385           
386    errorResponseCode = property(fget=_getErrorResponseCode,
387                                 fset=_setErrorResponseCode,
388                                 doc="Response code raised if client "
389                                     "certificate verification fails")
Note: See TracBrowser for help on using the repository browser.