source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthorityclient/test_attributeauthorityclient.py @ 4713

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthorityclient/test_attributeauthorityclient.py@4713
Revision 4713, 12.0 KB checked in by pjkersha, 12 years ago (diff)

Added extra parameter 'siteName' to Attribute Authority map configuration to enable display of a more user friendly listing in the WAYF.

  • re-ran all attribute authority related unit tests
  • TODO: modify and test SSO client WAYF interface to use new param
  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority SOAP client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "05/05/05, major update 16/01/07"
8__copyright__ = "(C) 2008 STFC"
9__license__ = \
10"""This software may be distributed under the terms of the Q Public
11License, version 1.0 or later."""
12__contact__ = "Philip.Kershaw@stfc.ac.uk"
13__revision__ = '$Id:test_attributeauthorityclient.py 4372 2008-10-29 09:45:39Z pjkersha $'
14
15import unittest
16import os, sys, getpass, re
17import logging
18logging.basicConfig()
19
20from ndg.security.common.attributeauthority import AttributeAuthorityClient, \
21    NoMatchingRoleInTrustedHosts
22from ndg.security.common.AttCert import AttCertRead
23from ndg.security.common.X509 import X509CertParse, X509CertRead
24from ndg.security.common.utils.configfileparsers import \
25    CaseSensitiveConfigParser
26   
27from os.path import expandvars as xpdVars
28from os.path import join as jnPath
29mkPath = lambda file: jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'], file)
30
31
32class AttributeAuthorityClientTestCase(unittest.TestCase):
33    clntPriKeyPwd = None
34    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
35
36    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
37        '''Read proxy cert and user cert from a single PEM file and put in
38        a list ready for input into SignatureHandler'''               
39        proxyCertFileTxt = open(proxyCertFilePath).read()
40       
41        pemPatRE = re.compile(self.__class__.pemPat, re.S)
42        x509CertList = pemPatRE.findall(proxyCertFileTxt)
43       
44        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
45                            x509CertList]
46   
47        # Expecting proxy cert first - move this to the end.  This will
48        # be the cert used to verify the message signature
49        signingCertChain.reverse()
50       
51        return signingCertChain
52
53
54    def setUp(self):
55
56        if 'NDGSEC_INT_DEBUG' in os.environ:
57            import pdb
58            pdb.set_trace()
59       
60        if 'NDGSEC_AACLNT_UNITTEST_DIR' not in os.environ:
61            os.environ['NDGSEC_AACLNT_UNITTEST_DIR'] = \
62                os.path.abspath(os.path.dirname(__file__))
63
64        self.cfgParser = CaseSensitiveConfigParser()
65        cfgFilePath = jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'],
66                             'attAuthorityClientTest.cfg')
67        self.cfgParser.read(cfgFilePath)
68       
69        self.cfg = {}
70        for section in self.cfgParser.sections():
71            self.cfg[section] = dict(self.cfgParser.items(section))
72
73        try:
74            sslCACertList = [X509CertRead(xpdVars(file)) for file in \
75                         self.cfg['setUp']['sslcaCertFilePathList'].split()]
76        except KeyError:
77            sslCACertList = []
78           
79        thisSection = self.cfg['setUp']
80       
81        # Instantiate WS proxy
82        self.siteAClnt = AttributeAuthorityClient(uri=thisSection['uri'],
83                                sslPeerCertCN=thisSection.get('sslPeerCertCN'),
84                                sslCACertList=sslCACertList,
85                                cfgFileSection='wsse',
86                                cfg=self.cfgParser)           
87
88    def test01GetHostInfo(self):
89        """test01GetHostInfo: retrieve info for AA host"""
90        hostInfo = self.siteAClnt.getHostInfo()
91        print "Host Info:\n %s" % hostInfo       
92
93    def test02GetTrustedHostInfo(self):
94        """test02GetTrustedHostInfo: retrieve trusted host info matching a
95        given role"""
96        trustedHostInfo = self.siteAClnt.getTrustedHostInfo(\
97                                 self.cfg['test02GetTrustedHostInfo']['role'])
98        for hostname, hostInfo in trustedHostInfo.items():
99            self.assert_(hostname, "Hostname not set")
100            for k, v in hostInfo.items():
101                self.assert_(k, "hostInfo value key unset")
102
103        print "Trusted Host Info:\n %s" % trustedHostInfo
104
105    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self):
106        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case
107        where the input role doesn't match any roles in the target AA's map
108        config file"""
109        _cfg = self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound']
110        try:
111            trustedHostInfo = self.siteAClnt.getTrustedHostInfo(_cfg['role'])
112            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")
113           
114        except NoMatchingRoleInTrustedHosts, e:
115            print 'As expected - no match for role "%s": %s' % \
116                (_cfg['role'], e)
117
118
119    def test04GetTrustedHostInfoWithNoRole(self):
120        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info
121        irrespective of role"""
122        trustedHostInfo = self.siteAClnt.getTrustedHostInfo()
123        for hostname, hostInfo in trustedHostInfo.items():
124            self.assert_(hostname, "Hostname not set")
125            for k, v in hostInfo.items():
126                self.assert_(k, "hostInfo value key unset")
127                   
128        print "Trusted Host Info:\n %s" % trustedHostInfo
129       
130
131    def test05GetAllHostsInfo(self):
132        """test05GetAllHostsInfo: retrieve info for all hosts"""
133        allHostInfo = self.siteAClnt.getAllHostsInfo()
134        for hostname, hostInfo in allHostInfo.items():
135            self.assert_(hostname, "Hostname not set")
136            for k, v in hostInfo.items():
137                self.assert_(k, "hostInfo value key unset")
138                   
139        print "All Hosts Info:\n %s" % allHostInfo
140
141
142    def test06GetAttCert(self):       
143        """test06GetAttCert: Request attribute certificate from NDG Attribute
144        Authority Web Service."""
145        _cfg = self.cfg['test06GetAttCert']
146       
147        # Read user Certificate into a string ready for passing via WS
148        try:
149            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
150            userX509CertTxt = open(userX509CertFilePath, 'r').read()
151       
152        except TypeError:
153            # No issuing cert set
154            userX509CertTxt = None
155               
156        except IOError, ioErr:
157            raise Exception("Error reading certificate file \"%s\": %s" % \
158                                    (ioErr.filename, ioErr.strerror))
159
160        # Make attribute certificate request
161        attCert = self.siteAClnt.getAttCert(userX509Cert=userX509CertTxt)
162       
163        print "Attribute Certificate: \n\n:" + str(attCert)
164       
165        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
166        attCert.write()
167       
168       
169    def test07GetAttCertWithUserIdSet(self):       
170        """test07GetAttCertWithUserIdSet: Request attribute certificate from
171        NDG Attribute Authority Web Service setting a specific user Id
172        independent of the signer of the SOAP request."""
173        _cfg = self.cfg['test07GetAttCertWithUserIdSet']
174       
175        # Read user Certificate into a string ready for passing via WS
176        try:
177            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
178            userX509CertTxt = open(userX509CertFilePath, 'r').read()
179       
180        except TypeError:
181            # No issuing cert set
182            userX509CertTxt = None
183               
184        except IOError, ioErr:
185            raise Exception("Error reading certificate file \"%s\": %s" % \
186                                    (ioErr.filename, ioErr.strerror))
187
188        # Make attribute certificate request
189        userId = _cfg['userId']
190        attCert = self.siteAClnt.getAttCert(userId=userId,
191                                            userX509Cert=userX509CertTxt)
192       
193        print "Attribute Certificate: \n\n:" + str(attCert)
194       
195        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
196        attCert.write()
197
198
199    def test08GetMappedAttCert(self):       
200        """test08GetMappedAttCert: Request mapped attribute certificate from
201        NDG Attribute Authority Web Service."""
202        _cfg = self.cfg['test08GetMappedAttCert']
203       
204        # Read user Certificate into a string ready for passing via WS
205        try:
206            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
207            userX509CertTxt = open(userX509CertFilePath, 'r').read()
208       
209        except TypeError:
210            # No issuing cert set
211            userX509CertTxt = None
212               
213        except IOError, ioErr:
214            raise Exception("Error reading certificate file \"%s\": %s" % \
215                                    (ioErr.filename, ioErr.strerror))
216   
217        # Simlarly for Attribute Certificate
218        try:
219            userAttCert = AttCertRead(xpdVars(_cfg['userAttCertFilePath']))
220           
221        except IOError, ioErr:
222            raise Exception("Error reading attribute certificate file \"%s\": "
223                            "%s" % (ioErr.filename, ioErr.strerror))
224       
225        # Make client to site B Attribute Authority
226        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'], 
227                                       cfgFileSection='wsse',
228                                       cfg=self.cfgParser)
229   
230        # Make attribute certificate request
231        attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
232                                       userAttCert=userAttCert)
233        print "Attribute Certificate: \n\n:" + str(attCert)
234       
235        attCert.filePath = xpdVars(_cfg['mappedAttCertFilePath'])
236        attCert.write()
237       
238       
239    def test09GetMappedAttCertStressTest(self):       
240        """test09GetMappedAttCertStressTest: Request mapped attribute
241        certificate from NDG Attribute Authority Web Service."""
242        _cfg = self.cfg['test09GetMappedAttCertStressTest']
243       
244        # Read user Certificate into a string ready for passing via WS
245        try:
246            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
247            userX509CertTxt = open(userX509CertFilePath, 'r').read()
248       
249        except TypeError:
250            # No issuing cert set
251            userX509CertTxt = None
252               
253        except IOError, ioErr:
254            raise Exception("Error reading certificate file \"%s\": %s" % 
255                                    (ioErr.filename, ioErr.strerror))
256
257        # Make client to site B Attribute Authority
258        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'], 
259                                       cfgFileSection='wsse',
260                                       cfg=self.cfgParser)
261
262        acFilePathList = [xpdVars(file) for file in \
263                          _cfg['userAttCertFilePathList'].split()]
264
265        for acFilePath in acFilePathList:
266            try:
267                userAttCert = AttCertRead(acFilePath)
268               
269            except IOError, ioErr:
270                raise Exception("Error reading attribute certificate file "
271                                '"%s": %s' % (ioErr.filename, ioErr.strerror))
272       
273            # Make attribute certificate request
274            try:
275                attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
276                                               userAttCert=userAttCert)
277            except Exception, e:
278                outFilePfx = 'test09GetMappedAttCertStressTest-%s' % \
279                        os.path.basename(acFilePath)   
280                msgFile = open(outFilePfx+".msg", 'w')
281                msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
282             
283             
284class AttributeAuthorityClientTestSuite(unittest.TestSuite):
285    def __init__(self):
286        map = map(AttributeAuthorityClientTestCase,
287                  (
288                    "test01GetHostInfo",
289                    "test02GetTrustedHostInfo",
290                    "test03GetTrustedHostInfoWithNoMatchingRoleFound",
291                    "test04GetTrustedHostInfoWithNoRole",
292                    "test05GetAllHostsInfo",
293                    "test06GetAttCert",
294                    "test07GetAttCertWithUserIdSet",
295                    "test08GetMappedAttCert",
296                    "test09GetMappedAttCertStressTest",
297                  ))
298        unittest.TestSuite.__init__(self, map)
299                                       
300if __name__ == "__main__":
301    unittest.main()
Note: See TracBrowser for help on using the repository browser.