source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py @ 3032

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py@3032
Revision 3032, 10.8 KB checked in by pjkersha, 12 years ago (diff)

Moved the Session Manager (ndg.security.server.SessionMgr) unit test file.

  • Property svn:executable set to *
#!/usr/bin/env python
"""Test harness for NDG Session Manager - makes requests for
authentication and authorisation.  Attribute Authority and Simple CA
services must be running for the reqAuthorisation and addUser tests.

The tests read their settings from ./sessionMgrTest.cfg, relative to the
current working directory.  A test's pass-phrase is taken from that file
when present, otherwise it is prompted for with getpass.  Any pass-phrase
kept in the file is stored in clear text.

NERC Data Grid Project
"""
__author__ = "P J Kershaw"
__date__ = "23/02/06"
__copyright__ = "(C) 2007 STFC & NERC"
__license__ = \
"""This software may be distributed under the terms of the Q Public
License, version 1.0 or later."""
__contact__ = "P.J.Kershaw@rl.ac.uk"
__revision__ = '$Id: SessionMgrClientTest.py 2909 2007-09-28 14:22:21Z pjkersha $'

import unittest
import os, sys, getpass, re
# ConfigParser is the Python 2 name of this standard library module
from ConfigParser import SafeConfigParser

# NOTE(review): setUp() in this file calls SessionMgr(...), a name that none
# of the imports below bind - confirm which module should provide it.
from ndg.security.common.SessionMgr import SessionMgrClient, \
    AttributeRequestDenied

from ndg.security.common.SessionCookie import SessionCookie
from ndg.security.common.X509 import X509CertParse, X509CertRead
26
27
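class SessionMgrClientTestCase(unittest.TestCase):
    """Session Manager tests: connect, disconnect and attribute certificate
    requests.

    Each test reads its settings from its own section of
    ./sessionMgrTest.cfg.  A pass-phrase comes from the 'passphrase' option
    of that section or, when that is missing or empty, from an interactive
    prompt.  A pass-phrase kept in the config file is held there in clear
    text, so the file should contain test accounts only and be readable by
    its owner alone.
    """

    # One PEM encoded certificate block.  Compiled with re.S by
    # _getCertChainFromProxyCertFile so the base64 body may span lines.
    # Raw string: same pattern text, without relying on "\-" being passed
    # through as an unknown escape.
    pemPat = r"-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"

    # Pass-phrases are cached on the class so that one entered at the prompt
    # is reused by the later tests that call connect again
    test2Passphrase = None
    test3Passphrase = None

    def _cfgGet(self, section, option, default=None, raw=False):
        """Return an optional config setting, or default when the section or
        option is missing.  raw=True switches off '%' interpolation."""
        if self.cfg.has_option(section, option):
            return self.cfg.get(section, option, raw)
        return default

    def _readFile(self, filePath):
        """Return the content of filePath, closing the file afterwards"""
        fileObj = open(filePath)
        try:
            return fileObj.read()
        finally:
            fileObj.close()

    def _signWithProxyCert(self):
        """Set the client's signature handler to sign requests with the
        proxy cert / private key obtained from the connect call"""
        sigHandler = self.clnt.signatureHandler
        sigHandler.reqBinSecTokValType = 'X509PKIPathv1'
        sigHandler.signingPriKey = self.proxyPriKey
        sigHandler.signingCertChain = (self.userCert, self.proxyCert)

    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
        '''Read proxy cert and user cert from a single PEM file and put in
        a list ready for input into SignatureHandler

        Only CERTIFICATE blocks are picked out by pemPat; any other content
        of the file is ignored.  An empty list is returned when the file
        holds no certificate block.'''
        proxyCertFileTxt = self._readFile(proxyCertFilePath)

        pemPatRE = re.compile(self.__class__.pemPat, re.S)
        x509CertList = pemPatRE.findall(proxyCertFileTxt)

        signingCertChain = [X509CertParse(x509Cert)
                            for x509Cert in x509CertList]

        # Expecting proxy cert first - move this to the end.  This will
        # be the cert used to verify the message signature.  The whole list
        # is reversed, so the order of any other certs is inverted as well.
        signingCertChain.reverse()

        return signingCertChain


    def setUp(self):
        """Load the test configuration and create the Session Manager"""
        cfgFilePath = "./sessionMgrTest.cfg"
        self.cfg = SafeConfigParser()

        # read() silently skips a file it cannot open and returns the list
        # of files it did load, so a missing config is reported here rather
        # than as a missing section later on
        if not self.cfg.read(cfgFilePath):
            raise IOError("Unable to read config file: %s" % cfgFilePath)

        # Initialise the Session Manager client connection
        # Omit traceFile keyword to leave out SOAP debug info
        #
        # NOTE(review): SessionMgr is not imported in this file (only
        # SessionMgrClient is) and the tests below use self.clnt, whereas
        # this assigns self.sm - both need settling before the tests can run.
        # NOTE(review): 'propFilepPath' looks like a misspelling of
        # 'propFilePath'; left unchanged because it has to match the option
        # name used in the config file - confirm.
        self.sm = SessionMgr(propFilePath=self.cfg.get('setUp',
                                                       'propFilepPath'))

# TODO: is addUser part of session manager?
#    def test1AddUser(self):
#        """Add a new user ID to the MyProxy repository"""
#
#        passphrase = self.cfg['test1AddUser'].get('passphrase') or \
#            getpass.getpass(prompt="\ntest1AddUser pass-phrase for new user: ")
#
#        # Note the pass-phrase is read from the file tmp.  To pass
#        # explicitly as a string use the 'passphrase' keyword instead
#        self.clnt.addUser(self.cfg['test1AddUser']['username'],
#                          passphrase=passphrase)
#        print "Added user '%s'" % self.cfg['test1AddUser']['username']


    def test2Connect(self):
        """test2Connect: Connect as if acting as a browser client -
        a session ID is returned"""
        cls = self.__class__

        # SafeConfigParser has no cfg[section] access in Python 2, so the
        # settings are fetched with get().  raw=True keeps a '%' in the
        # pass-phrase from being treated as an interpolation marker.
        if cls.test2Passphrase is None:
            cls.test2Passphrase = self._cfgGet('test2Connect', 'passphrase',
                                               raw=True)

        if not cls.test2Passphrase:
            cls.test2Passphrase = getpass.getpass(
                               prompt="\ntest2Connect pass-phrase for user: ")

        username = self.cfg.get('test2Connect', 'username')

        self.proxyCert, self.proxyPriKey, self.userCert, self.sessID = \
            self.clnt.connect(username, passphrase=cls.test2Passphrase)

        # The docstring promises a session ID - check that one came back
        self.assertTrue(self.sessID, "No session ID returned from connect")

        # The session ID is the only credential passed in the sessID= calls
        # of the tests below, so output that contains it should be treated
        # as sensitive while the session is live
        print "User '%s' connected to Session Manager:\n%s" % \
            (username, self.sessID)


    def test3ConnectNoCreateServerSess(self):
        """test3ConnectNoCreateServerSess: Connect as a non browser client -
        sessID should be None"""
        cls = self.__class__
        section = 'test3ConnectNoCreateServerSess'

        if cls.test3Passphrase is None:
            cls.test3Passphrase = self._cfgGet(section, 'passphrase',
                                               raw=True)

        if not cls.test3Passphrase:
            cls.test3Passphrase = getpass.getpass(
            prompt="\ntest3ConnectNoCreateServerSess pass-phrase for user: ")

        username = self.cfg.get(section, 'username')

        self.proxyCert, self.proxyPriKey, self.userCert, sessID = \
            self.clnt.connect(username,
                              passphrase=cls.test3Passphrase,
                              createServerSess=False)

        # Expect null session ID.  A unittest assertion is used because a
        # bare assert statement is stripped when Python runs with -O.
        self.assertFalse(sessID,
                         "Session ID returned with createServerSess=False")

        print "User '%s' connected to Session Manager:\n%s" % \
                    (username, self.proxyCert)


    def test4DisconnectUsingSessID(self):
        """test4DisconnectUsingSessID: disconnect as if acting as a browser client
        """

        print "\n\t" + self.test4DisconnectUsingSessID.__doc__
        self.test2Connect()

        self.clnt.disconnect(sessID=self.sessID)

        print "User disconnected from Session Manager:\n%s" % self.sessID


    def test5DisconnectUsingProxyCert(self):
        """test5DisconnectUsingProxyCert: Disconnect as a command line client
        """

        print "\n\t" + self.test5DisconnectUsingProxyCert.__doc__
        self.test2Connect()

        # Use proxy cert / private key just obtained from connect call for
        # signature generation
        self._signWithProxyCert()

        # Proxy cert in signature determines ID of session to
        # delete
        self.clnt.disconnect()
        print "User disconnected from Session Manager:\n%s" % self.proxyCert


    def test6GetAttCertUsingSessID(self):
        """test6GetAttCertUsingSessID: make an attribute request using
        a session ID as authentication credential"""
        section = 'test6GetAttCertUsingSessID'

        print "\n\t" + self.test6GetAttCertUsingSessID.__doc__
        self.test2Connect()

        attCert = self.clnt.getAttCert(
            sessID=self.sessID,
            attAuthorityURI=self.cfg.get(section, 'aauri'))

        print "Attribute Certificate:\n%s" % attCert

        # Saved to file; test6cGetAttCertWithExtAttCertListUsingSessID
        # expects to read the output of this test
        attCert.filePath = self.cfg.get(section, 'acoutfilepath')
        attCert.write()


    def test6aGetAttCertRefusedUsingSessID(self):
        """test6aGetAttCertRefusedUsingSessID: make an attribute request using
        a sessID as authentication credential requesting an AC from an
        Attribute Authority where the user is NOT registered"""

        print "\n\t" + self.test6aGetAttCertRefusedUsingSessID.__doc__
        self.test2Connect()

        aaURI = self.cfg.get('test6aGetAttCertRefusedUsingSessID', 'aauri')

        # NOTE(review): mapFromTrustedHosts=False presumably stops the
        # request falling back to a mapped certificate from a trusted host,
        # so that the refusal is what gets tested - confirm.
        try:
            self.clnt.getAttCert(sessID=self.sessID,
                                 attAuthorityURI=aaURI,
                                 mapFromTrustedHosts=False)
        except AttributeRequestDenied, e:
            print "SUCCESS - obtained expected result: %s" % e
            return

        # Reaching this point means access was granted when it should have
        # been denied
        self.fail("Request allowed from AA where user is NOT registered!")


    def test6bGetMappedAttCertUsingSessID(self):
        """test6bGetMappedAttCertUsingSessID: make an attribute request using
        a session ID as authentication credential"""

        print "\n\t" + self.test6bGetMappedAttCertUsingSessID.__doc__
        self.test2Connect()

        aaURI = self.cfg.get('test6bGetMappedAttCertUsingSessID', 'aauri')

        attCert = self.clnt.getAttCert(sessID=self.sessID,
                                       attAuthorityURI=aaURI)

        print "Attribute Certificate:\n%s" % attCert


    def test6cGetAttCertWithExtAttCertListUsingSessID(self):
        """test6cGetAttCertWithExtAttCertListUsingSessID: make an attribute
        request using a session ID as authentication credential, passing an
        existing attribute certificate in extAttCertList"""
        section = 'test6cGetAttCertWithExtAttCertListUsingSessID'

        print "\n\t" + \
            self.test6cGetAttCertWithExtAttCertListUsingSessID.__doc__
        self.test2Connect()

        aaURI = self.cfg.get(section, 'aauri')

        # Use output from test6GetAttCertUsingSessID!
        extAttCert = self._readFile(self.cfg.get(section, 'extacfilepath'))

        attCert = self.clnt.getAttCert(sessID=self.sessID,
                                       attAuthorityURI=aaURI,
                                       extAttCertList=[extAttCert])

        print "Attribute Certificate:\n%s" % attCert


    def test7GetAttCertUsingProxyCert(self):
        """test7GetAttCertUsingProxyCert: make an attribute request using
        a proxy cert as authentication credential"""
        print "\n\t" + self.test7GetAttCertUsingProxyCert.__doc__
        self.test2Connect()

        self._signWithProxyCert()

        # Request an attribute certificate from an Attribute Authority
        # using the proxyCert returned from connect()
        aaURI = self.cfg.get('test7GetAttCertUsingProxyCert', 'aauri')
        attCert = self.clnt.getAttCert(attAuthorityURI=aaURI)

        print "Attribute Certificate:\n%s" % attCert


    def test8GetX509Cert(self):
        "test8GetX509Cert: return the Session Manager's X.509 Cert."
        cert = self.clnt.getX509Cert()

        # An empty result would otherwise pass unnoticed
        self.assertTrue(cert, "No X.509 certificate returned")

        print "Session Manager X.509 Certificate:\n" + cert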
255           
256           
257#_____________________________________________________________________________       
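class SessionMgrClientTestSuite(unittest.TestSuite):
    """Suite holding the SessionMgrClientTestCase tests in their numbered
    order"""

    def __init__(self):
        # "test1AddUser" is left out: that method is commented out in
        # SessionMgrClientTestCase, and naming a test method that does not
        # exist makes the TestCase constructor raise.
        # "test6aGetAttCertRefusedUsingSessID" is included so that the
        # refused-request case is also run when this suite is used.
        testNames = (
            "test2Connect",
            "test3ConnectNoCreateServerSess",
            "test4DisconnectUsingSessID",
            "test5DisconnectUsingProxyCert",
            "test6GetAttCertUsingSessID",
            "test6aGetAttCertRefusedUsingSessID",
            "test6bGetMappedAttCertUsingSessID",
            "test6cGetAttCertWithExtAttCertListUsingSessID",
            "test7GetAttCertUsingProxyCert",
            "test8GetX509Cert",
        )

        # The local must not be called "map": assigning to that name inside
        # this function makes it local, so calling the builtin on the same
        # line fails before the assignment happens
        testCases = [SessionMgrClientTestCase(name) for name in testNames]
        unittest.TestSuite.__init__(self, testCases)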
275           
276                                                   
# Run as a script: hand over to unittest's command line runner
if __name__ == '__main__':
    unittest.main()