source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthorityclient/test_attributeauthorityclient.py @ 4716

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthorityclient/test_attributeauthorityclient.py@4716
Revision 4716, 12.1 KB checked in by pjkersha, 12 years ago (diff)

renamed data -> config

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority SOAP client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "05/05/05, major update 16/01/07"
8__copyright__ = "(C) 2008 STFC"
9__license__ = \
10"""This software may be distributed under the terms of the Q Public
11License, version 1.0 or later."""
12__contact__ = "Philip.Kershaw@stfc.ac.uk"
13__revision__ = '$Id:test_attributeauthorityclient.py 4372 2008-10-29 09:45:39Z pjkersha $'
14
15import unittest
16import os, sys, getpass, re
17import logging
18logging.basicConfig()
19
20from ndg.security.test import BaseTestCase
21
22from ndg.security.common.attributeauthority import AttributeAuthorityClient, \
23    NoMatchingRoleInTrustedHosts
24from ndg.security.common.AttCert import AttCertRead
25from ndg.security.common.X509 import X509CertParse, X509CertRead
26from ndg.security.common.utils.configfileparsers import \
27    CaseSensitiveConfigParser
28   
29from os.path import expandvars as xpdVars
30from os.path import join as jnPath
31mkPath = lambda file: jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'], file)
32
33
34class AttributeAuthorityClientTestCase(BaseTestCase):
35    clntPriKeyPwd = None
36    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
37
38    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
39        '''Read proxy cert and user cert from a single PEM file and put in
40        a list ready for input into SignatureHandler'''               
41        proxyCertFileTxt = open(proxyCertFilePath).read()
42       
43        pemPatRE = re.compile(self.__class__.pemPat, re.S)
44        x509CertList = pemPatRE.findall(proxyCertFileTxt)
45       
46        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
47                            x509CertList]
48   
49        # Expecting proxy cert first - move this to the end.  This will
50        # be the cert used to verify the message signature
51        signingCertChain.reverse()
52       
53        return signingCertChain
54
55
56    def setUp(self):
57        super(AttributeAuthorityClientTestCase, self).setUp()
58       
59        if 'NDGSEC_INT_DEBUG' in os.environ:
60            import pdb
61            pdb.set_trace()
62       
63        if 'NDGSEC_AACLNT_UNITTEST_DIR' not in os.environ:
64            os.environ['NDGSEC_AACLNT_UNITTEST_DIR'] = \
65                os.path.abspath(os.path.dirname(__file__))
66
67        self.cfgParser = CaseSensitiveConfigParser()
68        cfgFilePath = jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'],
69                             'attAuthorityClientTest.cfg')
70        self.cfgParser.read(cfgFilePath)
71       
72        self.cfg = {}
73        for section in self.cfgParser.sections():
74            self.cfg[section] = dict(self.cfgParser.items(section))
75
76        try:
77            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
78                         self.cfg['setUp']['sslcaCertFilePathList'].split()]
79        except KeyError:
80            sslCACertList = []
81           
82        thisSection = self.cfg['setUp']
83       
84        # Instantiate WS proxy
85        self.siteAClnt = AttributeAuthorityClient(uri=thisSection['uri'],
86                                sslPeerCertCN=thisSection.get('sslPeerCertCN'),
87                                sslCACertList=sslCACertList,
88                                cfgFileSection='wsse',
89                                cfg=self.cfgParser)           
90
91    def test01GetHostInfo(self):
92        """test01GetHostInfo: retrieve info for AA host"""
93        hostInfo = self.siteAClnt.getHostInfo()
94        print "Host Info:\n %s" % hostInfo       
95
96    def test02GetTrustedHostInfo(self):
97        """test02GetTrustedHostInfo: retrieve trusted host info matching a
98        given role"""
99        trustedHostInfo = self.siteAClnt.getTrustedHostInfo(\
100                                 self.cfg['test02GetTrustedHostInfo']['role'])
101        for hostname, hostInfo in trustedHostInfo.items():
102            self.assert_(hostname, "Hostname not set")
103            for k, v in hostInfo.items():
104                self.assert_(k, "hostInfo value key unset")
105
106        print "Trusted Host Info:\n %s" % trustedHostInfo
107
108    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self):
109        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case
110        where the input role doesn't match any roles in the target AA's map
111        config file"""
112        _cfg = self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound']
113        try:
114            trustedHostInfo = self.siteAClnt.getTrustedHostInfo(_cfg['role'])
115            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")
116           
117        except NoMatchingRoleInTrustedHosts, e:
118            print 'As expected - no match for role "%s": %s' % \
119                (_cfg['role'], e)
120
121
122    def test04GetTrustedHostInfoWithNoRole(self):
123        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info
124        irrespective of role"""
125        trustedHostInfo = self.siteAClnt.getTrustedHostInfo()
126        for hostname, hostInfo in trustedHostInfo.items():
127            self.assert_(hostname, "Hostname not set")
128            for k, v in hostInfo.items():
129                self.assert_(k, "hostInfo value key unset")
130                   
131        print "Trusted Host Info:\n %s" % trustedHostInfo
132       
133
134    def test05GetAllHostsInfo(self):
135        """test05GetAllHostsInfo: retrieve info for all hosts"""
136        allHostInfo = self.siteAClnt.getAllHostsInfo()
137        for hostname, hostInfo in allHostInfo.items():
138            self.assert_(hostname, "Hostname not set")
139            for k, v in hostInfo.items():
140                self.assert_(k, "hostInfo value key unset")
141                   
142        print "All Hosts Info:\n %s" % allHostInfo
143
144
145    def test06GetAttCert(self):       
146        """test06GetAttCert: Request attribute certificate from NDG Attribute
147        Authority Web Service."""
148        _cfg = self.cfg['test06GetAttCert']
149       
150        # Read user Certificate into a string ready for passing via WS
151        try:
152            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
153            userX509CertTxt = open(userX509CertFilePath, 'r').read()
154       
155        except TypeError:
156            # No issuing cert set
157            userX509CertTxt = None
158               
159        except IOError, ioErr:
160            raise Exception("Error reading certificate file \"%s\": %s" % \
161                                    (ioErr.filename, ioErr.strerror))
162
163        # Make attribute certificate request
164        attCert = self.siteAClnt.getAttCert(userX509Cert=userX509CertTxt)
165       
166        print "Attribute Certificate: \n\n:" + str(attCert)
167       
168        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
169        attCert.write()
170       
171       
172    def test07GetAttCertWithUserIdSet(self):       
173        """test07GetAttCertWithUserIdSet: Request attribute certificate from
174        NDG Attribute Authority Web Service setting a specific user Id
175        independent of the signer of the SOAP request."""
176        _cfg = self.cfg['test07GetAttCertWithUserIdSet']
177       
178        # Read user Certificate into a string ready for passing via WS
179        try:
180            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
181            userX509CertTxt = open(userX509CertFilePath, 'r').read()
182       
183        except TypeError:
184            # No issuing cert set
185            userX509CertTxt = None
186               
187        except IOError, ioErr:
188            raise Exception("Error reading certificate file \"%s\": %s" % \
189                                    (ioErr.filename, ioErr.strerror))
190
191        # Make attribute certificate request
192        userId = _cfg['userId']
193        attCert = self.siteAClnt.getAttCert(userId=userId,
194                                            userX509Cert=userX509CertTxt)
195       
196        print "Attribute Certificate: \n\n:" + str(attCert)
197       
198        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
199        attCert.write()
200
201
202    def test08GetMappedAttCert(self):       
203        """test08GetMappedAttCert: Request mapped attribute certificate from
204        NDG Attribute Authority Web Service."""
205        _cfg = self.cfg['test08GetMappedAttCert']
206       
207        # Read user Certificate into a string ready for passing via WS
208        try:
209            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
210            userX509CertTxt = open(userX509CertFilePath, 'r').read()
211       
212        except TypeError:
213            # No issuing cert set
214            userX509CertTxt = None
215               
216        except IOError, ioErr:
217            raise Exception("Error reading certificate file \"%s\": %s" % \
218                                    (ioErr.filename, ioErr.strerror))
219   
220        # Simlarly for Attribute Certificate
221        try:
222            userAttCert = AttCertRead(xpdVars(_cfg['userAttCertFilePath']))
223           
224        except IOError, ioErr:
225            raise Exception("Error reading attribute certificate file \"%s\": "
226                            "%s" % (ioErr.filename, ioErr.strerror))
227       
228        # Make client to site B Attribute Authority
229        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'], 
230                                       cfgFileSection='wsse',
231                                       cfg=self.cfgParser)
232   
233        # Make attribute certificate request
234        attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
235                                       userAttCert=userAttCert)
236        print "Attribute Certificate: \n\n:" + str(attCert)
237       
238        attCert.filePath = xpdVars(_cfg['mappedAttCertFilePath'])
239        attCert.write()
240       
241       
242    def test09GetMappedAttCertStressTest(self):       
243        """test09GetMappedAttCertStressTest: Request mapped attribute
244        certificate from NDG Attribute Authority Web Service."""
245        _cfg = self.cfg['test09GetMappedAttCertStressTest']
246       
247        # Read user Certificate into a string ready for passing via WS
248        try:
249            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
250            userX509CertTxt = open(userX509CertFilePath, 'r').read()
251       
252        except TypeError:
253            # No issuing cert set
254            userX509CertTxt = None
255               
256        except IOError, ioErr:
257            raise Exception("Error reading certificate file \"%s\": %s" % 
258                                    (ioErr.filename, ioErr.strerror))
259
260        # Make client to site B Attribute Authority
261        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'], 
262                                       cfgFileSection='wsse',
263                                       cfg=self.cfgParser)
264
265        acFilePathList = [xpdVars(file) for file in \
266                          _cfg['userAttCertFilePathList'].split()]
267
268        for acFilePath in acFilePathList:
269            try:
270                userAttCert = AttCertRead(acFilePath)
271               
272            except IOError, ioErr:
273                raise Exception("Error reading attribute certificate file "
274                                '"%s": %s' % (ioErr.filename, ioErr.strerror))
275       
276            # Make attribute certificate request
277            try:
278                attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
279                                               userAttCert=userAttCert)
280            except Exception, e:
281                outFilePfx = 'test09GetMappedAttCertStressTest-%s' % \
282                        os.path.basename(acFilePath)   
283                msgFile = open(outFilePfx+".msg", 'w')
284                msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
285             
286             
287class AttributeAuthorityClientTestSuite(unittest.TestSuite):
288    def __init__(self):
289        map = map(AttributeAuthorityClientTestCase,
290                  (
291                    "test01GetHostInfo",
292                    "test02GetTrustedHostInfo",
293                    "test03GetTrustedHostInfoWithNoMatchingRoleFound",
294                    "test04GetTrustedHostInfoWithNoRole",
295                    "test05GetAllHostsInfo",
296                    "test06GetAttCert",
297                    "test07GetAttCertWithUserIdSet",
298                    "test08GetMappedAttCert",
299                    "test09GetMappedAttCertStressTest",
300                  ))
301        unittest.TestSuite.__init__(self, map)
302                                       
303if __name__ == "__main__":
304    unittest.main()
Note: See TracBrowser for help on using the repository browser.