Changeset 4293


Ignore:
Timestamp:
07/10/08 13:24:45 (11 years ago)
Author:
pjkersha
Message:

Refactoring of CredWallet?

  • added tests for getting mapped AC
  • unit tests now complete
Location:
TI12-security/trunk/python
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • TI12-security/trunk/python/ndg.security.common/ndg/security/common/CredWallet.py

    r4290 r4293  
    268268    __metaclass__ = _MetaCredWallet 
    269269 
    270     _defParam = dict(userId=None, 
     270    _propDefs = dict(userId=None, 
    271271                     userX509Cert=None, 
     272                     userX509CertFilePath=None, 
    272273                     userPriKey=None, 
     274                     userPriKeyFilePath=None, 
    273275                     issuingX509Cert=None, 
     276                     issuingX509CertFilePath=None, 
    274277                     caCertFilePathList=None, 
    275278                     sslCACertFilePathList=None, 
     
    284287                     wssCfgKw={}) 
    285288     
    286     __slots__ = _defParam.keys() + [ 
     289    _protectedAttrs = [ 
    287290        '_userX509Cert', 
     291        '_userX509CertFilePath', 
    288292        '_userPriKey', 
     293        '_userPriKeyFilePath', 
     294        '_userPriKeyPwd', 
    289295        '_issuingX509Cert', 
     296        '_issuingX509CertFilePath', 
    290297        '_attributeAuthorityClnt', 
    291298        '_attributeAuthority', 
     
    293300        '_sslCACertFilePathList', 
    294301        '_credentialRepository', 
     302        '_mapFromTrustedHosts', 
     303        '_rtnExtAttCertList', 
    295304        '_attCertRefreshElapse', 
    296305        '_cfg', 
     
    299308        '_attributeAuthorityURI' 
    300309    ] 
     310     
     311    __slots__ = _propDefs.keys() + _protectedAttrs 
    301312     
    302313    def __init__(self,  
     
    324335        log.debug("Calling CredWallet.__init__ ...") 
    325336 
    326         # Initialise attributes 
    327         attr = {}.fromkeys(CredWallet.__slots__) 
    328         attr.update(CredWallet._defParam) 
     337        # Initialise attributes - 1st protected ones 
     338        attr = {}.fromkeys(CredWallet._protectedAttrs) 
     339         
     340        # ... then properties 
     341        attr.update(CredWallet._propDefs) 
    329342        for k, v in attr.items(): 
    330343            try: 
     
    416429        prop = readAndValidate(cfgFilePath, 
    417430                               cfg=self._cfg, 
    418                                validKeys=CredWallet._defParam, 
     431                               validKeys=CredWallet._propDefs, 
    419432                               prefix=prefix, 
    420433                               sections=(section,)) 
     
    492505                            fset=_setUserX509Cert, 
    493506                            doc="X.509 user certificate instance") 
    494  
    495  
     507  
     508    def _setUserX509CertFilePath(self, filePath): 
     509        "Set user X.509 cert file path property method" 
     510         
     511        if isinstance(filePath, basestring): 
     512            filePath = os.path.expandvars(filePath) 
     513            self._userX509Cert = X509CertRead(filePath) 
     514             
     515        elif filePath is not None: 
     516            raise AttributeError("User X.509 cert. file path must be a valid " 
     517                                 "string") 
     518         
     519        self._userX509CertFilePath = filePath 
     520                 
     521    userX509CertFilePath = property(fset=_setUserX509CertFilePath, 
     522                                    doc="File path to user X.509 cert.") 
     523     
    496524    def _setIssuingX509Cert(self, issuingX509Cert): 
    497525        "Set property method for X.509 user cert." 
    498526        self._issuingX509Cert = self._setX509Cert(issuingX509Cert) 
    499527         
    500  
    501528    def _getIssuingX509Cert(self): 
    502529        """Get user cert X509Cert instance""" 
     
    506533                               fset=_setIssuingX509Cert, 
    507534                               doc="X.509 user certificate instance") 
    508       
     535  
     536    def _setIssuerX509CertFilePath(self, filePath): 
     537        "Set user X.509 cert file path property method" 
     538         
     539        if isinstance(filePath, basestring): 
     540            filePath = os.path.expandvars(filePath) 
     541            self._issuerX509Cert = X509CertRead(filePath) 
     542             
     543        elif filePath is not None: 
     544            raise AttributeError("User X.509 cert. file path must be a valid " 
     545                                 "string") 
     546         
     547        self._issuerX509CertFilePath = filePath 
     548                 
     549    issuerX509CertFilePath = property(fset=_setIssuerX509CertFilePath, 
     550                                      doc="File path to user X.509 cert. " 
     551                                          "issuing cert.")      
    509552 
    510553    def _getUserPriKey(self): 
     
    525568            self._userPriKey = None 
    526569        elif isinstance(userPriKey, basestring): 
     570            pwdCallback = lambda *ar, **kw: self._userPriKeyPwd 
    527571            self._userPriKey = RSA.load_key_string(userPriKey, 
    528                                              callback=lambda *ar, **kw: None) 
     572                                                   callback=pwdCallback) 
    529573        elif isinstance(userPriKey, RSA.RSA): 
    530574            self._userPriKey = userPriKey           
     
    538582                              "messages to Attribute authority") 
    539583 
    540     
     584    def _setUserPriKeyFilePath(self, filePath): 
     585        "Set user private key file path property method" 
     586         
     587        if isinstance(filePath, basestring): 
     588            filePath = os.path.expandvars(filePath) 
     589            try: 
     590                # Read Private key to sign with     
     591                priKeyFile = BIO.File(open(filePath))  
     592                pwdCallback = lambda *ar, **kw: self._userPriKeyPwd 
     593                self._userPriKey = RSA.load_key_bio(priKeyFile,  
     594                                                    callback=pwdCallback)     
     595            except Exception, e: 
     596                raise AttributeError("Setting user private key: %s" % e) 
     597         
     598        elif filePath is not None: 
     599            raise AttributeError("Private key file path must be a valid " 
     600                                 "string or None") 
     601         
     602        self._userPriKeyFilePath = filePath 
     603         
     604    userPriKeyFilePath = property(fset=_setUserPriKeyFilePath, 
     605                                  doc="File path to user private key") 
     606  
     607    def _setUserPriKeyPwd(self, userPriKeyPwd): 
     608        "Set method for user private key file password" 
     609        if userPriKeyPwd is not None and not isinstance(userPriKeyPwd,  
     610                                                        basestring): 
     611            raise AttributeError("Signing private key password must be None " 
     612                                 "or a valid string") 
     613         
     614        self._userPriKeyPwd = userPriKeyPwd 
     615 
     616    def _getUserPriKeyPwd(self): 
     617        "Get property method for user private key" 
     618        return self._userPriKeyPwd 
     619         
     620    userPriKeyPwd = property(fset=_setUserPriKeyPwd, 
     621                             fget=_getUserPriKeyPwd, 
     622                             doc="Password protecting user private key file") 
     623         
    541624    def _getCredentials(self): 
    542625        """Get Property method.  Credentials are read-only 
     
    548631    # Publish attribute 
    549632    credentials = property(fget=_getCredentials, 
    550                            doc="List of Attribute Certificates")    
     633                           doc="List of Attribute Certificates linked to " 
     634                               "issuing authorities") 
    551635 
    552636 
     
    723807             
    724808    attributeAuthorityClnt = property(fget=_getAttributeAuthorityClnt,  
    725                       doc="Attribute Authority web service client instance") 
     809                                      doc="Attribute Authority web service " 
     810                                          "client instance") 
    726811 
    727812 
     
    731816        attributeAuthorityClnt 
    732817         
    733         @type attributeAuthorityClnt: AttAuthorityClient 
    734         @param attributeAuthorityClnt: Attribute Authority Web Service client  
    735         instance""" 
    736         return self._attributeAuthorityClnt 
    737  
     818        @rtype attributeAuthority: ndg.security.server.AttAuthority.AttAuthority 
     819        @return attributeAuthority: Attribute Authority instance""" 
     820        return self._attributeAuthority 
    738821 
    739822    def _setAttributeAuthority(self, attributeAuthority): 
     
    742825         
    743826        @type attributeAuthority: ndg.security.server.AttAuthority.AttAuthority 
    744         @param attributeAuthority: Attribute Authority Web Service.""" 
     827        @param attributeAuthority: Attribute Authority instance.""" 
    745828        if attributeAuthority is not None and \ 
    746829           not isinstance(attributeAuthority, AttAuthority): 
    747             raise TypeError("Expecting %r type for attributeAuthority " 
    748                             "attribute" % AttAuthority) 
    749              
    750         self._attributeAuthority = self.attributeAuthority 
     830            raise AttributeError("Expecting %r for attributeAuthority " 
     831                                 "attribute" % AttAuthority) 
     832             
     833        self._attributeAuthority = attributeAuthority 
    751834             
    752835    attributeAuthority = property(fget=_getAttributeAuthority, 
    753836                                  fset=_setAttributeAuthority,  
    754837                                  doc="Attribute Authority instance") 
     838 
     839 
     840    def _getMapFromTrustedHosts(self): 
     841        """Get property method for boolean flag - if set to True it allows 
     842        role mapping to be attempted when connecting to an Attribute Authority 
     843         
     844        @type mapFromTrustedHosts: bool 
     845        @param mapFromTrustedHosts: set to True to try role mapping in AC  
     846        requests to Attribute Authorities""" 
     847        return self._mapFromTrustedHosts 
     848 
     849    def _setMapFromTrustedHosts(self, mapFromTrustedHosts): 
     850        """Set property method for boolean flag - if set to True it allows 
     851        role mapping to be attempted when connecting to an Attribute Authority 
     852         
     853        @type mapFromTrustedHosts: bool 
     854        @param mapFromTrustedHosts: Attribute Authority Web Service.""" 
     855        if CredWallet.isBoolString(mapFromTrustedHosts): 
     856            mapFromTrustedHosts = bool(mapFromTrustedHosts) 
     857             
     858        elif not isinstance(mapFromTrustedHosts, bool): 
     859            raise AttributeError("Expecting %r for mapFromTrustedHosts " 
     860                                 "attribute" % bool) 
     861             
     862        self._mapFromTrustedHosts = mapFromTrustedHosts 
     863             
     864    mapFromTrustedHosts = property(fget=_getMapFromTrustedHosts, 
     865                                   fset=_setMapFromTrustedHosts,  
     866                                   doc="Set to True to enable mapped AC " 
     867                                       "requests") 
     868 
     869    def _getRtnExtAttCertList(self): 
     870        """Get property method for Attribute Authority Web Service client 
     871        instance.  Use rtnExtAttCertListURI propert to set up  
     872        rtnExtAttCertListClnt 
     873         
     874        @type rtnExtAttCertList: bool 
     875        @param rtnExtAttCertList: """ 
     876        return self._rtnExtAttCertList 
     877 
     878    def _setRtnExtAttCertList(self, rtnExtAttCertList): 
     879        """Set property method for boolean flag - when a AC request fails, 
     880        return a list of candidate ACs that could be used to re-try with in 
     881        order to get mapped AC. 
     882         
     883        @type rtnExtAttCertList: bool 
     884        @param rtnExtAttCertList: set to True to configure getAttCert to return 
     885        a list of ACs that could be used in a re-try to get a mapped AC from  
     886        the target Attribute Authority.""" 
     887        if CredWallet.isBoolString(rtnExtAttCertList): 
     888            rtnExtAttCertList = bool(rtnExtAttCertList) 
     889             
     890        elif not isinstance(rtnExtAttCertList, bool): 
     891            raise AttributeError("Expecting %r for rtnExtAttCertList " 
     892                                 "attribute" % bool) 
     893             
     894        self._rtnExtAttCertList = rtnExtAttCertList 
     895             
     896    rtnExtAttCertList = property(fget=_getRtnExtAttCertList, 
     897                                 fset=_setRtnExtAttCertList,  
     898                                 doc="Set to True to enable mapped AC " 
     899                                     "requests") 
     900 
     901    @staticmethod 
     902    def isBoolString(string): 
     903        '''Test for string set to equivalent of bool const values''' 
     904        return isinstance(string, basestring) and string in ('True', 'False') 
    755905 
    756906 
     
    10811231        The list is returned via CredWalletAttributeRequestDenied exception 
    10821232        If no value is set, the default value held in  
    1083         self._mapFromTrustedHosts is used 
     1233        self.mapFromTrustedHosts is used 
    10841234 
    10851235        @type rtnExtAttCertList: bool / None 
     
    11311281         
    11321282        log.debug("CredWallet.getAttCert ...") 
     1283         
     1284        # Both these assignments are calling set property methods implicitly! 
    11331285        if attributeAuthorityURI: 
    11341286            self.attributeAuthorityURI = attributeAuthorityURI 
    11351287        elif attributeAuthority: 
    1136             self._setAAPropFilePath  
     1288            self.attributeAuthority = attributeAuthority 
    11371289            
    11381290        if not refreshAttCert and self._credentials: 
     
    11711323        # made 
    11721324        if mapFromTrustedHosts is not None: 
    1173             self._mapFromTrustedHosts = mapFromTrustedHosts 
     1325            self.mapFromTrustedHosts = mapFromTrustedHosts 
    11741326 
    11751327        if rtnExtAttCertList is not None: 
     
    11831335                     "hosts set: %s" % extTrustedHostList) 
    11841336             
    1185             if not self._mapFromTrustedHosts: 
     1337            if not self.mapFromTrustedHosts: 
    11861338                raise CredWalletError("A list of trusted hosts has been "  
    11871339                                      "input but mapping from trusted hosts " 
     
    12411393             
    12421394            except CredWalletAttributeRequestDenied, attributeRequestDenied: 
    1243                 if not mapFromTrustedHosts and not rtnExtAttCertList: 
     1395                if not self.mapFromTrustedHosts and not self.rtnExtAttCertList: 
    12441396                    # Creating a mapped certificate is not allowed - raise 
    12451397                    # authorisation denied exception saved from earlier 
     
    13281480                            # ignore any errors and continue 
    13291481                            log.warning('AC request to trusted host "%s"' % \ 
    1330                                         info['attributeAuthorityURI'] + ' resulted in: %s'%e) 
     1482                                        info['aaURI'] + ' resulted in: %s' % e) 
    13311483                             
    13321484                     
     
    13371489 
    13381490 
    1339                 if not mapFromTrustedHosts: 
     1491                if not self.mapFromTrustedHosts: 
    13401492                     
    13411493                    # Exit here returning the list of candidate certificates 
  • TI12-security/trunk/python/ndg.security.common/ndg/security/common/wssecurity/BaseSignatureHandler.py

    r4285 r4293  
    553553    signingCertChain = property(fset=_setSigningCertChain, 
    554554                                fget=_getSigningCertChain, 
    555                doc="Cert.s in chain of trust to cert. used to verify msg.") 
     555                                doc="Cert.s in chain of trust to cert. used " 
     556                                    "to verify msg.") 
    556557 
    557558  
     
    560561        if signingPriKeyPwd is not None and \ 
    561562           not isinstance(signingPriKeyPwd, basestring): 
    562             raise AttributeError, \ 
    563                 "Signing private key password must be None or a valid string" 
     563            raise AttributeError("Signing private key password must be None " 
     564                                 "or a valid string") 
    564565         
    565566        self._signingPriKeyPwd = signingPriKeyPwd 
     
    573574    signingPriKeyPwd = property(fset=_setSigningPriKeyPwd, 
    574575                                fget=_getSigningPriKeyPwd, 
    575              doc="Password protecting private key file used to sign message") 
     576                                doc="Password protecting private key file " 
     577                                    "used to sign message") 
    576578 
    577579  
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/credwallet/credWallet.cfg

    r4285 r4293  
    2222 
    2323# See attAuthority unit tests to get this service running 
    24 #attributeAuthorityURI=http://localhost:5000/AttributeAuthority 
    25 attributeAuthorityURI=http://localhost:4900/AttributeAuthority 
     24attributeAuthorityURI=http://localhost:5000/AttributeAuthority 
     25# Switch to alt port for testing with tcpmon 
     26#attributeAuthorityURI=http://localhost:4900/AttributeAuthority 
    2627 
    2728# Omit Credential Repository and use default NullCredentialRepository 
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/credwallet/credWalletTest.cfg

    r4285 r4293  
    1111[setUp] 
    1212cfgFilePath = $NDGSEC_CREDWALLET_UNITTEST_DIR/credWallet.cfg 
     13 
     14# Site B Attribute Authority from the attAuthority unit tests.  This is a site 
     15# where the user is not registered 
     16#attributeAuthorityURI=http://localhost:5100/AttributeAuthority 
     17# Test with tcpmon 
     18attributeAuthorityURI=http://localhost:5099/AttributeAuthority 
     19userX509CertFilePath=$NDGSEC_CREDWALLET_UNITTEST_DIR/test.crt 
     20userPriKeyFilePath=$NDGSEC_CREDWALLET_UNITTEST_DIR/test.key 
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/credwallet/test_credwallet.py

    r4290 r4293  
    2020                                                    CaseSensitiveConfigParser 
    2121from ndg.security.common.X509 import X509CertParse 
    22 from ndg.security.common.CredWallet import * 
     22from ndg.security.common.CredWallet import CredWallet, \ 
     23                                            CredWalletAttributeRequestDenied 
    2324 
    2425from os.path import expandvars as xpdVars 
     
    180181        print "Attribute Certificate:\n%s" % attCert 
    181182          
    182 # 
    183 # 
    184 #    def test6aGetAttCertRefusedWithSessID(self): 
    185 #        """test6aGetAttCertRefusedWithSessID: make an attribute request using 
    186 #        a sessID as authentication credential requesting an AC from an 
    187 #        Attribute Authority where the user is NOT registered""" 
    188 # 
    189 #        self.sessionMgrConnect() 
    190 #         
    191 #        aaURI = self.cfg.get('test6aGetAttCertRefusedWithSessID', 'aauri') 
    192 #         
    193 #        attCert, errMsg, extAttCertList = self.credWallet.getAttCert(sessID=self.sessID,  
    194 #                                         aaURI=aaURI, 
    195 #                                         mapFromTrustedHosts=False) 
    196 #        if errMsg: 
    197 #            print "SUCCESS - obtained expected result: %s" % errMsg 
    198 #            return 
    199 #         
    200 #        self.fail("Request allowed from AA where user is NOT registered!") 
    201 # 
    202 #    def test6bGetMappedAttCertWithSessID(self): 
    203 #        """test6bGetMappedAttCertWithSessID: make an attribute request using 
    204 #        a session ID as authentication credential""" 
    205 # 
    206 #        self.sessionMgrConnect() 
    207 #         
    208 #        # Attribute Certificate cached in test 6 can be used to get a mapped 
    209 #        # AC for this test ... 
    210 #        self.credWallet = self.test6GetAttCertWithSessID() 
    211 # 
    212 #        aaURI = self.cfg.get('test6bGetMappedAttCertWithSessID', 'aauri') 
    213 #         
    214 #        attCert, errMsg, extAttCertList=self.credWallet.getAttCert(sessID=self.sessID, 
    215 #                                                   aaURI=aaURI, 
    216 #                                                   mapFromTrustedHosts=True) 
    217 #        if errMsg: 
    218 #            self.fail(errMsg) 
    219 #             
    220 #        print "Attribute Certificate:\n%s" % attCert   
    221 # 
    222 #    def test6cGetAttCertWithExtAttCertListWithSessID(self): 
    223 #        """test6cGetAttCertWithSessID: make an attribute request using 
    224 #        a session ID as authentication credential""" 
    225 #         
    226 #        self.sessionMgrConnect() 
    227 #         
    228 #        aaURI = \ 
    229 #            self.cfg.get('test6cGetAttCertWithExtAttCertListWithSessID', 'aauri') 
    230 #         
    231 #        # Use output from test6GetAttCertWithSessID! 
    232 #        extACFilePath = \ 
    233 #        xpdVars(self.cfg.get('test6cGetAttCertWithExtAttCertListWithSessID',  
    234 #                             'extacfilepath'))    
    235 #        extAttCert = open(extACFilePath).read() 
    236 #         
    237 #        attCert, errMsg, extAttCertList = self.credWallet.getAttCert( 
    238 #                                                   sessID=self.sessID,  
    239 #                                                   aaURI=aaURI, 
    240 #                                                   extAttCertList=[extAttCert]) 
    241 #        if errMsg: 
    242 #            self.fail(errMsg) 
    243 #           
    244 #        print "Attribute Certificate:\n%s" % attCert   
    245 # 
    246 # 
    247 #    def test7GetAttCertWithUserCert(self): 
    248 #        """test7GetAttCertWithUserCert: make an attribute request using 
    249 #        a user cert as authentication credential""" 
    250 #        self.sessionMgrConnect() 
    251 # 
    252 #        # Request an attribute certificate from an Attribute Authority  
    253 #        # using the userCert returned from connect() 
    254 #         
    255 #        aaURI = self.cfg.get('test7GetAttCertWithUserCert', 'aauri') 
    256 #        attCert, errMsg, extAttCertList = self.credWallet.getAttCert(\ 
    257 #                                     userCert=self.userCert, aaURI=aaURI) 
    258 #        if errMsg: 
    259 #            self.fail(errMsg) 
    260 #           
    261 #        print "Attribute Certificate:\n%s" % attCert   
    262 # 
    263 # 
    264 #class CredWalletTestSuite(unittest.TestSuite): 
    265 #     
    266 #    def __init__(self): 
    267 #        print "CredWalletTestSuite ..." 
    268 #        smTestCaseMap = map(CredWalletTestCase, 
    269 #                          ( 
    270 #                            "test1Connect", 
    271 #                            "test6GetAttCertWithSessID", 
    272 #                            "test6bGetMappedAttCertWithSessID", 
    273 #                            "test6cGetAttCertWithExtAttCertListWithSessID", 
    274 #                            "test7GetAttCertWithUserCert", 
    275 #                          )) 
    276 #        unittest.TestSuite.__init__(self, smTestCaseMap) 
     183 
     184 
     185    def test5GetAttCertRefusedWithUserCert(self): 
     186         
     187        credWallet = CredWallet(cfg=self.cfg.get('setUp', 'cfgFilePath'))     
     188        credWallet.userX509CertFilePath = self.cfg.get('setUp', 
     189                                                       'userX509CertFilePath') 
     190        credWallet.userPriKeyFilePath = self.cfg.get('setUp', 
     191                                                     'userPriKeyFilePath') 
     192         
     193        # Set AA URI AFTER user PKI settings so that these are picked in the 
     194        # implicit call to create a new AA Client when the URI is set 
     195        credWallet.attributeAuthorityURI = self.cfg.get('setUp',  
     196                                                    'attributeAuthorityURI') 
     197        try: 
     198            attCert = credWallet.getAttCert() 
     199        except CredWalletAttributeRequestDenied, e: 
     200            print "SUCCESS - obtained expected result: %s" % e 
     201            return 
     202         
     203        self.fail("Request allowed from Attribute Authority where user is NOT " 
     204                  "registered!") 
     205 
     206    def test6GetMappedAttCertWithUserId(self): 
     207         
     208        # Call Site A Attribute Authority where user is registered 
     209        credWallet = CredWallet(cfg=self.cfg.get('setUp', 'cfgFilePath')) 
     210        attCert = credWallet.getAttCert() 
     211 
     212        # Use Attribute Certificate cached in wallet to get a mapped  
     213        # Attribute Certificate from Site B's Attribute Authority 
     214        siteBURI = self.cfg.get('setUp', 'attributeAuthorityURI')         
     215        attCert = credWallet.getAttCert(attributeAuthorityURI=siteBURI) 
     216             
     217        print("Mapped Attribute Certificate from Site B Attribute " 
     218              "Authority:\n%s" % attCert) 
    277219             
    278220                                                     
    279221if __name__ == "__main__": 
    280 #    suite = CredWalletTestSuite() 
    281 #    unittest.TextTestRunner(verbosity=2).run(suite) 
    282222    unittest.main()         
Note: See TracChangeset for help on using the changeset viewer.