Ignore:
Timestamp:
21/11/06 12:42:31 (14 years ago)
Author:
pjkersha
Message:

server-config.tac: changes to AttAuthorityService? sub class - working
stub code (apart from soap_getHostInfo) and started adding in actual hook
up to AttAuthority? code.

AttAuthorityClientTest?.py: unit tests working with server side stub version.

common/AttAuthority/init.py: change wsdl refs to url. WSDL isn't actually
relevant here, only the url for the service to be accessed.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/AttAuthority/AttAuthorityClientTest.py

    r1724 r1730  
    11#!/usr/bin/env python 
    22 
    3 """NDG Attribute Authority client - makes requests for authorisation 
     3"""NDG Attribute Authority client  
    44 
    55NERC Data Grid Project 
     
    1616import sys 
    1717 
    18 from ZSI import ServiceProxy 
     18from ndg.security.common.AttAuthority import AttAuthorityClient 
    1919 
    20 from ndg.security.AttAuthorityIO import * 
    21  
    22  
    23 class attAuthorityClientTestCase(unittest.TestCase): 
     20class AttAuthorityClientTestCase(unittest.TestCase): 
    2421     
    2522    def setUp(self): 
    2623        try: 
    2724            # Session Manager WSDL 
    28             aaWSDL = 'http://glue.badc.rl.ac.uk/attAuthority.wsdl' 
     25            self.url = 'http://127.0.0.1:5700/AttributeAuthority' 
    2926     
    3027            # Instantiate WS proxy 
    31             self.aaSrv = ServiceProxy(aaWSDL,  
    32                                       use_wsdl=True,  
    33                                       tracefile=sys.stderr) 
     28            self.clnt = AttAuthorityClient(self.url,  
     29                                           tracefile=sys.stderr) 
    3430        except Exception, e: 
    3531            self.fail(str(e)) 
     
    4137     
    4238    def testGetPubKey(self): 
     39        '''testGetPubKey: retrieve Attribute Authority's X.509 cert.''' 
     40        #import pdb;pdb.set_trace() 
     41        resp = self.clnt.getPubKey() 
     42        print "Attribute Authority public key:\n" + resp 
     43         
     44 
     45    def testGetTrustedHostInfo(self): 
     46         
    4347        try: 
    44             # Request an attribute certificate from an Attribute Authority  
    45             # using the proxyCert returned from connect() 
    46 #            import pdb 
    47 #            pdb.set_trace() 
    48             pubKeyReq = PubKeyReq() 
    49             resp = self.aaSrv.getPubKey(pubKeyReq=pubKeyReq()) 
    50             pubKeyResp = PubKeyResp(xmlTxt=resp['pubKeyResp']) 
    51      
    52             if 'errMsg' in pubKeyResp and pubKeyResp['errMsg']: 
    53                 raise Exception(pubKeyResp['errMsg']) 
    54              
    55             print "Attribute Authority public key:\n" + pubKeyResp['pubKey'] 
    56                           
     48            role = 'role' 
     49            self.clnt.getTrustedHostInfo(role) 
    5750        except Exception, e: 
    5851            self.fail(str(e)) 
    5952 
    6053 
    61     def testGetTrustedHostInfo(self): 
     54    def testGetTrustedHostInfoWithNoRole(self): 
    6255         
    6356        try: 
    64             pass 
     57            self.clnt.getTrustedHostInfo() 
    6558        except Exception, e: 
    6659            self.fail(str(e)) 
    6760 
    6861 
    69     def testReqAuthorisation(self):         
    70         """Request authorisation from NDG Attribute Authority Web Service.""" 
     62    def testGetHostInfo(self): 
     63         
     64        try: 
     65            self.clnt.getHostInfo() 
     66        except Exception, e: 
     67            self.fail(str(e)) 
     68 
     69 
     70    def testGetAttCert(self):         
     71        """testGetAttCert: Request attribute certificate from NDG Attribute  
     72        Authority Web Service.""" 
    7173     
    72         # Attribute Authority WSDL 
    73         aaWSDL = './attAuthority.wsdl' 
    74          
    7574        # User's proxy certificate 
    76         usrProxyCertFilePath = "./certs/pjkproxy.pem" 
     75        userCertFilePath = "/tmp/x509up_u1001" 
    7776     
    7877        # Existing Attribute Certificate held in user's CredentialWallet.   
    7978        # This is available for use with trusted data centres to make new  
    8079        # mapped Attribute Certificates 
    81         usrAttCertFilePath = "./attCert/attCert-pjk-BADC.xml" 
    82      
    83         # Make Attribute Authority raise an exception 
    84         #usrAttCertFilePath = "attCert-tampered.xml" 
    85      
    86      
    87         print "Requesting authorisation for user cert file: \"%s\"" % \ 
    88               usrProxyCertFilePath 
    89      
     80        userAttCertFilePath = None 
    9081     
    9182        # Read user Proxy Certificate into a string ready for passing via WS 
    9283        try: 
    93             usrProxyCertFileTxt = open(usrProxyCertFilePath, 'r').read() 
     84            userCertFileTxt = open(userCertFilePath, 'r').read() 
    9485             
    9586        except IOError, ioErr: 
     
    9990     
    10091        # Simlarly for Attribute Certificate if present ... 
    101         if usrAttCertFilePath is not None: 
     92        if userAttCertFilePath is not None: 
    10293             
    10394            try: 
    104                 usrAttCertFileTxt = open(usrAttCertFilePath, 'r').read() 
     95                userAttCertFileTxt = open(userAttCertFilePath, 'r').read() 
    10596                 
    10697            except IOError, ioErr: 
     
    10899                                        (ioErr.filename, ioErr.strerror) 
    109100        else: 
    110             usrAttCertFileTxt = None 
     101            userAttCertFileTxt = None 
    111102             
    112103     
    113         # Make authorsation request 
    114         try:    
    115             resp = self.aaSrv.reqAuthorisation(\ 
    116                                           usrProxyCert=usrProxyCertFileTxt, 
    117                                           usrAttCert=usrAttCertFileTxt) 
    118             if resp['errMsg']: 
    119                 raise Exception(resp['errMsg']) 
    120              
    121             return resp['attCert'] 
    122              
    123         except Exception, e: 
    124             self.fail(str(e)) 
     104        # Make attribute certificate request 
     105        resp = self.clnt.getAttCert(userCertFileTxt) 
     106        return resp 
    125107         
    126108  
    127109#_____________________________________________________________________________        
    128 class attAuthorityClientTestSuite(unittest.TestSuite): 
     110class AttAuthorityClientTestSuite(unittest.TestSuite): 
    129111    def __init__(self): 
    130         map = map(attAuthorityClientTestCase, 
     112        map = map(AttAuthorityClientTestCase, 
    131113                  ( 
     114                    "testGetHostInfo", 
    132115                    "testGetTrustedHostInfo", 
    133                     "testReqAuthorisation", 
     116                    "testGetTrustedHostInfoWithNoRole", 
     117                    "testGetAttCert", 
    134118                    "testGetPubKey", 
    135119                  )) 
     
    138122if __name__ == "__main__": 
    139123    unittest.main() 
    140  
    141 from AttAuthority_services import AttAuthorityServiceLocator 
    142  
    143 def main(**kw): 
    144     locator = AttAuthorityServiceLocator() 
    145     port = locator.getAttAuthority(**kw) 
    146     import pdb;pdb.set_trace() 
    147     attCert = port.getAttCert("USER CERT") 
    148     print "attCert = %s" % attCert 
    149      
    150     # Factory METHOD Just guessing here 
    151     #response = port.create(CLIENT.CreateRequest()) 
    152     #kw['endPointReference'] = response._EndpointReference 
    153     #iport = locator.getAttAuthority(**kw) 
    154 #    reactor.stop() 
    155  
    156  
    157 if __name__ == '__main__': 
    158     main(url="http://127.0.0.1:5700/AttributeAuthority") 
    159 #    op = GetBasicOptParser() 
Note: See TracChangeset for help on using the changeset viewer.