source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py @ 3145

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py@3145
Revision 3145, 13.5 KB checked in by pjkersha, 12 years ago (diff)

python/www/html/sessionMgr.wsdl,
python/ndg.security.common/ndg/security/common/SessionMgr/SessionMgr_services.py,
python/ndg.security.common/ndg/security/common/SessionMgr/SessionMgr_services_types.py,
python/ndg.security.server/ndg/security/server/SessionMgr/SessionMgr_services_server.py:

  • remove refs to proxy certs - using MyProxy? as CA, proxy certs aren't generated.
  • make issuingCert nillable as it won't be set if calling MyProxy? in Simple CA mode

python/ndg.security.server/ndg/security/server/conf/sessionMgr.tac: removes refs to proxy cert - replace with user cert

python/ndg.security.server/ndg/security/server/conf/sessionMgrProperties.xml: fix MyProxy? cert times - these are in seconds NOT hours

python/ndg.security.server/ndg/security/server/MyProxy.py: remove '\0's from get and info commands

python/ndg.security.test/ndg/security/test/sessionMgrClient/SessionMgrClientTest.py: fixed for tests with new MyProxy? config as SimpleCA

python/ndg.security.test/ndg/security/test/sessionMgrClient/server.sh: get rid of --pidfile arg to twistd - not needed.

python/ndg.security.test/ndg/security/test/sessionMgrClient/sm-clnt.crt,
python/ndg.security.test/ndg/security/test/sessionMgrClient/sm-clnt.key,

python/ndg.security.test/ndg/security/test/sessionMgr/sessionMgrProperties.xml,
python/ndg.security.test/ndg/security/test/sessionMgr/sessionMgrTest.cfg,
python/ndg.security.test/ndg/security/test/sessionMgrClient/sessionMgrProperties.xml,
python/ndg.security.test/ndg/security/test/sessionMgrClient/sessionMgrClientTest.cfg: altered for tests with multiple CAs

python/ndg.security.common/ndg/security/common/SessionMgr/init.py:

  • removed addUser method - not needed
  • switched refs to proxy cert -> user cert
  • Property svn:executable set to *
  • Property svn:keywords set to Id
#!/usr/bin/env python
"""Test harness for NDG Session Manager client - makes requests for
authentication and authorisation.

A Session Manager must be running at the smuri given in
./sessionMgrClientTest.cfg (see SessionMgrClientTestCase.setUp), and the
getAttCert tests need the Attribute Authority at the aauri of their own
config section.  The connect() calls obtain a user cert from the Session
Manager's MyProxy / Simple CA back end, so that must be configured too.
Pass-phrases and the client private key password are prompted for when
they are not present in the config.

NERC Data Grid Project
"""
__author__ = "P J Kershaw"
__date__ = "23/02/06"
__copyright__ = "(C) 2007 STFC & NERC"
__license__ = \
"""This software may be distributed under the terms of the Q Public
License, version 1.0 or later."""
__contact__ = "P.J.Kershaw@rl.ac.uk"
__revision__ = '$Id$'

import unittest
# NOTE(review): os is imported but not referenced anywhere in this file
import os, sys, getpass, re
from ConfigParser import SafeConfigParser

from ndg.security.common.SessionMgr import SessionMgrClient, \
    AttributeRequestDenied

# NOTE(review): SessionCookie is imported but not referenced in this file
from ndg.security.common.SessionCookie import SessionCookie
from ndg.security.common.X509 import X509CertParse, X509CertRead
[848]26
27
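class SessionMgrClientTestCase(unittest.TestCase):
    """Exercise SessionMgrClient against a running Session Manager.

    setUp reads ./sessionMgrClientTest.cfg; each test method takes its
    settings from the config section of the same name.  Most tests call
    test1Connect first to obtain a session, so the connect pass-phrases are
    cached at class level and only prompted for once per process.
    """

    # One PEM-encoded certificate.  Base64 never contains '-', so [^\-]*
    # stops at the END marker without needing a non-greedy match.  Raw
    # string so the backslash reaches the regex engine unchanged.
    pemPat = r"-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"

    # Pass-phrases cached across test methods - see test1Connect and
    # test3ConnectNoCreateServerSess
    test2Passphrase = None
    test3Passphrase = None

    def _getCertChainFromProxyCertFile(self, certChainFilePath):
        '''Read the user cert and the cert(s) it was issued under from a
        single PEM file and return them as a list ready for input into
        SignatureHandler.

        certChainFilePath: PEM file holding the user cert first, followed
        by its issuing cert(s).

        Returns a list of X509CertParse objects in reverse file order, so
        that the user cert - the one used to verify the message signature -
        comes last.  Raises ValueError if the file holds no PEM
        certificate at all, rather than returning an empty chain that
        would only fail later inside the signature handler.'''
        certChainFile = open(certChainFilePath)
        try:
            certChainFileTxt = certChainFile.read()
        finally:
            certChainFile.close()

        pemPatRE = re.compile(self.__class__.pemPat, re.S)
        signingCertChain = [X509CertParse(x509Cert)
                            for x509Cert in pemPatRE.findall(certChainFileTxt)]
        if not signingCertChain:
            raise ValueError("No PEM certificates found in %r" %
                             certChainFilePath)

        # The file holds the user cert first; SignatureHandler expects it
        # last as that is the cert used to verify the message signature
        signingCertChain.reverse()
        return signingCertChain

    def _signWithConnectCreds(self):
        '''Point the client's signature handler at the user cert / private
        key returned by the last connect() call.

        When connect() also returned an issuing cert (presumably MyProxy
        acting as a CA - see the commit log) the pair is sent as an
        X509PKIPathv1 chain; otherwise the user cert is sent on its own as
        an X509v3 token.  The attribute assignment order within each branch
        is kept as before in case the handler validates one setting against
        another.'''
        sigHandler = self.clnt.signatureHandler
        if self.issuingCert:
            sigHandler.reqBinSecTokValType = 'X509PKIPathv1'
            sigHandler.signingPriKey = self.userPriKey
            sigHandler.signingCertChain = (self.issuingCert, self.userCert)
            sigHandler.signingCert = None
        else:
            sigHandler.reqBinSecTokValType = 'X509v3'
            sigHandler.signingPriKey = self.userPriKey
            sigHandler.signingCertChain = ()
            sigHandler.signingCert = self.userCert

    def setUp(self):
        '''Read the test config and build the Session Manager client shared
        by all the test methods.  Prompts for the client private key
        password if it isn't set in the [setUp] section.'''
        configParser = SafeConfigParser()
        cfgFilePath = "./sessionMgrClientTest.cfg"
        if not configParser.read(cfgFilePath):
            # Fail here with a clear message instead of a KeyError on
            # self.cfg['setUp'] below
            self.fail("Unable to read test config file %r" % cfgFilePath)

        self.cfg = {}
        for section in configParser.sections():
            self.cfg[section] = dict(configParser.items(section))
        setUpCfg = self.cfg['setUp']

        # SOAP debug trace.  NOTE(review): the trace carries the message
        # bodies, and connect() is passed the user's pass-phrase - omit the
        # tracefile keyword below outside interactive debugging.
        tracefile = sys.stderr

        try:
            clntPriKeyPwd = setUpCfg.get('clntprikeypwd')
            if clntPriKeyPwd is None:
                clntPriKeyPwd = getpass.getpass(
                            prompt="\nsetUp - client private key password: ")
        except KeyboardInterrupt:
            sys.exit(0)

        # CA certificates for validating the certs used in the signature of
        # the server response.  Only a missing option means "none"; any
        # other error propagates rather than being swallowed.
        try:
            caCertFilePathList = setUpCfg['cacertfilepathlist'].split()
        except KeyError:
            caCertFilePathList = []

        # CA certificates for the SSL connection to the Session Manager
        try:
            sslCACertList = [X509CertRead(certFilePath) for certFilePath in
                             setUpCfg['sslcacertfilepathlist'].split()]
        except KeyError:
            sslCACertList = []

        reqBinSecTokValType = setUpCfg.get('reqbinsectokvaltype')

        # Sign with a cert chain read from a single file when one is
        # configured; otherwise signingCertChain is None and only
        # signingCertFilePath is supplied to the client below
        userCertFilePath = setUpCfg.get('usercertfilepath')
        if userCertFilePath:
            signingCertChain = \
                        self._getCertChainFromProxyCertFile(userCertFilePath)
        else:
            signingCertChain = None

        # Read as a boolean with getboolean() rather than eval(): the config
        # file is local, but eval() would execute any expression written
        # there.  Accepts 1/yes/true/on and 0/no/false/off, case-insensitive.
        setSignatureHandler = configParser.getboolean('setUp',
                                                      'setsignaturehandler')

        # Initialise the Session Manager client connection
        # Omit tracefile keyword to leave out SOAP debug info
        self.clnt = SessionMgrClient(uri=setUpCfg['smuri'],
                sslCACertList=sslCACertList,
                sslPeerCertCN=setUpCfg.get('sslpeercertcn'),
                setSignatureHandler=setSignatureHandler,
                reqBinSecTokValType=reqBinSecTokValType,
                signingCertFilePath=setUpCfg.get('clntcertfilepath'),
                signingCertChain=signingCertChain,
                signingPriKeyFilePath=setUpCfg['clntprikeyfilepath'],
                signingPriKeyPwd=clntPriKeyPwd,
                caCertFilePathList=caCertFilePathList,
                tracefile=tracefile)

        # Credentials returned by connect() - populated by test1Connect
        self.sessID = None
        self.userCert = None
        self.userPriKey = None
        self.issuingCert = None

    def test1Connect(self):
        """test1Connect: Connect as if acting as a browser client -
        a session ID is returned"""
        cls = self.__class__
        if cls.test2Passphrase is None:
            cls.test2Passphrase = self.cfg['test1Connect'].get('passphrase')

        if not cls.test2Passphrase:
            cls.test2Passphrase = getpass.getpass(
                               prompt="\ntest1Connect pass-phrase for user: ")

        username = self.cfg['test1Connect']['username']
        self.userCert, self.userPriKey, self.issuingCert, self.sessID = \
            self.clnt.connect(username, passphrase=cls.test2Passphrase)

        # The docstring promises a session ID - check it rather than letting
        # the dependent tests fail further on with sessID=None
        self.assertTrue(self.sessID, "connect() returned no session ID")

        print "User '%s' connected to Session Manager:\n%s" % \
            (username, self.sessID)

    def test2GetSessionStatus(self):
        """test2GetSessionStatus: check a session is alive"""
        print "\n\t" + self.test2GetSessionStatus.__doc__

        self.test1Connect()
        # unittest assertions rather than bare assert so the checks survive
        # python -O
        self.assertTrue(self.clnt.getSessionStatus(sessID=self.sessID),
                        "Session is dead")
        print "User connected to Session Manager with sessID=%s" % self.sessID

        self.assertFalse(self.clnt.getSessionStatus(sessID='abc'),
                         "sessID=abc shouldn't exist!")
        print "CORRECT: sessID=abc doesn't exist"

    def test3ConnectNoCreateServerSess(self):
        """test3ConnectNoCreateServerSess: Connect as a non browser client -
        sessID should be None"""
        cls = self.__class__
        if cls.test3Passphrase is None:
            cls.test3Passphrase = \
                self.cfg['test3ConnectNoCreateServerSess'].get('passphrase')

        if not cls.test3Passphrase:
            cls.test3Passphrase = getpass.getpass(
            prompt="\ntest3ConnectNoCreateServerSess pass-phrase for user: ")

        username = self.cfg['test3ConnectNoCreateServerSess']['username']
        self.userCert, self.userPriKey, self.issuingCert, sessID = \
            self.clnt.connect(username,
                              passphrase=cls.test3Passphrase,
                              createServerSess=False)

        # No server-side session was requested, so expect a null session ID
        self.assertFalse(sessID,
                         "createServerSess=False but a sessID was returned")

        print "User '%s' connected to Session Manager:\n%s" % \
                    (username, self.userCert)

    def test4DisconnectWithSessID(self):
        """test4DisconnectWithSessID: disconnect as if acting as a browser client
        """
        print "\n\t" + self.test4DisconnectWithSessID.__doc__
        self.test1Connect()

        self.clnt.disconnect(sessID=self.sessID)
        print "User disconnected from Session Manager:\n%s" % self.sessID

    def test5DisconnectWithUserCert(self):
        """test5DisconnectWithUserCert: Disconnect as a command line client
        """
        print "\n\t" + self.test5DisconnectWithUserCert.__doc__
        self.test1Connect()

        # Sign with the user cert / private key just obtained from connect()
        self._signWithConnectCreds()

        # No sessID passed: the user cert in the signature determines which
        # session is deleted
        self.clnt.disconnect()
        print "User disconnected from Session Manager:\n%s" % self.userCert

    def test6GetAttCertWithSessID(self):
        """test6GetAttCertWithSessID: make an attribute request using
        a session ID as authentication credential"""
        print "\n\t" + self.test6GetAttCertWithSessID.__doc__
        self.test1Connect()

        testCfg = self.cfg['test6GetAttCertWithSessID']
        attCert = self.clnt.getAttCert(sessID=self.sessID,
                                       attAuthorityURI=testCfg['aauri'])
        print "Attribute Certificate:\n%s" % attCert

        # Written out for test6cGetAttCertWithExtAttCertListWithSessID, which
        # reads it back as an external AC
        attCert.filePath = testCfg['acoutfilepath']
        attCert.write()

    def test6aGetAttCertRefusedWithSessID(self):
        """test6aGetAttCertRefusedWithSessID: make an attribute request using
        a sessID as authentication credential requesting an AC from an
        Attribute Authority where the user is NOT registered"""
        print "\n\t" + self.test6aGetAttCertRefusedWithSessID.__doc__
        self.test1Connect()

        aaURI = self.cfg['test6aGetAttCertRefusedWithSessID']['aauri']

        # mapFromTrustedHosts=False: presumably stops the AA satisfying the
        # request via a mapped AC from a trusted host, so a refusal is the
        # only acceptable outcome here
        try:
            self.clnt.getAttCert(sessID=self.sessID,
                                 attAuthorityURI=aaURI,
                                 mapFromTrustedHosts=False)
        except AttributeRequestDenied, e:
            print "SUCCESS - obtained expected result: %s" % e
        else:
            self.fail("Request allowed from AA where user is NOT registered!")

    def test6bGetMappedAttCertWithSessID(self):
        """test6bGetMappedAttCertWithSessID: make an attribute request using
        a session ID as authentication credential"""
        print "\n\t" + self.test6bGetMappedAttCertWithSessID.__doc__
        self.test1Connect()

        aaURI = self.cfg['test6bGetMappedAttCertWithSessID']['aauri']
        attCert = self.clnt.getAttCert(sessID=self.sessID,
                                       attAuthorityURI=aaURI)
        print "Attribute Certificate:\n%s" % attCert

    def test6cGetAttCertWithExtAttCertListWithSessID(self):
        """test6cGetAttCertWithExtAttCertListWithSessID: make an attribute
        request using a session ID as authentication credential and passing
        an external AC (the one written by test6GetAttCertWithSessID)"""
        print "\n\t" + \
            self.test6cGetAttCertWithExtAttCertListWithSessID.__doc__
        self.test1Connect()

        testCfg = self.cfg['test6cGetAttCertWithExtAttCertListWithSessID']
        aaURI = testCfg['aauri']

        # Use output from test6GetAttCertWithSessID!
        extACFile = open(testCfg['extacfilepath'])
        try:
            extAttCert = extACFile.read()
        finally:
            extACFile.close()

        attCert = self.clnt.getAttCert(sessID=self.sessID,
                                       attAuthorityURI=aaURI,
                                       extAttCertList=[extAttCert])
        print "Attribute Certificate:\n%s" % attCert

    def test7GetAttCertWithUserCert(self):
        """test7GetAttCertWithUserCert: make an attribute request using
        a user cert as authentication credential"""
        print "\n\t" + self.test7GetAttCertWithUserCert.__doc__
        self.test1Connect()

        # Sign with the user cert / private key returned from connect() - no
        # sessID is passed so the signature is the credential
        self._signWithConnectCreds()

        aaURI = self.cfg['test7GetAttCertWithUserCert']['aauri']
        attCert = self.clnt.getAttCert(attAuthorityURI=aaURI)
        print "Attribute Certificate:\n%s" % attCert

    def test8GetX509Cert(self):
        "test8GetX509Cert: return the Session Manager's X.509 Cert."
        cert = self.clnt.getX509Cert()
        print "Session Manager X.509 Certificate:\n" + cert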
328#_____________________________________________________________________________       
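class SessionMgrClientTestSuite(unittest.TestSuite):
    '''Run the SessionMgrClientTestCase methods in a fixed order -
    test6cGetAttCertWithExtAttCertListWithSessID reads the AC file written
    by test6GetAttCertWithSessID, so order matters'''

    def __init__(self):
        # Don't bind the result to a local called "map": assigning to that
        # name makes it local for the whole function, so the builtin is
        # unbound at the point of the call and raises UnboundLocalError.
        # NOTE(review): test6aGetAttCertRefusedWithSessID is not listed here
        # - confirm whether that omission is intentional.
        testNames = (
            "test1Connect",
            "test2GetSessionStatus",
            "test3ConnectNoCreateServerSess",
            "test4DisconnectWithSessID",
            "test5DisconnectWithUserCert",
            "test6GetAttCertWithSessID",
            "test6bGetMappedAttCertWithSessID",
            "test6cGetAttCertWithExtAttCertListWithSessID",
            "test7GetAttCertWithUserCert",
            "test8GetX509Cert",
        )
        tests = map(SessionMgrClientTestCase, testNames)
        unittest.TestSuite.__init__(self, tests)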
346           
347                                                   
if __name__ == '__main__':
    # Runs the TestCase classes unittest finds in this module
    unittest.main()