source: TI12-security/trunk/python/ndg.security.server/ndg/security/server/wsgi/sslclientauthn.py @ 4603

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.server/ndg/security/server/wsgi/sslclientauthn.py@4603
Revision 4603, 6.4 KB checked in by pjkersha, 12 years ago (diff)

#1004: initial work on Security Filter:

  • Started new SSLClientAuthNMiddleware WSGI middleware to filter configured URL paths by applying SSL client authentication. It reads the SSL client certificate from the Apache SSL_CLIENT_CERT environment variable, so it must be deployed under mod_wsgi with SSL enabled in order for that variable to be present in environ.
  • AppLoaderMiddleware: convenience utility that enables a mod_wsgi application to parse a Paste Deploy style ini file containing an app and app pipeline as required.
Line 
1import logging
2log = logging.getLogger(__name__)
3from ndg.security.common.X509 import X509Cert
4
class SSLClientAuthNMiddleware(object):
    """WSGI middleware applying SSL client authentication to requests.

    The client certificate text is looked up in the WSGI environ under the
    key named by C{sslClientCertKeyName}.

    NOTE(review): per-request state (C{_environ}, C{_path}) is stored on the
    middleware instance, so one instance shared between concurrent requests
    will overwrite it - confirm the deployment model before relying on it.

    NOTE(review): C{pathMatchList} and C{errorResponseCode} are stored but
    not yet used by C{__call__} to accept or reject a request.

    @cvar sslClientCertKeyName: environ key holding the client certificate
    @cvar propertyDefaults: valid option names and their default values
    """

    sslClientCertKeyName = 'SSL_CLIENT_CERT'

    propertyDefaults = {
        'errorResponseCode': 401,
        'pathMatchList': '/',
        'caCertFilePathList': []
    }

    # Types treated as "a string" by the option setters: basestring where it
    # exists, plain str on interpreters which have no basestring
    try:
        _stringTypes = (basestring,)
    except NameError:
        _stringTypes = (str,)

    def _isSSLClientCertSet(self):
        """
        @rtype: bool
        @return: True if the client certificate key is present in the environ
        of the request currently being processed
        """
        return self.sslClientCertKeyName in self._environ

    isSSLClientCertSet = property(fget=_isSSLClientCertSet,
                                  doc="Check for client cert. set in environ")

    def __init__(self, app, app_conf, prefix='', **local_conf):
        """
        @type app: callable or None
        @param app: next WSGI application in the chain; if None this
        middleware generates its own plain text response
        @type app_conf: dict or None
        @param app_conf: configuration options, optionally prefixed
        @type prefix: basestring
        @param prefix: prefix identifying option names in app_conf and
        local_conf; it is stripped before the option name is checked
        @type local_conf: dict
        @param local_conf: keyword options - these override matching options
        from app_conf
        @raise TypeError: if an option name is not in propertyDefaults
        """
        self._app = app

        # Per-request state, initialised so that isSSLClientCertSet can be
        # read before the first request has been handled
        self._environ = {}
        self._path = None

        opt = self.propertyDefaults.copy()
        if app_conf is not None:
            # Update from application config dictionary - filter from using
            # prefix
            self._filterOpts(opt, app_conf, prefix=prefix)

        # Keywords are filtered in the same way and applied last so that they
        # overwrite matching app_conf settings
        self._filterOpts(opt, local_conf, prefix=prefix)

        # Set options as object attributes - this goes via the property
        # setters defined below
        for name, val in opt.items():
            setattr(self, name, val)

    def _getErrorResponseCode(self):
        """
        @rtype: int
        @return: HTTP error code set by this middleware on client cert.
        verification error
        """
        return self._errorResponseCode

    def _setErrorResponseCode(self, code):
        """
        @type code: int or basestring
        @param code: error response code set if client cert. verification
        fails
        @raise TypeError: if code is neither an int nor a string
        @raise ValueError: if a string code cannot be converted to an int
        """
        if isinstance(code, int):
            self._errorResponseCode = code
        elif isinstance(code, self._stringTypes):
            self._errorResponseCode = int(code)
        else:
            raise TypeError('Expecting int or string type for '
                            '"errorResponseCode" attribute')

    errorResponseCode = property(fget=_getErrorResponseCode,
                            fset=_setErrorResponseCode,
                            doc="Response code raised if client certificate "
                                "verification fails")

    def _setCACertsFromFileList(self, caCertFilePathList):
        '''Read CA certificates from file and add them to an X.509 Cert.
        stack

        @type caCertFilePathList: list, tuple or basestring
        @param caCertFilePathList: list of file paths for CA certificates to
        be used to verify the peer certificate; a string is split on
        whitespace into a list of paths
        @raise TypeError: if the input is not a list, tuple or string'''

        if isinstance(caCertFilePathList, self._stringTypes):
            # Try parsing a space separated list of file paths
            caCertFilePathList = caCertFilePathList.split()

        elif not isinstance(caCertFilePathList, (list, tuple)):
            raise TypeError('Expecting a list or tuple for '
                            '"caCertFilePathList"')

        # X509Stack is not imported at module level; import it here from the
        # same module which provides X509Cert
        # NOTE(review): assumes ndg.security.common.X509 exports X509Stack
        # and that X509Cert.Read(filePath) loads a certificate from file -
        # confirm against that module
        from ndg.security.common.X509 import X509Stack

        caCertStack = X509Stack()
        for caCertFilePath in caCertFilePathList:
            caCertStack.push(X509Cert.Read(caCertFilePath))

        # Only replace the existing stack once every file has been read
        self._caCertStack = caCertStack

    caCertFilePathList = property(fset=_setCACertsFromFileList,
                                  doc="list of CA certificate file paths - "
                                      "peer certificate must validate against "
                                      "one")

    def _getPathMatchList(self):
        """
        @rtype: list or tuple
        @return: URL paths to apply SSL client authentication to
        """
        return self._pathMatchList

    def _setPathMatchList(self, pathList):
        '''Set the URL paths which SSL client authentication applies to

        @type pathList: list, tuple or basestring
        @param pathList: list of URL paths to apply SSL client authentication
        to. Paths are relative to the point at which this middleware is mounted
        as set in environ['PATH_INFO']. A string is split on whitespace into
        a list of paths
        @raise TypeError: if the input is not a list, tuple or string
        '''
        # TODO: refactor to:
        # * enable reading of path list from a database or some other
        # configuration source.
        # * enable some kind of pattern matching for paths

        if isinstance(pathList, self._stringTypes):
            # Try parsing a space separated list of paths
            pathList = pathList.split()

        elif not isinstance(pathList, (list, tuple)):
            raise TypeError('Expecting a list or tuple for "pathMatchList"')

        self._pathMatchList = pathList

    pathMatchList = property(fget=_getPathMatchList,
                             fset=_setPathMatchList,
                             doc="list of URL paths to apply SSL client "
                                 "authentication to")

    @classmethod
    def _filterOpts(cls, opt, newOpt, prefix=''):
        '''Convenience utility to filter input options set in __init__ via
        app_conf or keywords

        @type opt: dict
        @param opt: existing options set.  These will be updated by this
        method based on the content of newOpt
        @type newOpt: dict
        @param newOpt: new options to update opt with
        @type prefix: basestring
        @param prefix: if set, remove the given prefix from the start of the
        input option names; any "." left in the name becomes "_"
        @raise TypeError: if an option is set that is not in the class's
        propertyDefaults class variable
        '''

        badOpt = []
        for k, v in newOpt.items():
            if prefix and k.startswith(prefix):
                # Strip the leading prefix only - the same text appearing
                # later in the name is part of the option name itself
                filtK = k[len(prefix):].replace('.', '_')
            else:
                filtK = k

            if filtK in cls.propertyDefaults:
                opt[filtK] = v
            else:
                badOpt.append(k)

        if badOpt:
            raise TypeError("Invalid input option(s) set: %s" %
                            (", ".join(badOpt)))

    def __call__(self, environ, start_response):
        """WSGI entry point

        @type environ: dict
        @param environ: WSGI environment for the request
        @type start_response: callable
        @param start_response: WSGI start response callable
        @return: response of the wrapped application or, if none was set, a
        single element list containing a plain text message
        """
        self._environ = environ

        # PATH_INFO may be absent from environ - treat it as an empty path
        self._path = environ.get('PATH_INFO', '').rstrip('/')

        if self.isSSLClientCertSet:
            self.verifyClientCert()

        if self._app:
            return self._app(environ, start_response)

        response = 'No app set for SSLClientAuthNMiddleware'
        start_response('200 OK',
                       [('Content-type', 'text/plain'),
                        ('Content-Length', str(len(response)))])

        # Return the body inside a list rather than as a bare string so that
        # it is sent as a single chunk
        return [response]

    def verifyClientCert(self):
        """Parse the client certificate held in the environ of the current
        request

        NOTE(review): the certificate is parsed only; it is not yet checked
        against the CA certificate stack.

        @return: whatever X509Cert.Parse returns for the certificate text
        @raise KeyError: if no client certificate is set in environ
        """
        sslClientCert = self._environ[self.sslClientCertKeyName]
        return X509Cert.Parse(sslClientCert)
164       
165       
def filter_app_factory(app, app_conf, **local_conf):
    """Factory creating the SSL client authentication middleware as a filter
    around an existing WSGI application

    @type app: callable
    @param app: WSGI application to be wrapped
    @type app_conf: dict
    @param app_conf: configuration options passed to the middleware
    @type local_conf: dict
    @param local_conf: keyword options passed to the middleware
    @rtype: SSLClientAuthNMiddleware
    @return: middleware instance wrapping app
    """
    middleware = SSLClientAuthNMiddleware(app, app_conf, **local_conf)
    return middleware
168   
def app_factory(app_conf, **local_conf):
    """Factory creating the SSL client authentication middleware with no
    wrapped application

    @type app_conf: dict
    @param app_conf: configuration options passed to the middleware
    @type local_conf: dict
    @param local_conf: keyword options passed to the middleware
    @rtype: SSLClientAuthNMiddleware
    @return: middleware instance with its application set to None
    """
    wrappedApp = None
    middleware = SSLClientAuthNMiddleware(wrappedApp, app_conf, **local_conf)
    return middleware
Note: See TracBrowser for help on using the repository browser.