source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthority/test_attributeauthority.py @ 4680

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthority/test_attributeauthority.py@4680
Revision 4680, 9.1 KB checked in by pjkersha, 12 years ago (diff)

Global replace to fix copyright from STFC & NERC to STFC alone because it's not possible to have copyright held by two orgs.

  • Property svn:executable set to *
#!/usr/bin/env python
"""Unit tests for the NDG Attribute Authority

NERC Data Grid Project
"""
__author__ = "P J Kershaw"
__date__ = "15/12/08"
__copyright__ = "(C) 2008 STFC"
__license__ = \
"""This software may be distributed under the terms of the Q Public
License, version 1.0 or later."""
__contact__ = "Philip.Kershaw@stfc.ac.uk"
__revision__ = '$Id$'

import unittest
import os
import sys
import getpass
import re
import logging

# Set up the root logger before the project modules are imported below
logging.basicConfig()

from os.path import expandvars as xpdVars
from os.path import join as jnPath


def mkPath(file):
    """Return `file` joined onto the unit test directory named by the
    NDGSEC_AA_UNITTEST_DIR environment variable.  The variable is looked up
    on every call, so it must be set before the first call is made."""
    return jnPath(os.environ['NDGSEC_AA_UNITTEST_DIR'], file)


from ndg.security.common.utils.configfileparsers import \
    CaseSensitiveConfigParser
from ndg.security.server.attributeauthority import AttributeAuthority, \
    AttributeAuthorityNoMatchingRoleInTrustedHosts

from ndg.security.common.AttCert import AttCert
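class AttributeAuthorityTestCase(unittest.TestCase):
    """Unit tests which call an AttributeAuthority instance directly.

    Settings are read from test_attributeauthority.cfg in the directory named
    by the NDGSEC_AA_UNITTEST_DIR environment variable; setUp defaults that
    variable to this module's directory.  Most tests take their inputs from
    the config file section which carries the test method's name.
    """
    clntPriKeyPwd = None

    def setUp(self):
        """Parse the test config file and create the AttributeAuthority under
        test.

        If NDGSEC_INT_DEBUG is present in the environment, execution stops in
        pdb before any other set-up is done.
        """
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        if 'NDGSEC_AA_UNITTEST_DIR' not in os.environ:
            os.environ['NDGSEC_AA_UNITTEST_DIR'] = \
                os.path.abspath(os.path.dirname(__file__))

        self.cfgParser = CaseSensitiveConfigParser()
        cfgFilePath = mkPath('test_attributeauthority.cfg')
        self.cfgParser.read(cfgFilePath)

        # Copy the parser content into a dict of dicts keyed by section name
        self.cfg = {}
        for section in self.cfgParser.sections() + ['DEFAULT']:
            self.cfg[section] = dict(self.cfgParser.items(section))

        self.aa = AttributeAuthority(
                                propFilePath=self.cfg['setUp']['propFilePath'])

    def _mkSiteBAttributeAuthority(self):
        """Create a second AttributeAuthority from the properties file named
        by siteBPropFilePath in the DEFAULT config section"""
        return AttributeAuthority(
                        propFilePath=self.cfg['DEFAULT']['siteBPropFilePath'])

    def _readIssuingClntCert(self, thisSection):
        """Read the certificate file named by the issuingClntCertFilePath
        option of the given config section.

        @param thisSection: dict of options for the calling test
        @return: content of the certificate file, or None if the option is
        not set
        @raise Exception: the file could not be read
        """
        certFilePath = thisSection.get('issuingClntCertFilePath')
        if certFilePath is None:
            # No issuing cert set
            return None

        try:
            certFile = open(xpdVars(certFilePath), 'r')
            try:
                return certFile.read()
            finally:
                # Release the handle whether or not the read succeeded
                certFile.close()

        except IOError, ioErr:
            raise Exception("Error reading certificate file \"%s\": %s" %
                                    (ioErr.filename, ioErr.strerror))

    def test01GetHostInfo(self):
        """test01GetHostInfo: retrieve info for AA host"""
        hostInfo = self.aa.hostInfo
        print("Host Info:\n %s" % hostInfo)

    def test02GetTrustedHostInfo(self):
        """test02GetTrustedHostInfo: retrieve trusted host info matching a
        given role"""
        thisSection = self.cfg['test02GetTrustedHostInfo']

        trustedHostInfo = self.aa.getTrustedHostInfo(thisSection['role'])
        for hostname, hostInfo in trustedHostInfo.items():
            self.assert_(hostname, "Hostname not set")
            # Only the keys are checked here; values are not asserted
            for k, v in hostInfo.items():
                self.assert_(k, "hostInfo value key unset")

        print("Trusted Host Info:\n %s" % trustedHostInfo)

    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self):
        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case
        where the input role doesn't match any roles in the target AA's map
        config file"""
        thisSection=self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound']
        try:
            # The return value is of no interest: the call is expected to
            # raise
            self.aa.getTrustedHostInfo(thisSection['role'])
            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")

        except AttributeAuthorityNoMatchingRoleInTrustedHosts, e:
            print('PASSED - no match for role "%s": %s' % (thisSection['role'],
                                                           e))

    def test04GetTrustedHostInfoWithNoRole(self):
        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info
        irrespective of role"""
        trustedHostInfo = self.aa.getTrustedHostInfo()
        for hostname, hostInfo in trustedHostInfo.items():
            self.assert_(hostname, "Hostname not set")
            for k, v in hostInfo.items():
                self.assert_(k, "hostInfo value key unset")
                self.assert_(v, "%s value not set" % k)

        print("Trusted Host Info:\n %s" % trustedHostInfo)

    def test05GetAttCert(self):
        """test05GetAttCert: Request attribute certificate from NDG Attribute
        Authority Web Service."""
        thisSection = self.cfg['test05GetAttCert']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readIssuingClntCert(thisSection)

        # Make attribute certificate request
        attCert = self.aa.getAttCert(holderX509Cert=userX509CertTxt)

        print("Attribute Certificate: \n\n:" + str(attCert))

        # Save the certificate to the path given in the config file
        attCert.filePath = xpdVars(thisSection['attCertFilePath'])
        attCert.write()

    def test06GetAttCertWithUserIdSet(self):
        """test06GetAttCertWithUserIdSet: Request attribute certificate from
        NDG Attribute Authority Web Service setting a specific user Id
        independent of the signer of the SOAP request."""
        thisSection = self.cfg['test06GetAttCertWithUserIdSet']

        # Make attribute certificate request
        userId = thisSection['userId']
        attCert = self.aa.getAttCert(userId=userId)

        print("Attribute Certificate: \n\n:" + str(attCert))

        attCert.filePath = xpdVars(thisSection['attCertFilePath'])
        attCert.write()

    def test07GetMappedAttCert(self):
        """test07GetMappedAttCert: Request mapped attribute certificate from
        NDG Attribute Authority Web Service."""
        thisSection = self.cfg['test07GetMappedAttCert']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readIssuingClntCert(thisSection)

        # Similarly for Attribute Certificate
        try:
            userAttCert = AttCert.Read(
                                xpdVars(thisSection['userAttCertFilePath']))

        except IOError, ioErr:
            raise Exception("Error reading attribute certificate file \"%s\": "
                            "%s" % (ioErr.filename, ioErr.strerror))

        # Make client to site B Attribute Authority
        siteBAA = self._mkSiteBAttributeAuthority()

        # Make attribute certificate request
        attCert = siteBAA.getAttCert(holderX509Cert=userX509CertTxt,
                                     userAttCert=userAttCert)
        print("Attribute Certificate: \n\n:" + str(attCert))

        attCert.filePath = xpdVars(thisSection['mappedAttCertFilePath'])
        attCert.write()

    def test08GetMappedAttCertStressTest(self):
        """test08GetMappedAttCertStressTest: Request mapped attribute
        certificate from NDG Attribute Authority Web Service.

        One request is made for each attribute certificate file listed in the
        userAttCertFilePathList config option.
        """
        thisSection = self.cfg['test08GetMappedAttCertStressTest']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readIssuingClntCert(thisSection)

        # Make client to site B Attribute Authority
        siteBAA = self._mkSiteBAttributeAuthority()

        acFilePathList = [xpdVars(file) for file in \
                          thisSection['userAttCertFilePathList'].split()]

        for acFilePath in acFilePathList:
            try:
                userAttCert = AttCert.Read(acFilePath)

            except IOError, ioErr:
                raise Exception("Error reading attribute certificate file "
                                '"%s": %s' % (ioErr.filename, ioErr.strerror))

            # Make attribute certificate request
            try:
                attCert = siteBAA.getAttCert(holderX509Cert=userX509CertTxt,
                                             userAttCert=userAttCert)
            except Exception, e:
                # NOTE(review): a failed request is only recorded in a .msg
                # file in the current working directory; the test itself
                # still passes - confirm this is intended
                outFilePfx = 'test08GetMappedAttCertStressTest-%s' % \
                        os.path.basename(acFilePath)
                msgFile = open(outFilePfx+".msg", 'w')
                try:
                    msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
                finally:
                    # Close so that the message is flushed to disk
                    msgFile.close()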
if __name__ == "__main__":
    # Run every test case in this module when executed as a script
    unittest.main()