source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthorityclient/test_attributeauthorityclient.py @ 4840

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthorityclient/test_attributeauthorityclient.py@5053
Revision 4840, 12.1 KB checked in by pjkersha, 11 years ago (diff)

Fix problem with search and replace licence not adding a new line.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority SOAP client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "05/05/05, major update 16/01/07"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id:test_attributeauthorityclient.py 4372 2008-10-29 09:45:39Z pjkersha $'
12
13import unittest
14import os, sys, getpass, re
15import logging
16logging.basicConfig()
17
18from ndg.security.test import BaseTestCase
19
20from ndg.security.common.attributeauthority import AttributeAuthorityClient, \
21    NoMatchingRoleInTrustedHosts
22from ndg.security.common.AttCert import AttCertRead
23from ndg.security.common.X509 import X509CertParse, X509CertRead
24from ndg.security.common.utils.configfileparsers import \
25    CaseSensitiveConfigParser
26   
27from os.path import expandvars as xpdVars
28from os.path import join as jnPath
29mkPath = lambda file: jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'], file)
30
31
32class AttributeAuthorityClientTestCase(BaseTestCase):
33    clntPriKeyPwd = None
34    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
35
36    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
37        '''Read proxy cert and user cert from a single PEM file and put in
38        a list ready for input into SignatureHandler'''               
39        proxyCertFileTxt = open(proxyCertFilePath).read()
40       
41        pemPatRE = re.compile(self.__class__.pemPat, re.S)
42        x509CertList = pemPatRE.findall(proxyCertFileTxt)
43       
44        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
45                            x509CertList]
46   
47        # Expecting proxy cert first - move this to the end.  This will
48        # be the cert used to verify the message signature
49        signingCertChain.reverse()
50       
51        return signingCertChain
52
53
54    def setUp(self):
55        super(AttributeAuthorityClientTestCase, self).setUp()
56       
57        if 'NDGSEC_INT_DEBUG' in os.environ:
58            import pdb
59            pdb.set_trace()
60       
61        if 'NDGSEC_AACLNT_UNITTEST_DIR' not in os.environ:
62            os.environ['NDGSEC_AACLNT_UNITTEST_DIR'] = \
63                os.path.abspath(os.path.dirname(__file__))
64
65        self.cfgParser = CaseSensitiveConfigParser()
66        cfgFilePath = jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'],
67                             'attAuthorityClientTest.cfg')
68        self.cfgParser.read(cfgFilePath)
69       
70        self.cfg = {}
71        for section in self.cfgParser.sections():
72            self.cfg[section] = dict(self.cfgParser.items(section))
73
74        try:
75            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
76                         self.cfg['setUp']['sslcaCertFilePathList'].split()]
77        except KeyError:
78            sslCACertList = []
79           
80        thisSection = self.cfg['setUp']
81       
82        # Instantiate WS proxy
83        self.siteAClnt = AttributeAuthorityClient(uri=thisSection['uri'],
84                                sslPeerCertCN=thisSection.get('sslPeerCertCN'),
85                                sslCACertList=sslCACertList,
86                                cfgFileSection='wsse',
87                                cfg=self.cfgParser)           
88
89    def test01GetHostInfo(self):
90        """test01GetHostInfo: retrieve info for AA host"""
91        hostInfo = self.siteAClnt.getHostInfo()
92        print "Host Info:\n %s" % hostInfo       
93
94    def test02GetTrustedHostInfo(self):
95        """test02GetTrustedHostInfo: retrieve trusted host info matching a
96        given role"""
97        trustedHostInfo = self.siteAClnt.getTrustedHostInfo(\
98                                 self.cfg['test02GetTrustedHostInfo']['role'])
99        for hostname, hostInfo in trustedHostInfo.items():
100            self.assert_(hostname, "Hostname not set")
101            for k, v in hostInfo.items():
102                self.assert_(k, "hostInfo value key unset")
103
104        print "Trusted Host Info:\n %s" % trustedHostInfo
105
106    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self):
107        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case
108        where the input role doesn't match any roles in the target AA's map
109        config file"""
110        _cfg = self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound']
111        try:
112            trustedHostInfo = self.siteAClnt.getTrustedHostInfo(_cfg['role'])
113            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")
114           
115        except NoMatchingRoleInTrustedHosts, e:
116            print 'As expected - no match for role "%s": %s' % \
117                (_cfg['role'], e)
118
119
120    def test04GetTrustedHostInfoWithNoRole(self):
121        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info
122        irrespective of role"""
123        trustedHostInfo = self.siteAClnt.getTrustedHostInfo()
124        for hostname, hostInfo in trustedHostInfo.items():
125            self.assert_(hostname, "Hostname not set")
126            for k, v in hostInfo.items():
127                self.assert_(k, "hostInfo value key unset")
128                   
129        print "Trusted Host Info:\n %s" % trustedHostInfo
130       
131
132    def test05GetAllHostsInfo(self):
133        """test05GetAllHostsInfo: retrieve info for all hosts"""
134        allHostInfo = self.siteAClnt.getAllHostsInfo()
135        for hostname, hostInfo in allHostInfo.items():
136            self.assert_(hostname, "Hostname not set")
137            for k, v in hostInfo.items():
138                self.assert_(k, "hostInfo value key unset")
139                   
140        print "All Hosts Info:\n %s" % allHostInfo
141
142
143    def test06GetAttCert(self):       
144        """test06GetAttCert: Request attribute certificate from NDG Attribute
145        Authority Web Service."""
146        _cfg = self.cfg['test06GetAttCert']
147       
148        # Read user Certificate into a string ready for passing via WS
149        try:
150            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
151            userX509CertTxt = open(userX509CertFilePath, 'r').read()
152       
153        except TypeError:
154            # No issuing cert set
155            userX509CertTxt = None
156               
157        except IOError, ioErr:
158            raise Exception("Error reading certificate file \"%s\": %s" % \
159                                    (ioErr.filename, ioErr.strerror))
160
161        # Make attribute certificate request
162        attCert = self.siteAClnt.getAttCert(userX509Cert=userX509CertTxt)
163       
164        print "Attribute Certificate: \n\n:" + str(attCert)
165       
166        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
167        attCert.write()
168       
169       
170    def test07GetAttCertWithUserIdSet(self):       
171        """test07GetAttCertWithUserIdSet: Request attribute certificate from
172        NDG Attribute Authority Web Service setting a specific user Id
173        independent of the signer of the SOAP request."""
174        _cfg = self.cfg['test07GetAttCertWithUserIdSet']
175       
176        # Read user Certificate into a string ready for passing via WS
177        try:
178            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
179            userX509CertTxt = open(userX509CertFilePath, 'r').read()
180       
181        except TypeError:
182            # No issuing cert set
183            userX509CertTxt = None
184               
185        except IOError, ioErr:
186            raise Exception("Error reading certificate file \"%s\": %s" % \
187                                    (ioErr.filename, ioErr.strerror))
188
189        # Make attribute certificate request
190        userId = _cfg['userId']
191        attCert = self.siteAClnt.getAttCert(userId=userId,
192                                            userX509Cert=userX509CertTxt)
193       
194        print "Attribute Certificate: \n\n:" + str(attCert)
195       
196        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
197        attCert.write()
198
199
200    def test08GetMappedAttCert(self):       
201        """test08GetMappedAttCert: Request mapped attribute certificate from
202        NDG Attribute Authority Web Service."""
203        _cfg = self.cfg['test08GetMappedAttCert']
204       
205        # Read user Certificate into a string ready for passing via WS
206        try:
207            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
208            userX509CertTxt = open(userX509CertFilePath, 'r').read()
209       
210        except TypeError:
211            # No issuing cert set
212            userX509CertTxt = None
213               
214        except IOError, ioErr:
215            raise Exception("Error reading certificate file \"%s\": %s" % \
216                                    (ioErr.filename, ioErr.strerror))
217   
218        # Simlarly for Attribute Certificate
219        try:
220            userAttCert = AttCertRead(xpdVars(_cfg['userAttCertFilePath']))
221           
222        except IOError, ioErr:
223            raise Exception("Error reading attribute certificate file \"%s\": "
224                            "%s" % (ioErr.filename, ioErr.strerror))
225       
226        # Make client to site B Attribute Authority
227        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'], 
228                                       cfgFileSection='wsse',
229                                       cfg=self.cfgParser)
230   
231        # Make attribute certificate request
232        attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
233                                       userAttCert=userAttCert)
234        print "Attribute Certificate: \n\n:" + str(attCert)
235       
236        attCert.filePath = xpdVars(_cfg['mappedAttCertFilePath'])
237        attCert.write()
238       
239       
240    def test09GetMappedAttCertStressTest(self):       
241        """test09GetMappedAttCertStressTest: Request mapped attribute
242        certificate from NDG Attribute Authority Web Service."""
243        _cfg = self.cfg['test09GetMappedAttCertStressTest']
244       
245        # Read user Certificate into a string ready for passing via WS
246        try:
247            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
248            userX509CertTxt = open(userX509CertFilePath, 'r').read()
249       
250        except TypeError:
251            # No issuing cert set
252            userX509CertTxt = None
253               
254        except IOError, ioErr:
255            raise Exception("Error reading certificate file \"%s\": %s" % 
256                                    (ioErr.filename, ioErr.strerror))
257
258        # Make client to site B Attribute Authority
259        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'], 
260                                       cfgFileSection='wsse',
261                                       cfg=self.cfgParser)
262
263        acFilePathList = [xpdVars(file) for file in \
264                          _cfg['userAttCertFilePathList'].split()]
265
266        for acFilePath in acFilePathList:
267            try:
268                userAttCert = AttCertRead(acFilePath)
269               
270            except IOError, ioErr:
271                raise Exception("Error reading attribute certificate file "
272                                '"%s": %s' % (ioErr.filename, ioErr.strerror))
273       
274            # Make attribute certificate request
275            try:
276                attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
277                                               userAttCert=userAttCert)
278            except Exception, e:
279                outFilePfx = 'test09GetMappedAttCertStressTest-%s' % \
280                        os.path.basename(acFilePath)   
281                msgFile = open(outFilePfx+".msg", 'w')
282                msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
283             
284             
285class AttributeAuthorityClientTestSuite(unittest.TestSuite):
286    def __init__(self):
287        map = map(AttributeAuthorityClientTestCase,
288                  (
289                    "test01GetHostInfo",
290                    "test02GetTrustedHostInfo",
291                    "test03GetTrustedHostInfoWithNoMatchingRoleFound",
292                    "test04GetTrustedHostInfoWithNoRole",
293                    "test05GetAllHostsInfo",
294                    "test06GetAttCert",
295                    "test07GetAttCertWithUserIdSet",
296                    "test08GetMappedAttCert",
297                    "test09GetMappedAttCertStressTest",
298                  ))
299        unittest.TestSuite.__init__(self, map)
300                                       
301if __name__ == "__main__":
302    unittest.main()
Note: See TracBrowser for help on using the repository browser.