Changeset 5343


Ignore:
Timestamp:
27/05/09 16:28:35 (10 years ago)
Author:
pjkersha
Message:

SSL Client AuthN WSGI middleware - additions to unit tests.

Location:
TI12-security/trunk/python
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • TI12-security/trunk/python/ndg.security.server/ndg/security/server/wsgi/__init__.py

    r5330 r5343  
    6262    def _initCall(self, environ, start_response): 
    6363        """Call from derived class' __call__() to set environ and path 
    64         attributes""" 
     64        attributes 
     65         
     66        @type environ: dict 
     67        @param environ: WSGI environment variables dictionary 
     68        @type start_response: function 
     69        @param start_response: standard WSGI start response function 
     70        """ 
    6571        self.environ = environ 
    6672        self.start_response = start_response 
     
    8086 
    8187    def __call__(self, environ, start_response): 
     88        """ 
     89        @type environ: dict 
     90        @param environ: WSGI environment variables dictionary 
     91        @type start_response: function 
     92        @param start_response: standard WSGI start response function 
     93        """ 
    8294        self._initCall(environ, start_response) 
    8395        return self._setResponse(environ, start_response) 
    8496     
    8597    def _setResponse(self,  
    86                      environ,  
    87                      start_response,  
     98                     environ=None,  
     99                     start_response=None,  
    88100                     notFoundMsg=None, 
    89101                     notFoundMsgContentType=None): 
     102        """Convenience method to wrap call to next WSGI app in stack or set an 
     103        error if none is set 
     104         
     105        @type environ: dict 
     106        @param environ: WSGI environment variables dictionary defaults to  
     107        environ object attribute.  For the latter to be available, the initCall 
     108        decorator method must have been invoked. 
     109        @type start_response: function 
     110        @param start_response: standard WSGI start response function defaults  
     111        to start_response object attribute.  For the latter to be available,  
     112        the initCall decorator method must have been invoked. 
     113        """ 
     114        if environ is None: 
     115            environ = self.environ 
     116         
     117        if start_response is None: 
     118            start_response = self.start_response 
     119 
    90120        if self._app: 
    91121            return self._app(environ, start_response) 
     
    102132        @type start_response: function 
    103133        @param start_response: standard WSGI callable to set the HTTP header 
     134        defaults to start_response object attribute.  For the latter to be  
     135        available, the initCall decorator method must have been invoked.    
    104136        @type msg: basestring 
    105137        @param msg: optional error message 
  • TI12-security/trunk/python/ndg.security.server/ndg/security/server/wsgi/soap.py

    r5060 r5343  
    160160        return sw 
    161161     
    162     pathMatch = lambda self, environ:environ['PATH_INFO'] == self.app_conf['path'] 
     162    pathMatch = lambda self,environ:environ['PATH_INFO']==self.app_conf['path'] 
    163163         
    164164    @staticmethod 
  • TI12-security/trunk/python/ndg.security.server/ndg/security/server/wsgi/ssl.py

    r5333 r5343  
    1717log = logging.getLogger(__name__) 
    1818import os 
    19 import httplib 
    20  
    21 from ndg.security.server.wsgi import NDGSecurityPathFilter 
     19import re # Pattern matching to determine which URI paths to apply SSL AuthN to 
     20 
     21from ndg.security.server.wsgi import NDGSecurityMiddlewareBase 
    2222from ndg.security.common.X509 import X509Stack, X509Cert, X509CertError, X500DN 
     23from ndg.security.common.utils.classfactory import instantiateClass 
     24 
     25 
     26class ClientCertVerificationInterface(object): 
     27    """Interface to enable customised verification of the client certificate  
     28    Distinguished Name""" 
     29    def __init__(self, **cfg): 
     30        """@type cfg: dict 
     31        @param cfg: configuration parameters, derived class may customise 
     32        """ 
     33        raise NotImplementedError() 
     34     
     35    def __call__(self, x509Cert): 
     36        """Derived class implementation should return True if the certificate 
     37        DN is valid, False otherwise 
     38        @type x509Cert: ndg.security.common.X509.X509Cert 
     39        @param x509Cert: client X.509 certificate received from Apache 
     40        environment""" 
     41        raise NotImplementedError() 
     42 
     43 
     44class NoClientCertVerification(ClientCertVerificationInterface): 
     45    """Implementation of ClientCertVerificationInterface ignoring the  
     46    client certificate DN set""" 
     47    def __init__(self, **cfg): 
     48        pass 
     49     
     50    def __call__(self, x509Cert): 
     51        return True 
     52     
     53class ClientCertVerificationList(ClientCertVerificationInterface): 
     54    """Implementation of ClientCertVerificationInterface matching the input 
     55    client certificate DN against a configurable list""" 
     56     
     57    def __init__(self, validDNList=[]): 
     58        self.validDNList = validDNList 
     59     
     60    def __call__(self, x509Cert): 
     61        inputDN = x509Cert.dn 
     62        return inputDN in self._validDNList 
     63     
     64    def _setValidDNList(self, dnList): 
     65        '''Read CA certificates from file and add them to an X.509 Cert. 
     66        stack 
     67         
     68        @type dnList: list or tuple 
     69        @param dnList: list of DNs to match against the input certificate DN 
     70        ''' 
     71         
     72        if isinstance(dnList, basestring): 
     73            # Try parsing a space separated list of file paths 
     74            dnList = dnList.split() 
     75             
     76        elif not isinstance(dnList, (list, tuple)): 
     77            raise TypeError('Expecting a list or tuple for "dnList"') 
     78 
     79        self._validDNList = [X500DN(dn) for dn in dnList] 
     80 
     81     
     82    def _getValidDNList(self): 
     83        return self._validDNList 
     84     
     85    validDNList = property(fset=_setValidDNList, 
     86                           fget=_getValidDNList, 
     87                           doc="list of permissible certificate Distinguished " 
     88                               "Names permissible") 
     89     
    2390 
    2491class ApacheSSLAuthNMiddleware(NDGSecurityMiddlewareBase): 
     
    44111     
    45112    propertyDefaults = { 
     113        'clientCertVerificationClassName': None, 
     114        'rePathMatchList': [], 
    46115        'caCertFilePathList': [] 
    47116    } 
     
    55124     
    56125    def __init__(self, app, global_conf, prefix='', **app_conf): 
     126         
    57127        super(ApacheSSLAuthNMiddleware, self).__init__(app,  
    58128                                                       global_conf,  
     129                                                       prefix=prefix, 
    59130                                                       **app_conf) 
     131         
     132        self.rePathMatchList = [re.compile(r) for r in  
     133                                app_conf.get('rePathMatchList','').split()] 
     134 
     135        self.caCertFilePathList = app_conf.get('caCertFilePathList', []) 
     136         
     137        # A custom class may be specified to determine what verification to 
     138        # apply to the client certificate 
     139        clientCertVerificationClassName = app_conf.get( 
     140                                    prefix+'clientCertVerificationClassName') 
     141        if clientCertVerificationClassName: 
     142            isClientCertVerificationProperty = lambda i: i[0].startswith( 
     143                                            prefix+'clientCertVerification.') 
     144            clientCertVerificationProperties = \ 
     145                dict(filter(isClientCertVerificationProperty,app_conf.items())) 
     146                 
     147            self._verifyClientCert = instantiateClass( 
     148                             clientCertVerificationClassName,  
     149                             None,  
     150                             objectType=ClientCertVerificationInterface,  
     151                             classProperties=clientCertVerificationProperties)             
     152        else:  
     153            # Default to carry out no verification 
     154            self._verifyClientCert = NoClientCertVerification 
    60155         
    61156         
     
    101196            log.warning("ApacheSSLAuthNMiddleware: 'HTTPS' environment " 
    102197                        "variable not found in environment; ignoring request") 
    103             return self._app(environ, start_response) 
    104              
    105         elif not self.pathMatch: 
     198            return self._setResponse() 
     199             
     200        elif not self._pathMatch(): 
    106201            log.debug("ApacheSSLAuthNMiddleware: ignoring path [%s]",  
    107202                      self.pathInfo) 
    108             return self._setResponse(environ, start_response) 
     203            return self._setResponse() 
    109204                     
    110205        elif not self.isSSLClientCertSet: 
    111206            log.error("ApacheSSLAuthNMiddleware: No SSL Client certificate " 
    112                       "for request to [%s]; ignoring ",  
     207                      "for request to [%s]; setting HTTP 401 Unauthorized",  
    113208                      self.pathInfo) 
    114209            return self._setErrorResponse(code=401, 
     
    116211             
    117212        if self.isValidClientCert():             
    118             return self._setResponse(environ, start_response) 
     213            return self._setResponse() 
    119214        else: 
    120215            return self._setErrorResponse(code=401) 
     
    122217             
    123218    def _setResponse(self,  
    124                      environ,  
    125                      start_response, 
    126219                     notFoundMsg='No application set for ' 
    127                                  'ApacheSSLAuthNMiddleware'): 
    128         return super(ApacheSSLAuthNMiddleware, self)._setResponse(environ,  
    129                                                     start_response, 
    130                                                     notFoundMsg=notFoundMsg) 
    131  
    132     def _setErrorResponse(self,  
    133                           start_response, 
    134                           msg='Invalid SSL client certificate'): 
    135         return super(ApacheSSLAuthNMiddleware, self)._setResponse( 
    136                                                 start_response=start_response, 
    137                                                 msg=msg, 
    138                                                 code=self.errorResponseCode) 
    139  
     220                                 'ApacheSSLAuthNMiddleware', 
     221                     **kw): 
     222        return super(ApacheSSLAuthNMiddleware,  
     223                     self)._setResponse(notFoundMsg=notFoundMsg) 
     224 
     225    def _setErrorResponse(self, msg='Invalid SSL client certificate', **kw): 
     226        return super(ApacheSSLAuthNMiddleware, self)._setErrorResponse(msg=msg, 
     227                                                                       **kw) 
     228 
     229    def _pathMatch(self): 
     230        """Apply a list of regular expression matching patterns to the contents 
     231        of environ['PATH_INFO'], if any match, return True.  This method is 
     232        used to determine whether to apply SSL client authentication 
     233        """ 
     234        path = self.pathInfo 
     235        for regEx in self.rePathMatchList: 
     236            if regEx.match(path): 
     237                return True 
     238             
     239        return False 
     240     
    140241    def isValidClientCert(self): 
    141242        sslClientCert = self.environ[ 
     
    161262         
    162263        # Check certificate Distinguished Name via  
    163         # ClientCertDNVerificationInterface object 
    164         return self._clientCertDNVerify(x509Cert) 
    165  
    166  
    167 class ClientCertDNVerificationInterface(object): 
    168     """Interface to enable customised verification of the client certificate  
    169     Distinguished Name""" 
    170     def __init__(self, **cfg): 
    171         """@type cfg: dict 
    172         @param cfg: configuration parameters, derived class may customise 
    173         """ 
    174         raise NotImplementedError() 
    175      
    176     def __call__(self, x509Cert): 
    177         """Derived class implementation should return True if the certificate 
    178         DN is valid, False otherwise 
    179         @type x509Cert: ndg.security.common.X509.X509Cert 
    180         @param x509Cert: client X.509 certificate received from Apache 
    181         environment""" 
    182         raise NotImplementedError() 
    183  
    184  
    185 class NoClientCertDNVerification(ClientCertDNVerificationInterface): 
    186     """Implementation of ClientCertDNVerificationInterface ignoring the  
    187     client certificate DN set""" 
    188     def __init__(self, **cfg): 
    189         pass 
    190      
    191     def __call__(self, x509Cert): 
    192         return True 
    193      
    194 class ClientCertDNVerificationList(ClientCertDNVerificationInterface): 
    195     """Implementation of ClientCertDNVerificationInterface matching the input 
    196     client certificate DN against a configurable list""" 
    197      
    198     def __init__(self, validDNList=[]): 
    199         self.validDNList = validDNList 
    200      
    201     def __call__(self, x509Cert): 
    202         inputDN = x509Cert.dn 
    203         return inputDN in self._validDNList 
    204      
    205     def _setValidDNList(self, dnList): 
    206         '''Read CA certificates from file and add them to an X.509 Cert. 
    207         stack 
    208          
    209         @type dnList: list or tuple 
    210         @param dnList: list of DNs to match against the input certificate DN 
    211         ''' 
    212          
    213         if isinstance(dnList, basestring): 
    214             # Try parsing a space separated list of file paths 
    215             dnList = dnList.split() 
    216              
    217         elif not isinstance(dnList, (list, tuple)): 
    218             raise TypeError('Expecting a list or tuple for "dnList"') 
    219  
    220         self._validDNList = [X500DN(dn) for dn in dnList] 
    221  
    222      
    223     def _getValidDNList(self): 
    224         return self._validDNList 
    225      
    226     validDNList = property(fset=_setValidDNList, 
    227                            fget=_getValidDNList, 
    228                            doc="list of permissible certificate Distinguished " 
    229                                "Names permissible") 
     264        # ClientCertVerificationInterface object 
     265        return self._clientCertVerify(x509Cert) 
     266 
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/unit/__init__.py

    r5291 r5343  
    3434    """ 
    3535    @type path: basestring 
    36     @param path: directory path from which to get parent directoty, defaults 
     36    @param path: directory path from which to get parent directory, defaults 
    3737    to dir of this module 
    3838    @rtype: basestring 
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/unit/wsgi/ssl/test.ini

    r5333 r5343  
    55# 
    66[DEFAULT] 
    7  
     7testConfigDir = ../../../config 
    88[server:main] 
    99use = egg:Paste#http 
     
    1515 
    1616[app:TestApp] 
    17 paste.app_factory = ndg.security.test.unit.wsgi.ssl.test_authn:TestAuthNMiddleware 
     17paste.app_factory = ndg.security.test.unit.wsgi.ssl.test_ssl:TestSSLClientAuthNMiddleware 
    1818 
    1919[filter:SSLClientAuthNFilter] 
    2020paste.filter_app_factory = ndg.security.server.wsgi.ssl:ApacheSSLAuthNMiddleware 
    21 prefix = authN. 
    22 authN.redirectURI = /redirect2here 
    23 #authN.redirectURI = http://localhost:5800/verify 
     21prefix = ssl. 
     22caCertFilePathList = %(testConfigDir)s/ca/ndg-test-ca.crt 
     23clientCertVerificationClassName =  
     24rePathMatchList = ^/secured/.*$ ^/restrict.* 
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/unit/wsgi/ssl/test_ssl.py

    r5333 r5343  
    1212import logging 
    1313 
    14  
    1514import unittest 
    1615import os 
    17 import sys 
    18 import getpass 
    1916import re 
    20 import base64 
    21 import urllib2 
    22  
    23 from os.path import expandvars as xpdVars 
    24 from os.path import join as jnPath 
    25 mkPath = lambda file: jnPath(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'],  
    26                              file) 
    2717 
    2818import paste.fixture 
    2919from paste.deploy import loadapp 
     20from ndg.security.test.unit import BaseTestCase 
    3021 
    31 class TestSSLClientAuthNMiddleware(object): 
     22class TestSSLClientAuthNMiddleware(BaseTestCase): 
    3223    '''Test Application for the Authentication handler to protect''' 
    3324    response = "Test Authentication redirect application" 
     
    3829    def __call__(self, environ, start_response): 
    3930         
    40         if environ['PATH_INFO'] == '/test_401WithNotLoggedIn': 
    41             status = "401 Unauthorized" 
     31        if environ['PATH_INFO'] == '/secured/uri': 
     32            status = "200 OK" 
    4233             
    43         elif environ['PATH_INFO'] == '/test_401WithLoggedIn': 
    44             status = "401 Unauthorized" 
     34        elif environ['PATH_INFO'] == '/unsecured': 
     35            status = "200 OK" 
    4536             
    4637        elif environ['PATH_INFO'] == '/test_200WithNotLoggedIn': 
     
    7061         
    7162 
    72     def test01(self): 
    73         response = self.app.get('/test_401WithNotLoggedIn', 
     63    def test01NotAnSSLRequest(self): 
     64        # This request should be ignored because the SSL environment settings 
     65        # are not present 
     66        response = self.app.get('/unsecured') 
     67     
     68    def test02NoClientCertSet(self): 
     69        extra_environ = {'HTTPS':'1'} 
     70        response = self.app.get('/secured/uri', 
     71                                extra_environ=extra_environ, 
     72                                status=401) 
     73     
     74    def test03ClientCertSet(self): 
     75        thisDir = os.dirname(__file__) 
     76        sslClientCertFilePath = os.path.join(BaseTestCase.configDirEnvVarName, 
     77                                             'pki', 
     78                                             'test.crt') 
     79        sslClientCert = str(X509.Read(sslClientCertFilePath)) 
     80        extra_environ = {'HTTPS':'1', 'SSL_CLIENT_CERT': sslClientCert} 
     81        response = self.app.get('/secured/uri', 
     82                                extra_environ=extra_environ, 
    7483                                status=401) 
    7584 
Note: See TracChangeset for help on using the changeset viewer.