source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthority/test_attributeauthority.py @ 4839

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthority/test_attributeauthority.py@4839
Revision 4839, 9.3 KB checked in by pjkersha, 11 years ago (diff)

Changed licence from Q Public to BSD

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/12/08"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
11
12import unittest
13import os
14import sys
15import getpass
16import re
17import logging
18logging.basicConfig()
19
20from os.path import expandvars as xpdVars
21from os.path import join as jnPath
22mkPath = lambda file:jnPath(os.environ['NDGSEC_AA_UNITTEST_DIR'], file)
23
24from ndg.security.test import BaseTestCase
25
26from ndg.security.common.utils.configfileparsers import \
27    CaseSensitiveConfigParser
28from ndg.security.server.attributeauthority import AttributeAuthority, \
29    AttributeAuthorityNoMatchingRoleInTrustedHosts
30
31from ndg.security.common.AttCert import AttCert
32
33
34class AttributeAuthorityTestCase(BaseTestCase):
35    clntPriKeyPwd = None
36
37    def setUp(self):
38        super(AttributeAuthorityTestCase, self).setUp()
39       
40        if 'NDGSEC_INT_DEBUG' in os.environ:
41            import pdb
42            pdb.set_trace()
43       
44        if 'NDGSEC_AA_UNITTEST_DIR' not in os.environ:
45            os.environ['NDGSEC_AA_UNITTEST_DIR'] = \
46                os.path.abspath(os.path.dirname(__file__))
47
48        self.cfgParser = CaseSensitiveConfigParser()
49        cfgFilePath = mkPath('test_attributeauthority.cfg')
50        self.cfgParser.read(cfgFilePath)
51       
52        self.cfg = {}
53        for section in self.cfgParser.sections() + ['DEFAULT']:
54            self.cfg[section] = dict(self.cfgParser.items(section))
55           
56        self.aa = AttributeAuthority(
57                                propFilePath=self.cfg['setUp']['propFilePath'])           
58
59    _mkSiteBAttributeAuthority = lambda self: AttributeAuthority(\
60                        propFilePath=self.cfg['DEFAULT']['siteBPropFilePath'])
61   
62    def test01GetHostInfo(self):
63        """test01GetHostInfo: retrieve info for AA host"""
64        hostInfo = self.aa.hostInfo
65        print("Host Info:\n %s" % hostInfo)     
66
67    def test02GetTrustedHostInfo(self):
68        """test02GetTrustedHostInfo: retrieve trusted host info matching a
69        given role"""
70        thisSection = self.cfg['test02GetTrustedHostInfo']
71       
72        trustedHostInfo = self.aa.getTrustedHostInfo(thisSection['role'])
73        for hostname, hostInfo in trustedHostInfo.items():
74            self.assert_(hostname, "Hostname not set")
75            for k, v in hostInfo.items():
76                self.assert_(k, "hostInfo value key unset")
77
78        print("Trusted Host Info:\n %s" % trustedHostInfo)
79
80    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self):
81        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case
82        where the input role doesn't match any roles in the target AA's map
83        config file"""
84        thisSection=self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound']
85        try:
86            trustedHostInfo = self.aa.getTrustedHostInfo(thisSection['role'])
87            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")
88           
89        except AttributeAuthorityNoMatchingRoleInTrustedHosts, e:
90            print('PASSED - no match for role "%s": %s' % (thisSection['role'],
91                                                           e))
92
93
94    def test04GetTrustedHostInfoWithNoRole(self):
95        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info
96        irrespective of role"""
97        trustedHostInfo = self.aa.getTrustedHostInfo()
98        for hostname, hostInfo in trustedHostInfo.items():
99            self.assert_(hostname, "Hostname not set")
100            for k, v in hostInfo.items():
101                self.assert_(k, "hostInfo value key unset")
102                   
103        print("Trusted Host Info:\n %s" % trustedHostInfo)
104
105    def test05GetAttCert(self):       
106        """test05GetAttCert: Request attribute certificate from NDG Attribute
107        Authority Web Service."""
108        thisSection = self.cfg['test05GetAttCert']
109       
110        # Read user Certificate into a string ready for passing via WS
111        try:
112            userX509CertFilePath = xpdVars(thisSection.get(
113                                                    'issuingClntCertFilePath'))
114            userX509CertTxt = open(userX509CertFilePath, 'r').read()
115       
116        except TypeError:
117            # No issuing cert set
118            userX509CertTxt = None
119               
120        except IOError, ioErr:
121            raise Exception("Error reading certificate file \"%s\": %s" %
122                                    (ioErr.filename, ioErr.strerror))
123
124        # Make attribute certificate request
125        attCert = self.aa.getAttCert(holderX509Cert=userX509CertTxt)
126       
127        print("Attribute Certificate: \n\n:" + str(attCert))
128       
129        attCert.filePath = xpdVars(thisSection['attCertFilePath'])
130        attCert.write()
131       
132       
133    def test06GetAttCertWithUserIdSet(self):       
134        """test06GetAttCertWithUserIdSet: Request attribute certificate from
135        NDG Attribute Authority Web Service setting a specific user Id
136        independent of the signer of the SOAP request."""
137        thisSection = self.cfg['test06GetAttCertWithUserIdSet']
138       
139        # Make attribute certificate request
140        userId = thisSection['userId']
141        attCert = self.aa.getAttCert(userId=userId)
142       
143        print("Attribute Certificate: \n\n:" + str(attCert))
144       
145        attCert.filePath = xpdVars(thisSection['attCertFilePath'])
146        attCert.write()
147
148
149    def test07GetMappedAttCert(self):       
150        """test07GetMappedAttCert: Request mapped attribute certificate from
151        NDG Attribute Authority Web Service."""
152        thisSection = self.cfg['test07GetMappedAttCert']
153       
154        # Read user Certificate into a string ready for passing via WS
155        try:
156            userX509CertFilePath = xpdVars(thisSection.get(
157                                                    'issuingClntCertFilePath'))
158            userX509CertTxt = open(userX509CertFilePath, 'r').read()
159       
160        except TypeError:
161            # No issuing cert set
162            userX509CertTxt = None
163               
164        except IOError, ioErr:
165            raise Exception("Error reading certificate file \"%s\": %s" % 
166                                    (ioErr.filename, ioErr.strerror))
167   
168        # Simlarly for Attribute Certificate
169        try:
170            userAttCert = AttCert.Read(
171                                xpdVars(thisSection['userAttCertFilePath']))
172           
173        except IOError, ioErr:
174            raise Exception("Error reading attribute certificate file \"%s\": "
175                            "%s" % (ioErr.filename, ioErr.strerror))
176       
177        # Make client to site B Attribute Authority
178        siteBAA = self._mkSiteBAttributeAuthority()
179   
180        # Make attribute certificate request
181        attCert = siteBAA.getAttCert(holderX509Cert=userX509CertTxt,
182                                     userAttCert=userAttCert)
183        print("Attribute Certificate: \n\n:" + str(attCert))
184       
185        attCert.filePath = xpdVars(thisSection['mappedAttCertFilePath'])
186        attCert.write()
187       
188       
189    def test08GetMappedAttCertStressTest(self):       
190        """test08GetMappedAttCertStressTest: Request mapped attribute
191        certificate from NDG Attribute Authority Web Service."""
192        thisSection = self.cfg['test08GetMappedAttCertStressTest']
193       
194        # Read user Certificate into a string ready for passing via WS
195        try:
196            userX509CertFilePath = xpdVars(thisSection.get(
197                                                    'issuingClntCertFilePath'))
198            userX509CertTxt = open(userX509CertFilePath, 'r').read()
199       
200        except TypeError:
201            # No issuing cert set
202            userX509CertTxt = None
203               
204        except IOError, ioErr:
205            raise Exception("Error reading certificate file \"%s\": %s" % 
206                                    (ioErr.filename, ioErr.strerror))
207
208        # Make client to site B Attribute Authority
209        siteBAA = self._mkSiteBAttributeAuthority()
210
211        acFilePathList = [xpdVars(file) for file in \
212                          thisSection['userAttCertFilePathList'].split()]
213
214        passed = True
215        for acFilePath in acFilePathList:
216            try:
217                userAttCert = AttCert.Read(acFilePath)
218               
219            except IOError, ioErr:
220                raise Exception("Error reading attribute certificate file "
221                                '"%s": %s' % (ioErr.filename, ioErr.strerror))
222       
223            # Make attribute certificate request
224            try:
225                attCert = siteBAA.getAttCert(holderX509Cert=userX509CertTxt,
226                                             userAttCert=userAttCert)
227            except Exception, e:
228                passed = True
229                outFilePfx = 'test08GetMappedAttCertStressTest-%s' % \
230                        os.path.basename(acFilePath)   
231                msgFile = open(outFilePfx+".msg", 'w')
232                msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
233               
234        self.assert_(passed, 
235                     "At least one Attribute Certificate request failed.  "
236                     "Check the .msg files in this directory")
237                                       
238if __name__ == "__main__":
239    unittest.main()
Note: See TracBrowser for help on using the repository browser.