source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/SessionMgr/SessionMgrClientTest.py @ 2685

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/SessionMgr/SessionMgrClientTest.py@2685
Revision 2685, 13.1 KB checked in by pjkersha, 13 years ago (diff)

Preparing new DEWS 0.8.0 release -

ndg.security.server/setup.py: remove commented out code

setup.py, ndg.security.client/setup.py, ndg.security.test/setup.py,
ndg.security.server/setup.py, ndg.security.common/setup.py:
update version to 0.8.0

ndg.security.test/ndg/security/test/AttAuthority/siteAAttAuthorityProperties.xml:
reset default transport to http

ndg.security.test/ndg/security/test/AttAuthority/attAuthorityClientTest.cfg:
default test settings for DEWS

ndg.security.test/ndg/security/test/SessionMgr/SessionMgrClientTest.py:

  • updated for tests with SSL - sslCACertList keyword

ndg.security.test/ndg/security/test/SessionMgr/sessionMgrClientTest.cfg:

  • test with SSL

ndg.security.common/ndg/security/common/SessionMgr/init.py:

  • include new SSL settings sslCACertList and sslCACertFilePathList

keywords / properties

  • removed transdict keyword
  • changed tranport attribute to _transport and transdict to _transdict

ndg.security.common/ndg/security/common/AttAuthority/init.py:

  • import httplib to enable catch for httplib.BadStatusLine? exception - this

is thrown when trying to connect with http to https service

  • include sslCACertFilePathList property
  • remove clntCertFilePath, clntPriKeyFilePath and clntPriKeyPwd properties -

no longer needed

ndg.security.common/ndg/security/common/m2CryptoSSLUtility.py:

  • new property caCertFilePathList enables setting of CA certs from file list
  • fix to HTTPSConnection class - set _postConnectionCheck attribute to

SSL.Checker.Checker default if not equivalent keyword was set

ndg.security.common/ndg/security/common/CredWallet.py:

  • enable calls to Attribute Authorities to set CA list for peer cert

verification with SSL connections

