source: TI12-security/trunk/python/ndg.security.server/ndg/security/server/wsgi/sslclientauthn.py @ 4606

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.server/ndg/security/server/wsgi/sslclientauthn.py@4606
Revision 4606, 9.0 KB checked in by pjkersha, 12 years ago (diff)

#1004 Security Filter:

  • SSLClientAuthNMiddleware initial version near completion
Line 
1"""SSL Client Authentication Middleware
2
Apply SSL client authentication to configured URL paths.
4
5SSL Client certificate is expected to be present in environ as SSL_CLIENT_CERT
6key as set by standard Apache SSL.
7
8NERC Data Grid Project
9
10This software may be distributed under the terms of the Q Public License,
11version 1.0 or later.
12"""
13__author__ = "P J Kershaw"
14__date__ = "11/12/08"
15__copyright__ = "(C) 2008 STFC & NERC"
16__contact__ = "Philip.Kershaw@stfc.ac.uk"
17__revision__ = "$Id$"
18import logging
19log = logging.getLogger(__name__)
20import httplib
21from ndg.security.common.X509 import X509Cert, X509CertError
22
class SSLClientAuthNMiddleware(object):
    """WSGI middleware applying SSL client authentication to selected URL
    paths.

    For a request whose PATH_INFO matches an entry in C{pathMatchList}, the
    client certificate held in environ under C{sslClientCertKeyName} is
    verified against the configured CA certificates.  On success, or when the
    path is not in the list, the request is passed to the wrapped
    application; otherwise a plain text response with status
    C{errorResponseCode} is returned.

    NOTE(review): the current request's environ, start_response and path are
    held as instance attributes for the duration of C{__call__}.  One
    instance shared between concurrently running threads could therefore mix
    up requests - confirm the deployment model before relying on this.

    @cvar sslClientCertKeyName: environ key under which the client
    certificate is expected
    @type sslClientCertKeyName: basestring
    @cvar propertyDefaults: valid option names and their default values
    @type propertyDefaults: dict
    """

    sslClientCertKeyName = 'SSL_CLIENT_CERT'

    propertyDefaults = {
        'errorResponseCode': 401,
        'pathMatchList': '/',
        'caCertFilePathList': []
    }

    def _isSSLClientCertSet(self):
        """
        @rtype: bool
        @return: True if the client certificate key is present in the
        environ of the current request
        """
        return self.sslClientCertKeyName in self._environ

    isSSLClientCertSet = property(fget=_isSSLClientCertSet,
                                  doc="Check for client cert. set in environ")

    def _pathMatch(self):
        """
        @rtype: bool
        @return: True if the current request path is one of the paths in
        pathMatchList (exact match on the normalised path)
        """
        return self._path in self.pathMatchList

    pathMatch = property(fget=_pathMatch,
                         doc="Check for input path match to list of paths "
                             "to which SSL client AuthN is to be applied")

    def __init__(self, app, app_conf, prefix='', **local_conf):
        """
        @type app: callable or None
        @param app: WSGI application to wrap.  If None, this middleware
        answers 404 in place of calling an application
        @type app_conf: dict or None
        @param app_conf: configuration options
        @type prefix: basestring
        @param prefix: prefix stripped from option names before they are
        checked against propertyDefaults
        @type local_conf: dict
        @param local_conf: further options; these override matching
        app_conf settings
        @raise TypeError: if an option name is not in propertyDefaults
        """
        self._app = app

        # Per-request state, populated by __call__
        self._environ = {}
        self._start_response = None
        self._path = None

        opt = SSLClientAuthNMiddleware.propertyDefaults.copy()
        if app_conf is not None:
            # Update from application config dictionary - filter from using
            # prefix
            self._filterOpts(opt, app_conf, prefix=prefix)

        # Similarly, filter keyword input.  It is applied last so that it
        # overrides matching app_conf settings
        self._filterOpts(opt, local_conf, prefix=prefix)

        # Set options as object attributes - this goes through the property
        # setters below so that each value is validated and converted
        for name, val in opt.items():
            setattr(self, name, val)

    def _getErrorResponseCode(self):
        """
        @rtype: int
        @return: HTTP error code set by this middleware on client cert.
        verification error
        """
        return self._errorResponseCode

    def _setErrorResponseCode(self, code):
        """
        @type code: int or basestring
        @param code: error response code set if client cert. verification
        fails
        @raise TypeError: if code is neither an int nor a string
        @raise ValueError: if code is not a key of httplib.responses or a
        string input can't be converted to an int
        """
        if isinstance(code, int):
            self._errorResponseCode = code
        elif isinstance(code, basestring):
            self._errorResponseCode = int(code)
        else:
            raise TypeError('Expecting int or string type for '
                            '"errorResponseCode" attribute')

        if self._errorResponseCode not in httplib.responses:
            raise ValueError("Error response code [%d] is not recognised "
                             "standard HTTP response code" %
                             self._errorResponseCode)

    errorResponseCode = property(fget=_getErrorResponseCode,
                            fset=_setErrorResponseCode,
                            doc="Response code raised if client certificate "
                                "verification fails")

    def _setCACertsFromFileList(self, caCertFilePathList):
        '''Read CA certificates from file and add them to an X.509 Cert.
        stack

        @type caCertFilePathList: list, tuple or basestring
        @param caCertFilePathList: list of file paths for CA certificates to
        be used to verify the client certificate.  A string is split on
        whitespace into a list of paths
        @raise TypeError: if the input is not a list, tuple or string'''
        # Imported here because the module level import only brings in
        # X509Cert and X509CertError
        from ndg.security.common.X509 import X509Stack

        if isinstance(caCertFilePathList, basestring):
            # Try parsing a space separated list of file paths
            caCertFilePathList = caCertFilePathList.split()

        elif not isinstance(caCertFilePathList, (list, tuple)):
            raise TypeError('Expecting a list or tuple for '
                            '"caCertFilePathList"')

        # Build the new stack completely before replacing the existing one
        # so that a failure to read a file leaves the old settings intact
        caCertStack = X509Stack()
        for caCertFilePath in caCertFilePathList:
            caCertStack.push(X509Cert.Read(caCertFilePath))

        self._caCertStack = caCertStack

    caCertFilePathList = property(fset=_setCACertsFromFileList,
                                  doc="list of CA certificate file paths - "
                                      "peer certificate must validate against "
                                      "one")

    @staticmethod
    def _normalisePath(path):
        """Strip trailing slashes from a URL path, keeping '/' for the root

        @type path: basestring
        @param path: URL path
        @rtype: basestring
        @return: path without trailing '/' characters, or '/' if nothing
        else is left
        """
        return path.rstrip('/') or '/'

    def _getPathMatchList(self):
        """
        @rtype: list
        @return: normalised URL paths to which SSL client authentication is
        applied
        """
        return self._pathMatchList

    def _setPathMatchList(self, pathList):
        '''Set the URL paths to which SSL client authentication applies

        @type pathList: list, tuple or basestring
        @param pathList: list of URL paths to apply SSL client authentication
        to. Paths are relative to the point at which this middleware is mounted
        as set in environ['PATH_INFO'].  A string is split on whitespace into
        a list of paths
        @raise TypeError: if the input is not a list, tuple or string
        '''
        # TODO: refactor to:
        # * enable reading of path list from a database or some other
        # configuration source.
        # * enable some kind of pattern matching for paths

        if isinstance(pathList, basestring):
            # Try parsing a space separated list of paths
            pathList = pathList.split()

        elif not isinstance(pathList, (list, tuple)):
            raise TypeError('Expecting a list or tuple for "pathMatchList"')

        # Store in the same normalised form as __call__ uses for the request
        # path so that '/secure' and '/secure/' are treated alike
        self._pathMatchList = [self._normalisePath(path) for path in pathList]

    pathMatchList = property(fget=_getPathMatchList,
                             fset=_setPathMatchList,
                             doc="list of URL paths to which SSL client "
                                 "authentication is applied")

    @classmethod
    def _filterOpts(cls, opt, newOpt, prefix=''):
        '''Convenience utility to filter input options set in __init__ via
        app_conf or keywords

        @type opt: dict
        @param opt: existing options set.  These will be updated by this
        method based on the content of newOpt
        @type newOpt: dict
        @param newOpt: new options to update opt with
        @type prefix: basestring
        @param prefix: if set, remove the given prefix from the input options
        @raise TypeError: if an option is set that is not in the class's
        propertyDefaults class variable
        '''
        badOpt = []
        for k, v in newOpt.items():
            if prefix and k.startswith(prefix):
                # Remove the leading prefix only - str.replace would also
                # remove any later occurrence inside the option name
                filtK = k[len(prefix):].replace('.', '_')
            else:
                filtK = k

            if filtK not in cls.propertyDefaults:
                badOpt.append(k)
            else:
                opt[filtK] = v

        if badOpt:
            raise TypeError("Invalid input option(s) set: %s" %
                            (", ".join(badOpt)))

    def __call__(self, environ, start_response):
        """WSGI entry point

        @type environ: dict
        @param environ: WSGI environment for the request
        @type start_response: callable
        @param start_response: WSGI start_response callable
        @rtype: iterable
        @return: response from the wrapped application, or this middleware's
        own error response
        """
        # Held on the instance for the properties and helper methods below
        self._environ = environ
        self._start_response = start_response

        # PATH_INFO may be absent or empty when the request is for the mount
        # point itself
        self._path = self._normalisePath(environ.get('PATH_INFO') or '')

        if not self.pathMatch:
            # Not a protected path - pass straight through
            return self._setResponse()

        if not self.isSSLClientCertSet:
            return self._setErrorResponse()

        if self.isValidClientCert():
            return self._setResponse()

        return self._setErrorResponse()

    def _setResponse(self):
        """Pass the current request to the wrapped application, or answer
        404 if no application was set

        @rtype: iterable
        @return: WSGI response body
        """
        if self._app is not None:
            return self._app(self._environ, self._start_response)

        response = 'No application set for SSLClientAuthNMiddleware'
        status = '%d %s' % (404, httplib.responses[404])
        self._start_response(status,
                             [('Content-type', 'text/plain'),
                              ('Content-Length', str(len(response)))])

        # Return a list: a bare string would be iterated over one character
        # at a time by the server
        return [response]

    def _setErrorResponse(self, msg='Invalid SSL client certificate'):
        """Answer the current request with the configured error status

        @type msg: basestring
        @param msg: text for the response body
        @rtype: list
        @return: WSGI response body
        """
        response = msg
        status = '%d %s' % (self.errorResponseCode,
                            httplib.responses[self.errorResponseCode])

        self._start_response(status,
                             [('Content-type', 'text/plain'),
                              ('Content-Length', str(len(response)))])
        return [response]

    def isValidClientCert(self):
        """Verify the client certificate of the current request against the
        CA certificates set via caCertFilePathList

        If no CA certificates have been set, a warning is logged and the
        certificate is accepted without a signature check.

        @rtype: bool
        @return: True if the certificate is accepted, False if it can't be
        parsed or fails verification
        """
        try:
            x509Cert = X509Cert.Parse(self._environ[self.sslClientCertKeyName])

            if len(self._caCertStack) == 0:
                log.warning("No CA certificates set for Client certificate "
                            "signature verification")
            else:
                self._caCertStack.verifyCertChain(x509Cert2Verify=x509Cert)

        except X509CertError as e:
            log.info("Client certificate verification failed: %s", e)
            return False

        except Exception as e:
            # Fail closed - any other error is treated as a rejection rather
            # than being allowed to propagate
            log.error("Client certificate verification failed with "
                      "unexpected error: %s", e)
            return False

        return True
232       
233
234# Utility functions to support Paste Deploy application and filter function
235# signatures       
def filter_app_factory(app, app_conf, **local_conf):
    '''Paste Deploy filter factory: wrap app in SSLClientAuthNMiddleware

    @param app: WSGI application to be wrapped
    @param app_conf: configuration dictionary passed to the middleware
    @param local_conf: additional keyword options passed to the middleware
    @return: SSLClientAuthNMiddleware instance wrapping app
    '''
    middleware = SSLClientAuthNMiddleware(app, app_conf, **local_conf)
    return middleware
239   
def app_factory(app_conf, **local_conf):
    '''Paste Deploy app factory: create SSLClientAuthNMiddleware with no
    wrapped application

    @param app_conf: configuration dictionary passed to the middleware
    @param local_conf: additional keyword options passed to the middleware
    @return: SSLClientAuthNMiddleware instance with its application set to
    None
    '''
    wrappedApp = None
    return SSLClientAuthNMiddleware(wrappedApp, app_conf, **local_conf)
Note: See TracBrowser for help on using the repository browser.