Changeset 3122


Ignore:
Timestamp:
06/12/07 14:26:58 (12 years ago)
Author:
pjkersha
Message:

security/python/ndg.security.test/ndg/security/test/wsSecurity/client: complete unit test

Location:
TI12-security/trunk/python/ndg.security.test/ndg/security/test/wsSecurity
Files:
2 added
2 deleted
7 edited
1 moved

Legend:

Unmodified
Added
Removed
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/wsSecurity/client

    • Property svn:ignore set to
      Junk-*.pem
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/wsSecurity/client/Echo_services.py

    r3115 r3122  
    11##################################################  
    2 # EchoServer_services.py  
     2# Echo_services.py  
    33# generated by ZSI.generate.wsdl2python 
    44################################################## 
    55 
    66 
    7 from EchoServer_services_types import * 
     7from Echo_services_types import * 
    88import urlparse, types 
    99from ZSI.TCcompound import ComplexType, Struct 
     
    1313 
    1414# Locator 
    15 class EchoServerLocator: 
    16     EchoServer_address = "http://localhost:7100" 
    17     def getEchoServerAddress(self): 
    18         return EchoServerLocator.EchoServer_address 
    19     def getEchoServer(self, url=None, **kw): 
    20         return EchoServerSOAP(url or EchoServerLocator.EchoServer_address, **kw) 
     15class EchoLocator: 
     16    Echo_address = "http://localhost:7100" 
     17    def getEchoAddress(self): 
     18        return EchoLocator.Echo_address 
     19    def getEcho(self, url=None, **kw): 
     20        return EchoSOAP(url or EchoLocator.Echo_address, **kw) 
    2121 
    2222# Methods 
    23 class EchoServerSOAP: 
     23class EchoSOAP: 
    2424    def __init__(self, url, **kw): 
    2525        kw.setdefault("readerclass", None) 
     
    2929        # no ws-addressing 
    3030 
    31     # op: <ZSI.wstools.WSDLTools.Message instance at 0x4070322c> 
     31    # op: <ZSI.wstools.WSDLTools.Message instance at 0x406f8b8c> 
    3232    def Echo(self, EchoIn): 
    3333 
    34         request = EchoRequest() 
     34        request = EchoInputMsg() 
    3535        request._EchoIn = EchoIn 
    3636 
     
    3939        self.binding.Send(None, None, request, soapaction="Echo", **kw) 
    4040        # no output wsaction 
    41         response = self.binding.Receive(EchoResponse.typecode) 
     41        response = self.binding.Receive(EchoOutputMsg.typecode) 
    4242        EchoResult = response._EchoResult 
    4343        return EchoResult 
    4444 
    45     # op: <ZSI.wstools.WSDLTools.Message instance at 0x40703a0c> 
    46     def EchoEncr(self, EchoIn): 
     45EchoInputMsg = ns0.Echo_Dec().pyclass 
    4746 
    48         request = EchoEncrRequest() 
    49         request._EchoIn = EchoIn 
    50  
    51         kw = {} 
    52         # no input wsaction 
    53         self.binding.Send(None, None, request, soapaction="EchoEncr", **kw) 
    54         # no output wsaction 
    55         response = self.binding.Receive(EchoEncrResponse.typecode) 
    56         EchoResult = response._EchoResult 
    57         return EchoResult 
    58  
    59 EchoRequest = ns0.Echo_Dec().pyclass 
    60  
    61 EchoResponse = ns0.EchoResponse_Dec().pyclass 
    62  
    63 EchoEncrRequest = ns0.EchoEncr_Dec().pyclass 
    64  
    65 EchoEncrResponse = ns0.EchoEncrResponse_Dec().pyclass 
     47EchoOutputMsg = ns0.EchoResponse_Dec().pyclass 
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/wsSecurity/client/Echo_services_types.py

    r3115 r3122  
    11##################################################  
    2 # EchoServer_services_types.py  
     2# Echo_services_types.py  
    33# generated by ZSI.generate.wsdl2python 
    44################################################## 
     
    5858            self.pyclass = Holder 
    5959 
    60     class EchoEncr_Dec(ZSI.TCcompound.ComplexType, ElementDeclaration): 
    61         literal = "EchoEncr" 
    62         schema = "urn:ZSI:examples" 
    63         def __init__(self, **kw): 
    64             ns = ns0.EchoEncr_Dec.schema 
    65             TClist = [ZSI.TC.String(pname=(ns,"EchoIn"), aname="_EchoIn", minOccurs=0, maxOccurs=1, nillable=False, typed=False, encoded=kw.get("encoded"))] 
    66             kw["pname"] = ("urn:ZSI:examples","EchoEncr") 
    67             kw["aname"] = "_EchoEncr" 
    68             self.attribute_typecode_dict = {} 
    69             ZSI.TCcompound.ComplexType.__init__(self,None,TClist,inorder=0,**kw) 
    70             class Holder: 
    71                 __metaclass__ = pyclass_type 
    72                 typecode = self 
    73                 def __init__(self): 
    74                     # pyclass 
    75                     self._EchoIn = None 
    76                     return 
    77             Holder.__name__ = "EchoEncr_Holder" 
    78             self.pyclass = Holder 
    79  
    80     class EchoEncrResponse_Dec(ZSI.TCcompound.ComplexType, ElementDeclaration): 
    81         literal = "EchoEncrResponse" 
    82         schema = "urn:ZSI:examples" 
    83         def __init__(self, **kw): 
    84             ns = ns0.EchoEncrResponse_Dec.schema 
    85             TClist = [ZSI.TC.String(pname=(ns,"EchoResult"), aname="_EchoResult", minOccurs=0, maxOccurs=1, nillable=False, typed=False, encoded=kw.get("encoded"))] 
    86             kw["pname"] = ("urn:ZSI:examples","EchoEncrResponse") 
    87             kw["aname"] = "_EchoEncrResponse" 
    88             self.attribute_typecode_dict = {} 
    89             ZSI.TCcompound.ComplexType.__init__(self,None,TClist,inorder=0,**kw) 
    90             class Holder: 
    91                 __metaclass__ = pyclass_type 
    92                 typecode = self 
    93                 def __init__(self): 
    94                     # pyclass 
    95                     self._EchoResult = None 
    96                     return 
    97             Holder.__name__ = "EchoEncrResponse_Holder" 
    98             self.pyclass = Holder 
    99  
    10060# end class ns0 (tns: urn:ZSI:examples) 
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/wsSecurity/client/echoClientTest.py

    r3115 r3122  
    44# 
    55# Import the client proxy object 
    6 from EchoServer_client import EchoServerSOAP 
    7 import sys 
    8  
    9 import wsSecurity 
     6from Echo_services import EchoLocator 
    107 
    118import unittest 
     
    1815from ndg.security.common import wsSecurity 
    1916 
    20 class WSSecurityClientTestCase(unittest.TestCase): 
     17class EchoClientTestCase(unittest.TestCase): 
    2118     
    2219    def setUp(self): 
    2320         
    24         configParser = SafeConfigParser() 
    25         configParser.read("./myProxyClientTest.cfg") 
     21        self.cfg = SafeConfigParser() 
     22        self.cfg.read("./echoClientTest.cfg") 
     23        uri = self.cfg.get('setUp', 'uri') 
     24        signingPriKeyFilePath = self.cfg.get('setUp', 'signingPriKeyFilePath') 
     25        signingPriKeyPwd = self.cfg.get('setUp', 'signingPriKeyPwd') 
     26        signingCertFilePath = self.cfg.get('setUp', 'signingCertFilePath') 
     27        caCertFilePathList = self.cfg.get('setUp',  
     28                                          'caCertFilePathList').split() 
    2629         
    27         self.cfg = {} 
    28         for section in configParser.sections(): 
    29             self.cfg[section] = dict(configParser.items(section)) 
    30              
    31         self.clnt = WSSecurityClient(\ 
    32                           propFilePath=self.cfg['setUp']['propfilepath']) 
     30        # Signature handler object is passed to binding 
     31        sigHandler = wsSecurity.SignatureHandler( 
     32                                 signingPriKeyFilePath=signingPriKeyFilePath, 
     33                                 signingPriKeyPwd=signingPriKeyPwd, 
     34                                 signingCertFilePath=signingCertFilePath, 
     35                                 caCertFilePathList=caCertFilePathList) 
     36 
     37        locator = EchoLocator() 
     38        self.clnt = locator.getEcho(uri,  
     39                                    sig_handler=sigHandler, 
     40                                    tracefile=sys.stderr) 
    3341         
    3442 
     
    3644        '''test1Echo: test signed message and signed response from server''' 
    3745             
    38         passphrase = self.cfg['test1Echo'].get('passphrase') 
    39         if passphrase is None: 
    40             passphrase = getpass.getpass(\ 
    41                                  prompt="\ntest1Echo cred. pass-phrase: ") 
    42              
    43         ownerPassphrase = self.cfg['test1Echo'].get('ownerpassphrase') 
    44         if ownerPassphrase is None: 
    45             ownerPassphrase = getpass.getpass(\ 
    46                               prompt="\ntest1Echo cred. owner pass-phrase: ") 
    47              
    4846        try: 
    49             self.clnt.store(self.cfg['test1Echo']['username'], 
    50                         passphrase, 
    51                         self.cfg['test1Echo']['certfile'], 
    52                         self.cfg['test1Echo']['keyfile'], 
    53                         ownerCertFile=self.cfg['test1Echo']['ownercertfile'], 
    54                         ownerKeyFile=self.cfg['test1Echo']['ownerkeyfile'], 
    55                         ownerPassphrase=ownerPassphrase, 
    56                         force=False) 
    57             print "Store creds for user %s" % \ 
    58                                             self.cfg['test1Echo']['username'] 
     47            resp = self.clnt.Echo("Hello from client") 
     48            print "Message returned was: %s" % resp 
    5949        except: 
    6050            self.fail(traceback.print_exc()) 
    61              
     51      
    6252#_____________________________________________________________________________        
    63 class WSSecurityClientTestSuite(unittest.TestSuite): 
     53class EchoClientTestSuite(unittest.TestSuite): 
    6454    def __init__(self): 
    65         map = map(WSSecurityClientTestCase, 
     55        map = map(EchoClientTestCase, 
    6656                  ( 
    6757                    "test1Echo", 
     
    7161if __name__ == "__main__": 
    7262    unittest.main() 
    73 # Lambda used by WS-Security handler to check which operation has been 
    74 # invoked 
    75 isEchoRequest = lambda sw: sw.body.node.childNodes[0].localName=='Echo' 
    76 isEchoEncrRequest = lambda sw: sw.body.node.childNodes[0].localName=='EchoEncr' 
    77  
    78 isEchoResponse = lambda ps: \ 
    79 ps.dom.childNodes[1].childNodes[1].childNodes[0].localName == 'EchoResponse' 
    80 isEchoEncrResponse = lambda ps: \ 
    81 ps.dom.childNodes[1].childNodes[1].childNodes[0].localName=='EchoEncrResponse' 
    82  
    83 class WSSEhandler: 
    84     def __init__(self, sigHandler=None, encrHandler=None): 
    85         self.sigHandler = sigHandler 
    86         self.encrHandler = encrHandler 
    87          
    88     def sign(self, sw): 
    89         if self.sigHandler: 
    90             self.sigHandler.sign(sw) 
    91              
    92     def verify(self, ps): 
    93         if self.sigHandler: 
    94             self.sigHandler.verify(ps) 
    95          
    96     def encrypt(self, sw): 
    97         if self.encrHandler: 
    98             self.encrHandler.encrypt(sw) 
    99             
    100     def decrypt(self, ps): 
    101         if self.encrHandler: 
    102             self.encrHandler.decrypt(ps) 
    103  
    104  
    105 #priKeyPwd = open('../tmp2').read().strip() 
    106 #certFilePath = '../Junk-cert.pem' 
    107 #priKeyFilePath = '../Junk-key.pem' 
    108 priKeyPwd = None 
    109 certFilePath = '../webSphereTestcert.pem' 
    110 priKeyFilePath = '../webSphereTestkey.pem' 
    111  
    112 # Signature handler object is passed to binding 
    113 sigHandler = wsSecurity.SignatureHandler(certFilePath=certFilePath, 
    114                                          priKeyFilePath=priKeyFilePath, 
    115                                          priKeyPwd=priKeyPwd) 
    116  
    117 encrHandler = wsSecurity.EncryptionHandler(certFilePath=certFilePath, 
    118                                            priKeyFilePath=priKeyFilePath, 
    119                                            priKeyPwd=priKeyPwd) 
    120  
    121 # Test encryption only 
    122 #wsseHandler = WSSEhandler(sigHandler, encrHandler) 
    123 wsseHandler = WSSEhandler(encrHandler=encrHandler) 
    124      
    125          
    126 # Instantiate a client proxy object, then call it 
    127 #wsURL = "http://192.100.78.234:9081/EchoServiceWeb/services/EchoServer" 
    128 wsURL = "http://localhost:7000" 
    129 echoSrv = EchoServerSOAP(wsURL, 
    130                          sig_handler=sigHandler, 
    131                          encr_handler=encrHandler, 
    132                          tracefile=sys.stdout) 
    133 try: 
    134     #import pdb;pdb.set_trace() 
    135     print echoSrv.Echo("Test String") 
    136     #print echoSrv.EchoEncr("Test Secret") 
    137 except Exception, e: 
    138     print "Failed to echo: ", e 
    139  
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/wsSecurity/server

    • Property svn:ignore set to
      Junk-*.pem
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/wsSecurity/server/Echo_services.py

    r3113 r3122  
    3232    def Echo(self, EchoIn): 
    3333 
    34         request = EchoRequest() 
     34        request = EchoInputMsg() 
    3535        request._EchoIn = EchoIn 
    3636 
     
    3939        self.binding.Send(None, None, request, soapaction="Echo", **kw) 
    4040        # no output wsaction 
    41         response = self.binding.Receive(EchoResponse.typecode) 
     41        response = self.binding.Receive(EchoOutputMsg.typecode) 
    4242        EchoResult = response._EchoResult 
    4343        return EchoResult 
    4444 
    45 EchoRequest = ns0.Echo_Dec().pyclass 
     45EchoInputMsg = ns0.Echo_Dec().pyclass 
    4646 
    47 EchoResponse = ns0.EchoResponse_Dec().pyclass 
     47EchoOutputMsg = ns0.EchoResponse_Dec().pyclass 
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/wsSecurity/server/Echo_services_server.py

    r3113 r3122  
    3232  </types> 
    3333 
    34   <message name=\"EchoRequest\"> 
     34  <message name=\"EchoInputMsg\"> 
    3535    <part element=\"tns:Echo\" name=\"parameters\"/> 
    3636  </message> 
    37   <message name=\"EchoResponse\"> 
     37  <message name=\"EchoOutputMsg\"> 
    3838    <part element=\"tns:EchoResponse\" name=\"parameters\"/> 
    3939  </message> 
     
    4141  <portType name=\"Echo\"> 
    4242    <operation name=\"Echo\"> 
    43       <input message=\"tns:EchoRequest\"/> 
    44       <output message=\"tns:EchoResponse\"/> 
     43      <input message=\"tns:EchoInputMsg\"/> 
     44      <output message=\"tns:EchoOutputMsg\"/> 
    4545    </operation> 
    4646  </portType> 
     
    8181 
    8282    def soap_Echo(self, ps): 
    83         self.request = ps.Parse(EchoRequest.typecode) 
     83        self.request = ps.Parse(EchoInputMsg.typecode) 
    8484        parameters = self.request._EchoIn 
    8585 
     
    8888            parameters = self.impl.Echo(parameters) 
    8989 
    90         result = EchoResponse() 
     90        result = EchoOutputMsg() 
    9191        # If we have an implementation object, copy the result  
    9292        if hasattr(self,'impl'): 
     
    9595 
    9696    soapAction['Echo'] = 'soap_Echo' 
    97     root[(EchoRequest.typecode.nspname,EchoRequest.typecode.pname)] = 'soap_Echo' 
     97    root[(EchoInputMsg.typecode.nspname,EchoInputMsg.typecode.pname)] = 'soap_Echo' 
    9898 
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/wsSecurity/server/echoServer.py

    r3113 r3122  
    44# 
    55import sys, os 
     6from ConfigParser import SafeConfigParser 
    67 
    78# Import the ZSI stuff you'd need no matter what 
    8 from ZSI.ServiceContainer import ServiceContainer, SOAPRequestHandler 
     9from ZSI.ServiceContainer import ServiceContainer 
    910 
    1011# This is a new method imported to show it's value 
     
    3940        _Echo.__init__(self, **kw) 
    4041         
    41     def sign(self, sw): 
    42         '''Overrides ServiceInterface class method to allow digital signature 
    43         ''' 
    44         if isinstance(self.request, EchoRequest): 
    45             # Echo response applies digital signature 
    46             self.signatureHandler.sign(sw) 
    47          
    48     def verify(self, ps): 
    49         '''Overrides ServiceInterface class method to allow signature  
    50         verification'''       
    51         if isinstance(self.request, EchoRequest): 
    52             # Echo request checks digital signature 
    53             self.signatureHandler.verify(ps) 
    54  
    5542    def soap_Echo(self, ps, **kw): 
    5643        '''Simple echo method to test WS-Security DSIG 
     
    5946        @param ps: client SOAP message 
    6047        @rtype: tuple 
    61         @return: request and response objects''' 
     48        @return: response objects''' 
    6249        if self.__debug: 
    6350            import pdb 
    6451            pdb.set_trace() 
    65              
    66         response = AttAuthorityService.soap_Echo(self, ps) 
     52         
     53        response = _Echo.soap_Echo(self, ps)     
     54        response.EchoResult = "Received message from client: " + \ 
     55                            self.request.EchoIn 
     56        return response 
    6757 
    68         response.EchoResult = self.request._EchoIn 
    69         return response 
    70     
    71 # Create a Server implementation 
    72  
    73 #_____________________________________________________________________________ 
    74 class EchoSOAPRequestHandler(SOAPRequestHandler): 
    75      """Add a do_GET method to return the WSDL on HTTP GET requests. 
    76      Please note that the path to the wsdl file is derived from what 
    77      the HTTP invocation delivers (which is put into the self.path 
    78      attribute), so you might want to change this addressing scheme. 
    79      """ 
    80      def do_GET(self): 
    81          """Return the WSDL file."""          
    82          self.send_xml(EchoServer_interface.EchoServer._wsdl) 
    83           
    84      def do_POST(self): 
    85           """Fudge to get _Dispatch to pick up the correct address 
    86           - seems to be necessary when putting proxy redirect for port in 
    87           the wsdl e.g. http://glue.badc.rl.ac.uk/sessionMgr points to the 
    88           default port for the Session Manager.""" 
    89           self.path = "/EchoServIn" 
    90           SOAPRequestHandler.do_POST(self) 
    9158    
    9259if __name__ == "__main__": 
    9360    # Here we set up the server 
    94     hostname = 'localhost' 
    95     port = 7000 
    96     path = "/Echo" 
    97     serviceContainer = ServiceContainer((hostname, port)) 
     61    cfg = SafeConfigParser() 
     62    cfg.read("./echoServer.cfg") 
    9863     
     64    hostname = cfg.get('setUp', 'hostname') 
     65    port = cfg.getint('setUp', 'port') 
     66    path = cfg.get('setUp', 'path') 
     67    signingPriKeyFilePath = cfg.get('setUp', 'signingPriKeyFilePath') 
     68    signingPriKeyPwd = cfg.get('setUp', 'signingPriKeyPwd') 
     69    signingCertFilePath = cfg.get('setUp', 'signingCertFilePath') 
     70    caCertFilePathList = cfg.get('setUp', 'caCertFilePathList').split() 
     71 
     72    serviceContainer = ServiceContainer((hostname, port))    
    9973     
    10074    # Create the Inherited version of the server 
Note: See TracChangeset for help on using the changeset viewer.