Revision 2746, 12.6 KB checked in by pjkersha, 13 years ago (diff)

ndg.security.server/ndg/security/server/conf/sessionMgrProperties.xml:

  • don't comment out hostname instead include by default

ndg.security.server/ndg/security/server/SessionMgr/init.py:

  • fixed comment typo

ndg.security.server/ndg/security/server/MyProxy.py:

  • prevent setting of the OpenSSL config file without the required file name and
directory path.

ndg.security.test/ndg/security/test/AttCert/attCertTest.cfg,
ndg.security.test/ndg/security/test/AttCert/AttCertTest.py:

  • fixed unit tests for AC signature verification. certFilePathList can now

be set to include CA certs. to verify the X.509 cert. used in the signature

ndg.security.test/ndg/security/test/SessionMgr/SessionMgrClientTest.py:

  • fix: extAttCertList is no longer returned in getAttCert calls to SM client.

ndg.security.test/ndg/security/test/SessionMgr/sessionMgrClientTest.cfg:

  • tests with services on glue

ndg.security.common/ndg/security/common/XMLSec.py:

  • fixed verifyEnvelopedSignature so that it is now possible to verify the

X.509 cert. in the signature against its issuing CA cert.

ndg.security.common/ndg/security/common/SessionMgr/init.py:

  • modified getAttCert call so that extAttCertList is no longer passed back in

the returned tuple but is instead included as an attribute of the
AttributeRequestDenied exception type.

  • updated pydoc for getAttCert method

ndg.security.common/ndg/security/common/AttAuthority/init.py:

  • typo fix - doesn't affect execution

ndg.security.common/ndg/security/common/CredWallet.py:

  • updates to getAttCert call pydoc
  • and getAttCert exception handling
  • Property svn:executable set to *
#!/usr/bin/env python

"""Test harness for NDG Session Manager client - makes requests for
authentication and authorisation.  An Attribute Authority and Simple CA
services must be running for the reqAuthorisation and addUser tests.

Settings are read from ./sessionMgrClientTest.cfg in the current working
directory; pass-phrases omitted from the config are prompted for.

NERC Data Grid Project

@author P J Kershaw

23/02/06

Renamed from SessionClientTest.py 27/0/4/06
Moved and renamed SessionMgrClientTest.py 23/11/06

@copyright (C) 2007 CCLRC & NERC

@license This software may be distributed under the terms of the Q Public
License, version 1.0 or later.
"""
__revision__ = "$Id:$"
[848]23import unittest
[2530]24import os, sys, getpass, re
[1999]25from ConfigParser import SafeConfigParser
26
[2085]27from ndg.security.common.SessionMgr import SessionMgrClient, \
28    AttributeRequestDenied
29   
[1773]30from ndg.security.common.SessionCookie import SessionCookie
[2685]31from ndg.security.common.X509 import X509CertParse, X509CertRead
[848]32
33
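class SessionMgrClientTestCase(unittest.TestCase):
    """Exercise the Session Manager client against running services.

    The [setUp] section of ./sessionMgrClientTest.cfg configures the client;
    each testN method reads its own section of the same file.  The client
    private key password and user pass-phrases are prompted for with getpass
    when the config omits them, so they need not be stored on disk.
    """
    # Matches one PEM encoded certificate.  Compiled with re.S so that the
    # base64 body may span lines; that body never contains '-' so the
    # negated class cannot run past the END marker
    pemPat = r"-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"

    # Cached at class level so that a pass-phrase prompted for once is
    # re-used by the later tests which call test2Connect/test3...
    test2Passphrase = None
    test3Passphrase = None

    # Spellings accepted for boolean settings in the config file - the same
    # set that ConfigParser.getboolean() understands
    _boolStates = {'1': True, 'yes': True, 'true': True, 'on': True,
                   '0': False, 'no': False, 'false': False, 'off': False}

    def _parseBool(self, value):
        '''Convert a boolean setting read from the config file to a bool.

        Replaces eval() of the raw setting: the config file is data, not
        code, so only the recognised true/false spellings are accepted.

        @type value: string
        @param value: setting as read from the config file e.g. "True"
        @rtype: bool
        @return: parsed value
        @raise ValueError: value is not one of the recognised spellings'''
        try:
            return self.__class__._boolStates[value.strip().lower()]
        except (KeyError, AttributeError):
            raise ValueError("Expecting a boolean config setting, not %r" % \
                             value)

    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
        '''Read proxy cert and user cert from a single PEM file and put in
        a list ready for input into SignatureHandler

        @type proxyCertFilePath: string
        @param proxyCertFilePath: PEM file holding the proxy cert followed
        by the user cert
        @rtype: list
        @return: X509CertParse objects with the proxy cert moved to the end'''
        proxyCertFile = open(proxyCertFilePath)
        try:
            proxyCertFileTxt = proxyCertFile.read()
        finally:
            # Close explicitly rather than rely on garbage collection
            proxyCertFile.close()

        pemPatRE = re.compile(self.__class__.pemPat, re.S)
        x509CertList = pemPatRE.findall(proxyCertFileTxt)

        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
                            x509CertList]

        # Expecting proxy cert first - move this to the end.  This will
        # be the cert used to verify the message signature
        signingCertChain.reverse()

        return signingCertChain


    def setUp(self):
        '''Read the test config and initialise the Session Manager client'''
        configParser = SafeConfigParser()
        configParser.read("./sessionMgrClientTest.cfg")

        self.cfg = {}
        for section in configParser.sections():
            self.cfg[section] = dict(configParser.items(section))

        tracefile = sys.stderr

        # The private key password may be set in the config for unattended
        # runs; omitting it prompts instead, which keeps it off disk
        try:
            clntPriKeyPwd = self.cfg['setUp'].get('clntprikeypwd')
            if clntPriKeyPwd is None:
                clntPriKeyPwd = getpass.getpass(\
                            prompt="\nsetUp - client private key password: ")
        except KeyboardInterrupt:
            sys.exit(0)

        # List of CA certificates for use in validation of certs used in
        # signature for server response.  Only a missing setting is
        # tolerated - any other error should fail the test rather than
        # silently disable verification
        try:
            caCertFilePathList=self.cfg['setUp']['cacertfilepathlist'].split()
        except KeyError:
            caCertFilePathList = []

        # CA certificates for verifying the server's SSL certificate
        try:
            sslCACertList = [X509CertRead(certFilePath) for certFilePath in \
                         self.cfg['setUp']['sslcacertfilepathlist'].split()]
        except KeyError:
            sslCACertList = []

        reqBinSecTokValType = self.cfg['setUp'].get('reqbinsectokvaltype')

        # Check certificate types proxy or standard
        proxyCertFilePath = self.cfg['setUp'].get('proxycertfilepath')
        if proxyCertFilePath:
            signingCertChain = \
                        self._getCertChainFromProxyCertFile(proxyCertFilePath)
        else:
            signingCertChain = None

        # Parse rather than eval() the setting - config contents must never
        # be executed
        setSignatureHandler = self._parseBool(\
                                    self.cfg['setUp']['setsignaturehandler'])

        # Initialise the Session Manager client connection
        # Omit traceFile keyword to leave out SOAP debug info
        self.clnt = SessionMgrClient(uri=self.cfg['setUp']['smuri'],
                sslCACertList=sslCACertList,
                sslPeerCertCN=self.cfg['setUp'].get('sslpeercertcn'),
                setSignatureHandler=setSignatureHandler,
                reqBinSecTokValType=reqBinSecTokValType,
                signingCertFilePath=self.cfg['setUp'].get('clntcertfilepath'),
                signingCertChain=signingCertChain,
                signingPriKeyFilePath=self.cfg['setUp']['clntprikeyfilepath'],
                signingPriKeyPwd=clntPriKeyPwd,
                caCertFilePathList=caCertFilePathList,
                tracefile=tracefile)

        # Set by test2Connect/test3ConnectNoCreateServerSess and used by the
        # disconnect and getAttCert tests
        self.sessID = None
        self.proxyCert = None
        self.proxyPriKey = None
        self.userCert = None

# TODO: is addUser part of session manager?
#    def test1AddUser(self):
#        """Add a new user ID to the MyProxy repository"""
#       
#        passphrase = self.cfg['test1AddUser'].get('passphrase') or \
#            getpass.getpass(prompt="\ntest1AddUser pass-phrase for new user: ")
#           
#        # Note the pass-phrase is read from the file tmp.  To pass
#        # explicitly as a string use the 'passphrase' keyword instead
#        self.clnt.addUser(self.cfg['test1AddUser']['username'],
#                          passphrase=passphrase)
#        print "Added user '%s'" % self.cfg['test1AddUser']['username']


    def test2Connect(self):
        """test2Connect: Connect as if acting as a browser client -
        a session ID is returned"""

        # Read the pass-phrase once per test run: from the config if set,
        # otherwise from a prompt
        if self.__class__.test2Passphrase is None:
            self.__class__.test2Passphrase = \
                                    self.cfg['test2Connect'].get('passphrase')

        if not self.__class__.test2Passphrase:
            self.__class__.test2Passphrase = getpass.getpass(\
                               prompt="\ntest2Connect pass-phrase for user: ")

        self.proxyCert, self.proxyPriKey, self.userCert, self.sessID = \
            self.clnt.connect(self.cfg['test2Connect']['username'],
                              passphrase=self.__class__.test2Passphrase)

        print "User '%s' connected to Session Manager:\n%s" % \
            (self.cfg['test2Connect']['username'], self.sessID)


    def test3ConnectNoCreateServerSess(self):
        """test3ConnectNoCreateServerSess: Connect as a non browser client -
        sessID should be None"""

        if self.__class__.test3Passphrase is None:
            self.__class__.test3Passphrase = \
                self.cfg['test3ConnectNoCreateServerSess'].get('passphrase')

        if not self.__class__.test3Passphrase:
            self.__class__.test3Passphrase = getpass.getpass(\
            prompt="\ntest3ConnectNoCreateServerSess pass-phrase for user: ")

        self.proxyCert, self.proxyPriKey, self.userCert, sessID = \
            self.clnt.connect(\
                      self.cfg['test3ConnectNoCreateServerSess']['username'],
                      passphrase=self.__class__.test3Passphrase,
                      createServerSess=False)

        # Expect null session ID.  Use the TestCase method rather than a
        # bare assert, which is stripped when Python runs with -O
        self.failIf(sessID,
                    "Expected null session ID for createServerSess=False")

        print "User '%s' connected to Session Manager:\n%s" % \
                    (self.cfg['test3ConnectNoCreateServerSess']['username'],
                     self.proxyCert)


    def test4DisconnectUsingSessID(self):
        """test4DisconnectUsingSessID: disconnect as if acting as a browser client
        """

        print "\n\t" + self.test4DisconnectUsingSessID.__doc__
        self.test2Connect()

        self.clnt.disconnect(sessID=self.sessID)

        print "User disconnected from Session Manager:\n%s" % self.sessID


    def test5DisconnectUsingProxyCert(self):
        """test5DisconnectUsingProxyCert: Disconnect as a command line client
        """

        print "\n\t" + self.test5DisconnectUsingProxyCert.__doc__
        self.test2Connect()

        # Use proxy cert / private key just obtained from connect call for
        # signature generation
        self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
        self.clnt.signatureHandler.signingPriKey = self.proxyPriKey
        self.clnt.signatureHandler.signingCertChain = (self.userCert,
                                                       self.proxyCert)

        # Proxy cert in signature determines ID of session to
        # delete
        self.clnt.disconnect()
        print "User disconnected from Session Manager:\n%s" % self.proxyCert


    def test6GetAttCertUsingSessID(self):
        """test6GetAttCertUsingSessID: make an attribute request using
        a session ID as authentication credential"""

        print "\n\t" + self.test6GetAttCertUsingSessID.__doc__
        self.test2Connect()

        attCert = self.clnt.getAttCert(\
            sessID=self.sessID,
            attAuthorityURI=self.cfg['test6GetAttCertUsingSessID']['aauri'])

        print "Attribute Certificate:\n%s" % attCert


    def test6aGetAttCertRefusedUsingSessID(self):
        """test6aGetAttCertRefusedUsingSessID: make an attribute request using
        a sessID as authentication credential requesting an AC from an
        Attribute Authority where the user is NOT registered"""

        print "\n\t" + self.test6aGetAttCertRefusedUsingSessID.__doc__
        self.test2Connect()

        aaURI = self.cfg['test6aGetAttCertRefusedUsingSessID']['aauri']

        # mapFromTrustedHosts=False so the Session Manager cannot fall back
        # to a mapped certificate - the request must be refused outright
        try:
            attCert = self.clnt.getAttCert(sessID=self.sessID,
                                           attAuthorityURI=aaURI,
                                           mapFromTrustedHosts=False)
        except AttributeRequestDenied, e:
            print "SUCCESS - obtained expected result: %s" % e
            return

        self.fail("Request allowed from AA where user is NOT registered!")


    def test6bGetMappedAttCertUsingSessID(self):
        """test6bGetMappedAttCertUsingSessID: make an attribute request using
        a session ID as authentication credential"""

        print "\n\t" + self.test6bGetMappedAttCertUsingSessID.__doc__
        self.test2Connect()

        aaURI = self.cfg['test6bGetMappedAttCertUsingSessID']['aauri']

        attCert=self.clnt.getAttCert(sessID=self.sessID,attAuthorityURI=aaURI)

        print "Attribute Certificate:\n%s" % attCert


    def test6cGetAttCertWithExtAttCertListUsingSessID(self):
        """test6cGetAttCertWithExtAttCertListUsingSessID: make an attribute
        request using a session ID as authentication credential, passing an
        explicit list of external attribute certificates"""

        print "\n\t" + \
            self.test6cGetAttCertWithExtAttCertListUsingSessID.__doc__
        self.test2Connect()

        aaURI = \
            self.cfg['test6cGetAttCertWithExtAttCertListUsingSessID']['aauri']

        # Placeholder entries - the test exercises passing of the keyword
        attCert = self.clnt.getAttCert(sessID=self.sessID,
                                       attAuthorityURI=aaURI,
                                       extAttCertList=['AC1', 'AC2', 'AC3'])

        print "Attribute Certificate:\n%s" % attCert


    def test7GetAttCertUsingProxyCert(self):
        """test7GetAttCertUsingProxyCert: make an attribute request using
        a proxy cert as authentication credential"""
        print "\n\t" + self.test7GetAttCertUsingProxyCert.__doc__
        self.test2Connect()

        # Sign the request with the proxy credentials from connect() rather
        # than the client's own key set up in setUp
        self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
        self.clnt.signatureHandler.signingPriKey = self.proxyPriKey
        self.clnt.signatureHandler.signingCertChain = (self.userCert,
                                                       self.proxyCert)

        # Request an attribute certificate from an Attribute Authority
        # using the proxyCert returned from connect()

        aaURI = self.cfg['test7GetAttCertUsingProxyCert']['aauri']
        attCert = self.clnt.getAttCert(attAuthorityURI=aaURI)

        print "Attribute Certificate:\n%s" % attCert


    def test8GetX509Cert(self):
        "test8GetX509Cert: return the Session Manager's X.509 Cert."
        cert = self.clnt.getX509Cert()

        print "Session Manager X.509 Certificate:\n" + cert
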
309#_____________________________________________________________________________       
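class SessionMgrClientTestSuite(unittest.TestSuite):
    '''Suite running every SessionMgrClientTestCase test in order'''

    def __init__(self):
        # The local must not be called 'map': assigning to that name makes
        # it local for the whole function, so the call on the right-hand
        # side raised UnboundLocalError before the builtin could be used.
        # test1AddUser is commented out in the TestCase and so is omitted -
        # TestCase.__init__ raises ValueError for a missing method name
        tests = map(SessionMgrClientTestCase,
                  (
                    "test2Connect",
                    "test3ConnectNoCreateServerSess",
                    "test4DisconnectUsingSessID",
                    "test5DisconnectUsingProxyCert",
                    "test6GetAttCertUsingSessID",
                    "test6aGetAttCertRefusedUsingSessID",
                    "test6bGetMappedAttCertUsingSessID",
                    "test6cGetAttCertWithExtAttCertListUsingSessID",
                    "test7GetAttCertUsingProxyCert",
                    "test8GetX509Cert",
                  ))
        unittest.TestSuite.__init__(self, tests)
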
328                                                   
if __name__ == "__main__":
    # Discover and run every test* method of SessionMgrClientTestCase
    unittest.main()