ndg-security-install.py: added new -t option to enable install of unit tests
package

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2
3"""Test harness for NDG Session Manager client - makes requests for
4authentication and authorisation.  An Attribute Authority and Simple CA
5services must be running for the reqAuthorisation and addUser tests
6
7NERC Data Grid Project
8
9@author P J Kershaw
10
1123/02/06
12
13Renamed from SessionClientTest.py 27/0/4/06
14Moved and renamed SessionMgrClientTest.py 23/11/06
15
16@copyright (C) 2007 CCLRC & NERC
17
18@license This software may be distributed under the terms of the Q Public
19License, version 1.0 or later.
20"""
21__revision__ = "$Id:$"
22
23import unittest
24import os, sys, getpass, re
25from ConfigParser import SafeConfigParser
26
27from ndg.security.common.SessionMgr import SessionMgrClient, \
28    AttributeRequestDenied
29   
30from ndg.security.common.SessionCookie import SessionCookie
31from ndg.security.common.X509 import X509CertParse, X509CertRead
32
33
34class SessionMgrClientTestCase(unittest.TestCase):
35    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
36       
37    test2Passphrase = None
38    test3Passphrase = None
39
40    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
41        '''Read proxy cert and user cert from a single PEM file and put in
42        a list ready for input into SignatureHandler'''               
43        proxyCertFileTxt = open(proxyCertFilePath).read()
44       
45        pemPatRE = re.compile(self.__class__.pemPat, re.S)
46        x509CertList = pemPatRE.findall(proxyCertFileTxt)
47       
48        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
49                            x509CertList]
50   
51        # Expecting proxy cert first - move this to the end.  This will
52        # be the cert used to verify the message signature
53        signingCertChain.reverse()
54       
55        return signingCertChain
56
57
58    def setUp(self):
59       
60        configParser = SafeConfigParser()
61        configParser.read("./sessionMgrClientTest.cfg")
62       
63        self.cfg = {}
64        for section in configParser.sections():
65            self.cfg[section] = dict(configParser.items(section))
66
67        tracefile = sys.stderr
68
69        try:
70            if self.cfg['setUp'].get('clntprikeypwd') is None:
71                clntPriKeyPwd = getpass.getpass(\
72                            prompt="\nsetUp - client private key password: ")
73            else:
74                clntPriKeyPwd = self.cfg['setUp'].get('clntprikeypwd')
75        except KeyboardInterrupt:
76            sys.exit(0)
77
78        # List of CA certificates for use in validation of certs used in
79        # signature for server reponse
80        try:
81            caCertFilePathList=self.cfg['setUp']['cacertfilepathlist'].split()
82        except:
83            caCertFilePathList = []
84         
85        try:
86            sslCACertList = [X509CertRead(file) for file in \
87                         self.cfg['setUp']['sslcacertfilepathlist'].split()]
88        except KeyError:
89            sslCACertList = []
90         
91         
92        reqBinSecTokValType = self.cfg['setUp'].get('reqbinsectokvaltype')
93
94        # Check certificate types proxy or standard
95        proxyCertFilePath = self.cfg['setUp'].get('proxycertfilepath')
96        if proxyCertFilePath:
97            signingCertChain = \
98                        self._getCertChainFromProxyCertFile(proxyCertFilePath)
99        else:
100            signingCertChain = None
101               
102        setSignatureHandler = eval(self.cfg['setUp']['setsignaturehandler'])
103           
104        # Initialise the Session Manager client connection
105        # Omit traceFile keyword to leave out SOAP debug info
106        self.clnt = SessionMgrClient(uri=self.cfg['setUp']['smuri'],
107                sslCACertList=sslCACertList,
108                sslPeerCertCN=self.cfg['setUp'].get('sslpeercertcn'),
109                setSignatureHandler=setSignatureHandler,
110                reqBinSecTokValType=reqBinSecTokValType,
111                signingCertFilePath=self.cfg['setUp'].get('clntcertfilepath'),
112                signingCertChain=signingCertChain,
113                signingPriKeyFilePath=self.cfg['setUp']['clntprikeyfilepath'],
114                signingPriKeyPwd=clntPriKeyPwd,
115                caCertFilePathList=caCertFilePathList,
116                tracefile=tracefile) 
117       
118        self.sessID = None
119        self.proxyCert = None
120        self.proxyPriKey = None
121        self.userCert = None
122
123# TODO: is addUser part of session manager?
124#    def test1AddUser(self):
125#        """Add a new user ID to the MyProxy repository"""
126#       
127#        passphrase = self.cfg['test1AddUser'].get('passphrase') or \
128#            getpass.getpass(prompt="\ntest1AddUser pass-phrase for new user: ")
129#           
130#        # Note the pass-phrase is read from the file tmp.  To pass
131#        # explicitly as a string use the 'passphrase' keyword instead
132#        self.clnt.addUser(self.cfg['test1AddUser']['username'],
133#                          passphrase=passphrase)
134#        print "Added user '%s'" % self.cfg['test1AddUser']['username']
135       
136
137    def test2Connect(self):
138        """test2Connect: Connect as if acting as a browser client -
139        a session ID is returned"""
140       
141        if self.__class__.test2Passphrase is None:
142            self.__class__.test2Passphrase = \
143                                    self.cfg['test2Connect'].get('passphrase')
144       
145        if not self.__class__.test2Passphrase:
146            self.__class__.test2Passphrase = getpass.getpass(\
147                               prompt="\ntest2Connect pass-phrase for user: ")
148
149        self.proxyCert, self.proxyPriKey, self.userCert, self.sessID = \
150            self.clnt.connect(self.cfg['test2Connect']['username'], 
151                              passphrase=self.__class__.test2Passphrase)
152
153        print "User '%s' connected to Session Manager:\n%s" % \
154            (self.cfg['test2Connect']['username'], self.sessID)
155           
156
157    def test3ConnectNoCreateServerSess(self):
158        """test3ConnectNoCreateServerSess: Connect as a non browser client -
159        sessID should be None"""
160
161        if self.__class__.test3Passphrase is None:
162            self.__class__.test3Passphrase = \
163                self.cfg['test3ConnectNoCreateServerSess'].get('passphrase')
164               
165        if not self.__class__.test3Passphrase:
166            self.__class__.test3Passphrase = getpass.getpass(\
167            prompt="\ntest3ConnectNoCreateServerSess pass-phrase for user: ")
168
169        self.proxyCert, self.proxyPriKey, self.userCert, sessID = \
170            self.clnt.connect(\
171                      self.cfg['test3ConnectNoCreateServerSess']['username'], 
172                      passphrase=self.__class__.test3Passphrase,
173                      createServerSess=False)
174       
175        # Expect null session ID
176        assert(not sessID)
177         
178        print "User '%s' connected to Session Manager:\n%s" % \
179                    (self.cfg['test3ConnectNoCreateServerSess']['username'], 
180                     self.proxyCert)
181           
182
183    def test4DisconnectUsingSessID(self):
184        """test4DisconnectUsingSessID: disconnect as if acting as a browser client
185        """
186       
187        print "\n\t" + self.test4DisconnectUsingSessID.__doc__
188        self.test2Connect()
189       
190        self.clnt.disconnect(sessID=self.sessID)
191       
192        print "User disconnected from Session Manager:\n%s" % self.sessID
193           
194
195    def test5DisconnectUsingProxyCert(self):
196        """test5DisconnectUsingProxyCert: Disconnect as a command line client
197        """
198       
199        print "\n\t" + self.test5DisconnectUsingProxyCert.__doc__
200        self.test2Connect()
201       
202        # Use proxy cert / private key just obtained from connect call for
203        # signature generation         
204        self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
205        self.clnt.signatureHandler.signingPriKey = self.proxyPriKey       
206        self.clnt.signatureHandler.signingCertChain = (self.userCert,
207                                                       self.proxyCert)
208       
209        # Proxy cert in signature determines ID of session to
210        # delete
211        self.clnt.disconnect()
212        print "User disconnected from Session Manager:\n%s" % self.proxyCert
213
214
215    def test6GetAttCertUsingSessID(self):
216        """test6GetAttCertUsingSessID: make an attribute request using
217        a session ID as authentication credential"""
218
219        print "\n\t" + self.test6GetAttCertUsingSessID.__doc__       
220        self.test2Connect()
221       
222        attCert, extAttCertList = self.clnt.getAttCert(\
223            sessID=self.sessID, 
224            attAuthorityURI=self.cfg['test6GetAttCertUsingSessID']['aauri'])
225       
226        print "Attribute Certificate:\n%s" % attCert 
227        print "External Attribute Certificate List:\n%s" % extAttCertList
228
229
230    def test6aGetAttCertRefusedUsingSessID(self):
231        """test6aGetAttCertRefusedUsingSessID: make an attribute request using
232        a sessID as authentication credential requesting an AC from an
233        Attribute Authority where the user is NOT registered"""
234
235        print "\n\t" + self.test6aGetAttCertRefusedUsingSessID.__doc__       
236        self.test2Connect()
237       
238        aaURI = self.cfg['test6aGetAttCertRefusedUsingSessID']['aauri']
239       
240        try:
241            attCert, extAttCertList = self.clnt.getAttCert(\
242                                                    sessID=self.sessID, 
243                                                    attAuthorityURI=aaURI,
244                                                    mapFromTrustedHosts=False)
245        except AttributeRequestDenied, e:
246            print "SUCCESS - obtained expected result: %s" % e
247            return
248       
249        self.fail("Request allowed from AA where user is NOT registered!")
250
251
252    def test6bGetMappedAttCertUsingSessID(self):
253        """test6bGetMappedAttCertUsingSessID: make an attribute request using
254        a session ID as authentication credential"""
255
256        print "\n\t" + self.test6bGetMappedAttCertUsingSessID.__doc__       
257        self.test2Connect()
258       
259        aaURI = self.cfg['test6bGetMappedAttCertUsingSessID']['aauri']
260       
261        attCert, extAttCertList = self.clnt.getAttCert(sessID=self.sessID, 
262                                                       attAuthorityURI=aaURI)
263       
264        print "Attribute Certificate:\n%s" % attCert 
265        print "External Attribute Certificate List:\n%s" % extAttCertList
266
267
268    def test6cGetAttCertWithExtAttCertListUsingSessID(self):
269        """test6GetAttCertUsingSessID: make an attribute request using
270        a session ID as authentication credential"""
271       
272        print "\n\t" + \
273            self.test6cGetAttCertWithExtAttCertListUsingSessID.__doc__       
274        self.test2Connect()
275       
276        aaURI = \
277            self.cfg['test6cGetAttCertWithExtAttCertListUsingSessID']['aauri']
278           
279        attCert, extAttCertList = self.clnt.getAttCert(\
280                                        sessID=self.sessID, 
281                                        attAuthorityURI=aaURI,
282                                        extAttCertList=['AC1', 'AC2', 'AC3'])
283         
284        print "Attribute Certificate:\n%s" % attCert 
285        print "External Attribute Certificate List:\n%s" % extAttCertList
286
287
288    def test7GetAttCertUsingProxyCert(self):
289        """test7GetAttCertUsingProxyCert: make an attribute request using
290        a proxy cert as authentication credential"""
291        print "\n\t" + self.test7GetAttCertUsingProxyCert.__doc__
292        self.test2Connect()
293
294        self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'
295        self.clnt.signatureHandler.signingPriKey = self.proxyPriKey       
296        self.clnt.signatureHandler.signingCertChain = (self.userCert,
297                                                       self.proxyCert)
298       
299        # Request an attribute certificate from an Attribute Authority
300        # using the proxyCert returned from connect()
301       
302        aaURI = self.cfg['test7GetAttCertUsingProxyCert']['aauri']
303        attCert, extAttCertList = self.clnt.getAttCert(attAuthorityURI=aaURI)
304         
305        print "Attribute Certificate:\n%s" % attCert 
306        print "External Attribute Certificate List:\n%s" % extAttCertList
307
308
309    def test8GetX509Cert(self):
310        "test8GetX509Cert: return the Session Manager's X.509 Cert."
311        cert = self.clnt.getX509Cert()
312                                             
313        print "Session Manager X.509 Certificate:\n" + cert
314           
315           
316#_____________________________________________________________________________       
317class SessionMgrClientTestSuite(unittest.TestSuite):
318   
319    def __init__(self):
320        map = map(SessionMgrClientTestCase,
321                  (
322                    "test1AddUser",
323                    "test2Connect",
324                    "test3ConnectNoCreateServerSess",
325                    "test4DisconnectUsingSessID",
326                    "test5DisconnectUsingProxyCert",
327                    "test6GetAttCertUsingSessID",
328                    "test6bGetMappedAttCertUsingSessID",
329                    "test6cGetAttCertWithExtAttCertListUsingSessID",
330                    "test7GetAttCertUsingProxyCert",
331                    "test8GetX509Cert",
332                  ))
333        unittest.TestSuite.__init__(self, map)
334           
335                                                   
336if __name__ == "__main__":
337    unittest.main()       
Note: See TracBrowser for help on using the repository browser.