source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthorityclient/test_attributeauthorityclient.py @ 4377

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthorityclient/test_attributeauthorityclient.py@4377
Revision 4377, 12.2 KB checked in by pjkersha, 11 years ago (diff)

Renamed Attribute Authority classes and reran unittests

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority SOAP client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "05/05/05, major update 16/01/07"
8__copyright__ = "(C) 2008 STFC & NERC"
9__license__ = \
10"""This software may be distributed under the terms of the Q Public
11License, version 1.0 or later."""
12__contact__ = "P.J.Kershaw@rl.ac.uk"
13__revision__ = '$Id:test_attributeauthorityclient.py 4372 2008-10-29 09:45:39Z pjkersha $'
14
15import unittest
16import os, sys, getpass, re
17import logging
18logging.basicConfig()
19
20from ndg.security.common.attributeauthority import AttributeAuthorityClient, \
21    NoMatchingRoleInTrustedHosts
22from ndg.security.common.AttCert import AttCertRead
23from ndg.security.common.X509 import X509CertParse, X509CertRead
24from ndg.security.common.wssecurity.dom import SignatureHandler as SigHdlr
25from ndg.security.common.utils.ConfigFileParsers import \
26    CaseSensitiveConfigParser
27   
28from os.path import expandvars as xpdVars
29from os.path import join as jnPath
30mkPath = lambda file: jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'], file)
31
32
33class AttributeAuthorityClientTestCase(unittest.TestCase):
34    clntPriKeyPwd = None
35    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
36
37    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
38        '''Read proxy cert and user cert from a single PEM file and put in
39        a list ready for input into SignatureHandler'''               
40        proxyCertFileTxt = open(proxyCertFilePath).read()
41       
42        pemPatRE = re.compile(self.__class__.pemPat, re.S)
43        x509CertList = pemPatRE.findall(proxyCertFileTxt)
44       
45        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
46                            x509CertList]
47   
48        # Expecting proxy cert first - move this to the end.  This will
49        # be the cert used to verify the message signature
50        signingCertChain.reverse()
51       
52        return signingCertChain
53
54
55    def setUp(self):
56
57        if 'NDGSEC_INT_DEBUG' in os.environ:
58            import pdb
59            pdb.set_trace()
60       
61        if 'NDGSEC_AACLNT_UNITTEST_DIR' not in os.environ:
62            os.environ['NDGSEC_AACLNT_UNITTEST_DIR'] = \
63                os.path.abspath(os.path.dirname(__file__))
64
65        self.cfgParser = CaseSensitiveConfigParser()
66        cfgFilePath = jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'],
67                                'attAuthorityClientTest.cfg')
68        self.cfgParser.read(cfgFilePath)
69       
70        self.cfg = {}
71        for section in self.cfgParser.sections():
72            self.cfg[section] = dict(self.cfgParser.items(section))
73
74        try:
75            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
76                         self.cfg['setUp']['sslcaCertFilePathList'].split()]
77        except KeyError:
78            sslCACertList = []
79           
80        # Instantiate WS proxy
81        self.siteAClnt = AttributeAuthorityClient(uri=self.cfg['setUp']['uri'],
82                        sslPeerCertCN=self.cfg['setUp'].get('sslPeerCertCN'),
83                        sslCACertList=sslCACertList,
84                        cfgFileSection='wsse',
85                        cfg=self.cfgParser)           
86   
87    def test1GetX509Cert(self):
88        '''test1GetX509Cert: retrieve Attribute Authority's X.509 cert.'''
89        resp = self.siteAClnt.getX509Cert()
90        print "Attribute Authority X.509 cert.:\n" + resp
91
92    def test2GetHostInfo(self):
93        """test2GetHostInfo: retrieve info for AA host"""
94        hostInfo = self.siteAClnt.getHostInfo()
95        print "Host Info:\n %s" % hostInfo       
96
97    def test3GetTrustedHostInfo(self):
98        """test3GetTrustedHostInfo: retrieve trusted host info matching a
99        given role"""
100        trustedHostInfo = self.siteAClnt.getTrustedHostInfo(\
101                                 self.cfg['test3GetTrustedHostInfo']['role'])
102        for hostname, hostInfo in trustedHostInfo.items():
103            assert hostname, "Hostname not set"
104            for k, v in hostInfo.items():
105                assert k, "hostInfo value key unset"
106
107        print "Trusted Host Info:\n %s" % trustedHostInfo
108
109    def test3aGetTrustedHostInfoWithNoMatchingRoleFound(self):
110        """test3aGetTrustedHostInfoWithNoMatchingRoleFound: test the case
111        where the input role doesn't match any roles in the target AA's map
112        config file"""
113        _cfg = self.cfg['test3aGetTrustedHostInfoWithNoMatchingRoleFound']
114        try:
115            trustedHostInfo = self.siteAClnt.getTrustedHostInfo(_cfg['role'])
116            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")
117           
118        except NoMatchingRoleInTrustedHosts, e:
119            print 'As expected - no match for role "%s": %s' % \
120                (_cfg['role'], e)
121
122
123    def test4GetTrustedHostInfoWithNoRole(self):
124        """test4GetTrustedHostInfoWithNoRole: retrieve trusted host info
125        irrespective of role"""
126        trustedHostInfo = self.siteAClnt.getTrustedHostInfo()
127        for hostname, hostInfo in trustedHostInfo.items():
128            assert hostname, "Hostname not set"
129            for k, v in hostInfo.items():
130                assert k, "hostInfo value key unset"
131                assert v, ("%s value not set" % k)
132                   
133        print "Trusted Host Info:\n %s" % trustedHostInfo
134       
135
136    def test4aGetAllHostsInfo(self):
137        """test4aGetAllHostsInfo: retrieve info for all hosts"""
138        allHostInfo = self.siteAClnt.getAllHostsInfo()
139        for hostname, hostInfo in allHostInfo.items():
140            assert hostname, "Hostname not set"
141            for k, v in hostInfo.items():
142                assert k, "hostInfo value key unset"
143                   
144        print "All Hosts Info:\n %s" % allHostInfo
145
146
147    def test5GetAttCert(self):       
148        """test5GetAttCert: Request attribute certificate from NDG Attribute
149        Authority Web Service."""
150        _cfg = self.cfg['test5GetAttCert']
151       
152        # Read user Certificate into a string ready for passing via WS
153        try:
154            userCertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
155            userCertTxt = open(userCertFilePath, 'r').read()
156       
157        except TypeError:
158            # No issuing cert set
159            userCertTxt = None
160               
161        except IOError, ioErr:
162            raise Exception("Error reading certificate file \"%s\": %s" % \
163                                    (ioErr.filename, ioErr.strerror))
164
165        # Make attribute certificate request
166        attCert = self.siteAClnt.getAttCert(userCert=userCertTxt)
167       
168        print "Attribute Certificate: \n\n:" + str(attCert)
169       
170        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
171        attCert.write()
172       
173       
174    def test6GetAttCertWithUserIdSet(self):       
175        """test6GetAttCertWithUserIdSet: Request attribute certificate from
176        NDG Attribute Authority Web Service setting a specific user Id
177        independent of the signer of the SOAP request."""
178        _cfg = self.cfg['test6GetAttCertWithUserIdSet']
179       
180        # Read user Certificate into a string ready for passing via WS
181        try:
182            userCertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
183            userCertTxt = open(userCertFilePath, 'r').read()
184       
185        except TypeError:
186            # No issuing cert set
187            userCertTxt = None
188               
189        except IOError, ioErr:
190            raise Exception("Error reading certificate file \"%s\": %s" % \
191                                    (ioErr.filename, ioErr.strerror))
192
193        # Make attribute certificate request
194        userId = _cfg['userId']
195        attCert = self.siteAClnt.getAttCert(userId=userId,
196                                            userCert=userCertTxt)
197       
198        print "Attribute Certificate: \n\n:" + str(attCert)
199       
200        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
201        attCert.write()
202
203
204    def test7GetMappedAttCert(self):       
205        """test7GetMappedAttCert: Request mapped attribute certificate from
206        NDG Attribute Authority Web Service."""
207        _cfg = self.cfg['test7GetMappedAttCert']
208       
209        # Read user Certificate into a string ready for passing via WS
210        try:
211            userCertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
212            userCertTxt = open(userCertFilePath, 'r').read()
213       
214        except TypeError:
215            # No issuing cert set
216            userCertTxt = None
217               
218        except IOError, ioErr:
219            raise Exception("Error reading certificate file \"%s\": %s" % \
220                                    (ioErr.filename, ioErr.strerror))
221   
222        # Simlarly for Attribute Certificate
223        try:
224            userAttCert = AttCertRead(xpdVars(_cfg['userAttCertFilePath']))
225           
226        except IOError, ioErr:
227            raise Exception("Error reading attribute certificate file \"%s\": "
228                            "%s" % (ioErr.filename, ioErr.strerror))
229       
230        # Make client to site B Attribute Authority
231        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'], 
232                                       cfgFileSection='wsse',
233                                       cfg=self.cfgParser)
234   
235        # Make attribute certificate request
236        attCert = siteBClnt.getAttCert(userCert=userCertTxt,
237                                       userAttCert=userAttCert)
238        print "Attribute Certificate: \n\n:" + str(attCert)
239       
240        attCert.filePath = xpdVars(_cfg['mappedAttCertFilePath'])
241        attCert.write()
242       
243       
244    def test8GetMappedAttCertStressTest(self):       
245        """test8GetMappedAttCertStressTest: Request mapped attribute
246        certificate from NDG Attribute Authority Web Service."""
247        _cfg = self.cfg['test8GetMappedAttCertStressTest']
248       
249        # Read user Certificate into a string ready for passing via WS
250        try:
251            userCertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
252            userCertTxt = open(userCertFilePath, 'r').read()
253       
254        except TypeError:
255            # No issuing cert set
256            userCertTxt = None
257               
258        except IOError, ioErr:
259            raise Exception("Error reading certificate file \"%s\": %s" % 
260                                    (ioErr.filename, ioErr.strerror))
261
262        # Make client to site B Attribute Authority
263        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'], 
264                                       cfgFileSection='wsse',
265                                       cfg=self.cfgParser)
266
267        acFilePathList = [xpdVars(file) for file in \
268                          _cfg['userAttCertFilePathList'].split()]
269
270        for acFilePath in acFilePathList:
271            try:
272                userAttCert = AttCertRead(acFilePath)
273               
274            except IOError, ioErr:
275                raise Exception("Error reading attribute certificate file "
276                                '"%s": %s' % (ioErr.filename, ioErr.strerror))
277       
278            # Make attribute certificate request
279            try:
280                attCert = siteBClnt.getAttCert(userCert=userCertTxt,
281                                               userAttCert=userAttCert)
282            except Exception, e:
283                outFilePfx = 'test8GetMappedAttCertStressTest-%s' % \
284                        os.path.basename(acFilePath)   
285                msgFile = open(outFilePfx+".msg", 'w')
286                msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
287             
288             
289#_____________________________________________________________________________       
290class AttributeAuthorityClientTestSuite(unittest.TestSuite):
291    def __init__(self):
292        map = map(AttributeAuthorityClientTestCase,
293                  (
294                    "test1GetX509Cert",
295                    "test2GetHostInfo",
296                    "test3GetTrustedHostInfo",
297                    "test4GetTrustedHostInfoWithNoRole",
298                    "test5GetAttCert",
299                    "test6GetAttCertWithUserIdSet",
300                    "test7GetMappedAttCert",
301                    "test8GetMappedAttCertStressTest",
302                  ))
303        unittest.TestSuite.__init__(self, map)
304                                       
305if __name__ == "__main__":
306    unittest.main()
Note: See TracBrowser for help on using the repository browser.