source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthority/test_attributeauthority.py @ 4716

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthority/test_attributeauthority.py@4716
Revision 4716, 9.1 KB checked in by pjkersha, 12 years ago (diff)

renamed data -> config

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/12/08"
8__copyright__ = "(C) 2008 STFC"
9__license__ = \
10"""This software may be distributed under the terms of the Q Public
11License, version 1.0 or later."""
12__contact__ = "Philip.Kershaw@stfc.ac.uk"
13__revision__ = '$Id$'
14
15import unittest
16import os
17import sys
18import getpass
19import re
20import logging
21logging.basicConfig()
22
23from os.path import expandvars as xpdVars
24from os.path import join as jnPath
25mkPath = lambda file:jnPath(os.environ['NDGSEC_AA_UNITTEST_DIR'], file)
26
27from ndg.security.test import BaseTestCase
28
29from ndg.security.common.utils.configfileparsers import \
30    CaseSensitiveConfigParser
31from ndg.security.server.attributeauthority import AttributeAuthority, \
32    AttributeAuthorityNoMatchingRoleInTrustedHosts
33
34from ndg.security.common.AttCert import AttCert
35
36
37class AttributeAuthorityTestCase(BaseTestCase):
38    clntPriKeyPwd = None
39
40    def setUp(self):
41        super(AttributeAuthorityTestCase, self).setUp()
42       
43        if 'NDGSEC_INT_DEBUG' in os.environ:
44            import pdb
45            pdb.set_trace()
46       
47        if 'NDGSEC_AA_UNITTEST_DIR' not in os.environ:
48            os.environ['NDGSEC_AA_UNITTEST_DIR'] = \
49                os.path.abspath(os.path.dirname(__file__))
50
51        self.cfgParser = CaseSensitiveConfigParser()
52        cfgFilePath = mkPath('test_attributeauthority.cfg')
53        self.cfgParser.read(cfgFilePath)
54       
55        self.cfg = {}
56        for section in self.cfgParser.sections() + ['DEFAULT']:
57            self.cfg[section] = dict(self.cfgParser.items(section))
58           
59        self.aa = AttributeAuthority(
60                                propFilePath=self.cfg['setUp']['propFilePath'])           
61
62    _mkSiteBAttributeAuthority = lambda self: AttributeAuthority(\
63                        propFilePath=self.cfg['DEFAULT']['siteBPropFilePath'])
64   
65    def test01GetHostInfo(self):
66        """test01GetHostInfo: retrieve info for AA host"""
67        hostInfo = self.aa.hostInfo
68        print("Host Info:\n %s" % hostInfo)     
69
70    def test02GetTrustedHostInfo(self):
71        """test02GetTrustedHostInfo: retrieve trusted host info matching a
72        given role"""
73        thisSection = self.cfg['test02GetTrustedHostInfo']
74       
75        trustedHostInfo = self.aa.getTrustedHostInfo(thisSection['role'])
76        for hostname, hostInfo in trustedHostInfo.items():
77            self.assert_(hostname, "Hostname not set")
78            for k, v in hostInfo.items():
79                self.assert_(k, "hostInfo value key unset")
80
81        print("Trusted Host Info:\n %s" % trustedHostInfo)
82
83    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self):
84        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case
85        where the input role doesn't match any roles in the target AA's map
86        config file"""
87        thisSection=self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound']
88        try:
89            trustedHostInfo = self.aa.getTrustedHostInfo(thisSection['role'])
90            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")
91           
92        except AttributeAuthorityNoMatchingRoleInTrustedHosts, e:
93            print('PASSED - no match for role "%s": %s' % (thisSection['role'],
94                                                           e))
95
96
97    def test04GetTrustedHostInfoWithNoRole(self):
98        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info
99        irrespective of role"""
100        trustedHostInfo = self.aa.getTrustedHostInfo()
101        for hostname, hostInfo in trustedHostInfo.items():
102            self.assert_(hostname, "Hostname not set")
103            for k, v in hostInfo.items():
104                self.assert_(k, "hostInfo value key unset")
105                   
106        print("Trusted Host Info:\n %s" % trustedHostInfo)
107
108    def test05GetAttCert(self):       
109        """test05GetAttCert: Request attribute certificate from NDG Attribute
110        Authority Web Service."""
111        thisSection = self.cfg['test05GetAttCert']
112       
113        # Read user Certificate into a string ready for passing via WS
114        try:
115            userX509CertFilePath = xpdVars(thisSection.get(
116                                                    'issuingClntCertFilePath'))
117            userX509CertTxt = open(userX509CertFilePath, 'r').read()
118       
119        except TypeError:
120            # No issuing cert set
121            userX509CertTxt = None
122               
123        except IOError, ioErr:
124            raise Exception("Error reading certificate file \"%s\": %s" %
125                                    (ioErr.filename, ioErr.strerror))
126
127        # Make attribute certificate request
128        attCert = self.aa.getAttCert(holderX509Cert=userX509CertTxt)
129       
130        print("Attribute Certificate: \n\n:" + str(attCert))
131       
132        attCert.filePath = xpdVars(thisSection['attCertFilePath'])
133        attCert.write()
134       
135       
136    def test06GetAttCertWithUserIdSet(self):       
137        """test06GetAttCertWithUserIdSet: Request attribute certificate from
138        NDG Attribute Authority Web Service setting a specific user Id
139        independent of the signer of the SOAP request."""
140        thisSection = self.cfg['test06GetAttCertWithUserIdSet']
141       
142        # Make attribute certificate request
143        userId = thisSection['userId']
144        attCert = self.aa.getAttCert(userId=userId)
145       
146        print("Attribute Certificate: \n\n:" + str(attCert))
147       
148        attCert.filePath = xpdVars(thisSection['attCertFilePath'])
149        attCert.write()
150
151
152    def test07GetMappedAttCert(self):       
153        """test07GetMappedAttCert: Request mapped attribute certificate from
154        NDG Attribute Authority Web Service."""
155        thisSection = self.cfg['test07GetMappedAttCert']
156       
157        # Read user Certificate into a string ready for passing via WS
158        try:
159            userX509CertFilePath = xpdVars(thisSection.get(
160                                                    'issuingClntCertFilePath'))
161            userX509CertTxt = open(userX509CertFilePath, 'r').read()
162       
163        except TypeError:
164            # No issuing cert set
165            userX509CertTxt = None
166               
167        except IOError, ioErr:
168            raise Exception("Error reading certificate file \"%s\": %s" % 
169                                    (ioErr.filename, ioErr.strerror))
170   
171        # Simlarly for Attribute Certificate
172        try:
173            userAttCert = AttCert.Read(
174                                xpdVars(thisSection['userAttCertFilePath']))
175           
176        except IOError, ioErr:
177            raise Exception("Error reading attribute certificate file \"%s\": "
178                            "%s" % (ioErr.filename, ioErr.strerror))
179       
180        # Make client to site B Attribute Authority
181        siteBAA = self._mkSiteBAttributeAuthority()
182   
183        # Make attribute certificate request
184        attCert = siteBAA.getAttCert(holderX509Cert=userX509CertTxt,
185                                     userAttCert=userAttCert)
186        print("Attribute Certificate: \n\n:" + str(attCert))
187       
188        attCert.filePath = xpdVars(thisSection['mappedAttCertFilePath'])
189        attCert.write()
190       
191       
192    def test08GetMappedAttCertStressTest(self):       
193        """test08GetMappedAttCertStressTest: Request mapped attribute
194        certificate from NDG Attribute Authority Web Service."""
195        thisSection = self.cfg['test08GetMappedAttCertStressTest']
196       
197        # Read user Certificate into a string ready for passing via WS
198        try:
199            userX509CertFilePath = xpdVars(thisSection.get(
200                                                    'issuingClntCertFilePath'))
201            userX509CertTxt = open(userX509CertFilePath, 'r').read()
202       
203        except TypeError:
204            # No issuing cert set
205            userX509CertTxt = None
206               
207        except IOError, ioErr:
208            raise Exception("Error reading certificate file \"%s\": %s" % 
209                                    (ioErr.filename, ioErr.strerror))
210
211        # Make client to site B Attribute Authority
212        siteBAA = self._mkSiteBAttributeAuthority()
213
214        acFilePathList = [xpdVars(file) for file in \
215                          thisSection['userAttCertFilePathList'].split()]
216
217        for acFilePath in acFilePathList:
218            try:
219                userAttCert = AttCert.Read(acFilePath)
220               
221            except IOError, ioErr:
222                raise Exception("Error reading attribute certificate file "
223                                '"%s": %s' % (ioErr.filename, ioErr.strerror))
224       
225            # Make attribute certificate request
226            try:
227                attCert = siteBAA.getAttCert(holderX509Cert=userX509CertTxt,
228                                             userAttCert=userAttCert)
229            except Exception, e:
230                outFilePfx = 'test08GetMappedAttCertStressTest-%s' % \
231                        os.path.basename(acFilePath)   
232                msgFile = open(outFilePfx+".msg", 'w')
233                msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
234                                       
235if __name__ == "__main__":
236    unittest.main()
Note: See TracBrowser for help on using the repository browser.