source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py @ 5973

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py@5973
Revision 5973, 10.4 KB checked in by pjkersha, 10 years ago (diff)

SQLAlchemyAttributeInterface class for Attribute Authority: added special attribute for mapping SAML attribute names to their respective SQL queries.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/12/08"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12
13import unittest
14import os
15import sys
16import getpass
17import re
18import logging
19logging.basicConfig(level=logging.DEBUG)
20
21from os.path import expandvars as xpdVars
22from os.path import join as jnPath
23mkPath = lambda file:jnPath(os.environ['NDGSEC_AA_UNITTEST_DIR'], file)
24
25from ndg.security.test.unit import BaseTestCase
26
27from ndg.security.common.utils.configfileparsers import (
28    CaseSensitiveConfigParser)
29from ndg.security.server.attributeauthority import (AttributeAuthority, 
30    AttributeAuthorityNoMatchingRoleInTrustedHosts, 
31    SQLAlchemyAttributeInterface)
32
33from ndg.security.common.AttCert import AttCert
34
35
36class AttributeAuthorityTestCase(BaseTestCase):
37    clntPriKeyPwd = None
38
39    def setUp(self):
40        super(AttributeAuthorityTestCase, self).setUp()
41       
42        if 'NDGSEC_INT_DEBUG' in os.environ:
43            import pdb
44            pdb.set_trace()
45       
46        if 'NDGSEC_AA_UNITTEST_DIR' not in os.environ:
47            os.environ['NDGSEC_AA_UNITTEST_DIR'] = \
48                os.path.abspath(os.path.dirname(__file__))
49
50        self.cfgParser = CaseSensitiveConfigParser()
51        cfgFilePath = mkPath('test_attributeauthority.cfg')
52        self.cfgParser.read(cfgFilePath)
53       
54        self.cfg = {}
55        for section in self.cfgParser.sections() + ['DEFAULT']:
56            self.cfg[section] = dict(self.cfgParser.items(section))
57           
58        self.aa = AttributeAuthority.fromPropertyFile(
59                                            self.cfg['setUp']['propFilePath'])
60
61    _mkSiteBAttributeAuthority = lambda self: \
62        AttributeAuthority.fromPropertyFile(
63                        propFilePath=self.cfg['DEFAULT']['siteBPropFilePath'])
64   
65    def test01GetHostInfo(self):
66        """test01GetHostInfo: retrieve info for AA host"""
67        hostInfo = self.aa.hostInfo
68        print("Host Info:\n %s" % hostInfo)     
69
70    def test02GetTrustedHostInfo(self):
71        """test02GetTrustedHostInfo: retrieve trusted host info matching a
72        given role"""
73        thisSection = self.cfg['test02GetTrustedHostInfo']
74       
75        trustedHostInfo = self.aa.getTrustedHostInfo(thisSection['role'])
76        for hostname, hostInfo in trustedHostInfo.items():
77            self.assert_(hostname, "Hostname not set")
78            for k, v in hostInfo.items():
79                self.assert_(k, "hostInfo value key unset")
80
81        print("Trusted Host Info:\n %s" % trustedHostInfo)
82
83    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self):
84        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case
85        where the input role doesn't match any roles in the target AA's map
86        config file"""
87        thisSection = self.cfg[
88                            'test03GetTrustedHostInfoWithNoMatchingRoleFound']
89        try:
90            trustedHostInfo = self.aa.getTrustedHostInfo(thisSection['role'])
91            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")
92           
93        except AttributeAuthorityNoMatchingRoleInTrustedHosts, e:
94            print('PASSED - no match for role "%s": %s' % (thisSection['role'],
95                                                           e))
96
97
98    def test04GetTrustedHostInfoWithNoRole(self):
99        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info
100        irrespective of role"""
101        trustedHostInfo = self.aa.getTrustedHostInfo()
102        for hostname, hostInfo in trustedHostInfo.items():
103            self.assert_(hostname, "Hostname not set")
104            for k, v in hostInfo.items():
105                self.assert_(k, "hostInfo value key unset")
106                   
107        print("Trusted Host Info:\n %s" % trustedHostInfo)
108
109    def test05GetAttCert(self):       
110        """test05GetAttCert: Request attribute certificate from NDG Attribute
111        Authority Web Service."""
112        thisSection = self.cfg['test05GetAttCert']
113       
114        # Read user Certificate into a string ready for passing via WS
115        try:
116            userX509CertFilePath = xpdVars(thisSection.get(
117                                                    'issuingClntCertFilePath'))
118            userX509CertTxt = open(userX509CertFilePath, 'r').read()
119       
120        except TypeError:
121            # No issuing cert set
122            userX509CertTxt = None
123               
124        except IOError, ioErr:
125            raise Exception("Error reading certificate file \"%s\": %s" %
126                                    (ioErr.filename, ioErr.strerror))
127
128        # Make attribute certificate request
129        attCert = self.aa.getAttCert(holderX509Cert=userX509CertTxt)
130       
131        print("Attribute Certificate: \n\n:" + str(attCert))
132       
133        attCert.filePath = xpdVars(thisSection['attCertFilePath'])
134        attCert.write()
135       
136       
137    def test06GetAttCertWithUserIdSet(self):       
138        """test06GetAttCertWithUserIdSet: Request attribute certificate from
139        NDG Attribute Authority Web Service setting a specific user Id
140        independent of the signer of the SOAP request."""
141        thisSection = self.cfg['test06GetAttCertWithUserIdSet']
142       
143        # Make attribute certificate request
144        userId = thisSection['userId']
145        attCert = self.aa.getAttCert(userId=userId)
146       
147        print("Attribute Certificate: \n\n:" + str(attCert))
148       
149        attCert.filePath = xpdVars(thisSection['attCertFilePath'])
150        attCert.write()
151
152
153    def test07GetMappedAttCert(self):       
154        """test07GetMappedAttCert: Request mapped attribute certificate from
155        NDG Attribute Authority Web Service."""
156        thisSection = self.cfg['test07GetMappedAttCert']
157       
158        # Read user Certificate into a string ready for passing via WS
159        try:
160            userX509CertFilePath = xpdVars(thisSection.get(
161                                                    'issuingClntCertFilePath'))
162            userX509CertTxt = open(userX509CertFilePath, 'r').read()
163       
164        except TypeError:
165            # No issuing cert set
166            userX509CertTxt = None
167               
168        except IOError, ioErr:
169            raise Exception("Error reading certificate file \"%s\": %s" % 
170                                    (ioErr.filename, ioErr.strerror))
171   
172        # Simlarly for Attribute Certificate
173        try:
174            userAttCert = AttCert.Read(
175                                xpdVars(thisSection['userAttCertFilePath']))
176           
177        except IOError, ioErr:
178            raise Exception("Error reading attribute certificate file \"%s\": "
179                            "%s" % (ioErr.filename, ioErr.strerror))
180       
181        # Make client to site B Attribute Authority
182        siteBAA = self._mkSiteBAttributeAuthority()
183   
184        # Make attribute certificate request
185        attCert = siteBAA.getAttCert(holderX509Cert=userX509CertTxt,
186                                     userAttCert=userAttCert)
187        print("Attribute Certificate: \n\n:" + str(attCert))
188       
189        attCert.filePath = xpdVars(thisSection['mappedAttCertFilePath'])
190        attCert.write()
191       
192       
193    def test08GetMappedAttCertStressTest(self):       
194        """test08GetMappedAttCertStressTest: Request mapped attribute
195        certificate from NDG Attribute Authority Web Service."""
196        thisSection = self.cfg['test08GetMappedAttCertStressTest']
197       
198        # Read user Certificate into a string ready for passing via WS
199        try:
200            userX509CertFilePath = xpdVars(thisSection.get(
201                                                    'issuingClntCertFilePath'))
202            userX509CertTxt = open(userX509CertFilePath, 'r').read()
203       
204        except TypeError:
205            # No issuing cert set
206            userX509CertTxt = None
207               
208        except IOError, ioErr:
209            raise Exception("Error reading certificate file \"%s\": %s" % 
210                                    (ioErr.filename, ioErr.strerror))
211
212        # Make client to site B Attribute Authority
213        siteBAA = self._mkSiteBAttributeAuthority()
214
215        acFilePathList = [xpdVars(file) for file in \
216                          thisSection['userAttCertFilePathList'].split()]
217
218        passed = True
219        for acFilePath in acFilePathList:
220            try:
221                userAttCert = AttCert.Read(acFilePath)
222               
223            except IOError, ioErr:
224                raise Exception("Error reading attribute certificate file "
225                                '"%s": %s' % (ioErr.filename, ioErr.strerror))
226       
227            # Make attribute certificate request
228            try:
229                attCert = siteBAA.getAttCert(holderX509Cert=userX509CertTxt,
230                                             userAttCert=userAttCert)
231            except Exception, e:
232                passed = True
233                outFilePfx = 'test08GetMappedAttCertStressTest-%s' % \
234                        os.path.basename(acFilePath)   
235                msgFile = open(outFilePfx+".msg", 'w')
236                msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
237               
238        self.assert_(passed, 
239                     "At least one Attribute Certificate request failed.  "
240                     "Check the .msg files in this directory")
241
242
243from warnings import warn
244   
245class SQLAlchemyAttributeInterfaceTestCase(BaseTestCase):
246    def __init__(self, *arg, **kw):
247        super(SQLAlchemyAttributeInterfaceTestCase, self).__init__(*arg, **kw)
248        self.skipTests = False
249        try:
250            self._createDb()
251
252        except NotImplementedError:
253            # Don't proceed with tests because SQLAlchemy is not installed
254            warn("Skipping SQLAlchemyAttributeInterfaceTestCase because "
255                 "SQLAlchemy is not installed")
256            self.skipTests = True
257       
258    def test01(self):
259        if self.skipTests:
260            return
261       
262        attributeInterface = SQLAlchemyAttributeInterface()
263        attributeInterface.samlAttribute2SqlQueryMap_firstName = 'Philip'
264        setattr(attributeInterface, 'samlAttribute2SqlQueryMap.lastName',
265                'Kershaw')
266        attributeInterface.samlAttribute2SqlQueryMap['emailAddress'] = ('pjk'
267                                                            '@somewhere.ac.uk')
268
269                               
270if __name__ == "__main__":
271    unittest.main()
Note: See TracBrowser for help on using the repository browser.