source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py @ 6615

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py@6615
Revision 6615, 24.4 KB checked in by pjkersha, 10 years ago (diff)

AuthzService? unit test wiht ndg.security.server.wsgi.authzservice.AuthzServiceMiddleware? near complete. Fixes required to PIP callout to Attribute Authority.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/12/08"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
12
13import unittest
14import os
15import sys
16import getpass
17import re
18import logging
19logging.basicConfig(level=logging.DEBUG)
20
21from os.path import expandvars as xpdVars
22from os.path import join as jnPath
23mkPath = lambda file:jnPath(os.environ['NDGSEC_AA_UNITTEST_DIR'], file)
24
25from ndg.security.test.unit import BaseTestCase
26
27from ndg.security.common.utils.configfileparsers import (
28    CaseSensitiveConfigParser)
29from ndg.security.server.attributeauthority import (AttributeAuthority, 
30    AttributeAuthorityNoMatchingRoleInTrustedHosts, 
31    SQLAlchemyAttributeInterface, InvalidAttributeFormat)
32
33from ndg.security.common.AttCert import AttCert
34
35
36class AttributeAuthorityTestCase(BaseTestCase):
37    clntPriKeyPwd = None
38
39    def setUp(self):
40        super(AttributeAuthorityTestCase, self).setUp()
41       
42        if 'NDGSEC_INT_DEBUG' in os.environ:
43            import pdb
44            pdb.set_trace()
45       
46        if 'NDGSEC_AA_UNITTEST_DIR' not in os.environ:
47            os.environ['NDGSEC_AA_UNITTEST_DIR'] = \
48                os.path.abspath(os.path.dirname(__file__))
49
50        self.cfgParser = CaseSensitiveConfigParser()
51        cfgFilePath = mkPath('test_attributeauthority.cfg')
52        self.cfgParser.read(cfgFilePath)
53       
54        self.cfg = {}
55        for section in self.cfgParser.sections() + ['DEFAULT']:
56            self.cfg[section] = dict(self.cfgParser.items(section))
57           
58        self.aa = AttributeAuthority.fromPropertyFile(
59                                            self.cfg['setUp']['propFilePath'])
60
61    _mkSiteBAttributeAuthority = lambda self: \
62        AttributeAuthority.fromPropertyFile(
63                        propFilePath=self.cfg['DEFAULT']['siteBPropFilePath'])
64   
65    def test01GetHostInfo(self):
66        """test01GetHostInfo: retrieve info for AA host"""
67        hostInfo = self.aa.hostInfo
68        print("Host Info:\n %s" % hostInfo)     
69
70    def test02GetTrustedHostInfo(self):
71        """test02GetTrustedHostInfo: retrieve trusted host info matching a
72        given role"""
73        thisSection = self.cfg['test02GetTrustedHostInfo']
74       
75        trustedHostInfo = self.aa.getTrustedHostInfo(thisSection['role'])
76        for hostname, hostInfo in trustedHostInfo.items():
77            self.assert_(hostname, "Hostname not set")
78            for k, v in hostInfo.items():
79                self.assert_(k, "hostInfo value key unset")
80
81        print("Trusted Host Info:\n %s" % trustedHostInfo)
82
83    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self):
84        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case
85        where the input role doesn't match any roles in the target AA's map
86        config file"""
87        thisSection = self.cfg[
88                            'test03GetTrustedHostInfoWithNoMatchingRoleFound']
89        try:
90            trustedHostInfo = self.aa.getTrustedHostInfo(thisSection['role'])
91            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")
92           
93        except AttributeAuthorityNoMatchingRoleInTrustedHosts, e:
94            print('PASSED - no match for role "%s": %s' % (thisSection['role'],
95                                                           e))
96
97
98    def test04GetTrustedHostInfoWithNoRole(self):
99        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info
100        irrespective of role"""
101        trustedHostInfo = self.aa.getTrustedHostInfo()
102        for hostname, hostInfo in trustedHostInfo.items():
103            self.assert_(hostname, "Hostname not set")
104            for k, v in hostInfo.items():
105                self.assert_(k, "hostInfo value key unset")
106                   
107        print("Trusted Host Info:\n %s" % trustedHostInfo)
108
109    def test05GetAttCert(self):       
110        """test05GetAttCert: Request attribute certificate from NDG Attribute
111        Authority Web Service."""
112        thisSection = self.cfg['test05GetAttCert']
113       
114        # Read user Certificate into a string ready for passing via WS
115        try:
116            userX509CertFilePath = xpdVars(thisSection.get(
117                                                    'issuingClntCertFilePath'))
118            userX509CertTxt = open(userX509CertFilePath, 'r').read()
119       
120        except TypeError:
121            # No issuing cert set
122            userX509CertTxt = None
123               
124        except IOError, ioErr:
125            raise Exception("Error reading certificate file \"%s\": %s" %
126                                    (ioErr.filename, ioErr.strerror))
127
128        # Make attribute certificate request
129        attCert = self.aa.getAttCert(holderX509Cert=userX509CertTxt)
130       
131        print("Attribute Certificate: \n\n:" + str(attCert))
132       
133        attCert.filePath = xpdVars(thisSection['attCertFilePath'])
134        attCert.write()
135       
136       
137    def test06GetAttCertWithUserIdSet(self):       
138        """test06GetAttCertWithUserIdSet: Request attribute certificate from
139        NDG Attribute Authority Web Service setting a specific user Id
140        independent of the signer of the SOAP request."""
141        thisSection = self.cfg['test06GetAttCertWithUserIdSet']
142       
143        # Make attribute certificate request
144        userId = thisSection['userId']
145        attCert = self.aa.getAttCert(userId=userId)
146       
147        print("Attribute Certificate: \n\n:" + str(attCert))
148       
149        attCert.filePath = xpdVars(thisSection['attCertFilePath'])
150        attCert.write()
151
152
153    def test07GetMappedAttCert(self):       
154        """test07GetMappedAttCert: Request mapped attribute certificate from
155        NDG Attribute Authority Web Service."""
156        thisSection = self.cfg['test07GetMappedAttCert']
157       
158        # Read user Certificate into a string ready for passing via WS
159        try:
160            userX509CertFilePath = xpdVars(thisSection.get(
161                                                    'issuingClntCertFilePath'))
162            userX509CertTxt = open(userX509CertFilePath, 'r').read()
163       
164        except TypeError:
165            # No issuing cert set
166            userX509CertTxt = None
167               
168        except IOError, ioErr:
169            raise Exception("Error reading certificate file \"%s\": %s" % 
170                                    (ioErr.filename, ioErr.strerror))
171   
172        # Simlarly for Attribute Certificate
173        try:
174            userAttCert = AttCert.Read(
175                                xpdVars(thisSection['userAttCertFilePath']))
176           
177        except IOError, ioErr:
178            raise Exception("Error reading attribute certificate file \"%s\": "
179                            "%s" % (ioErr.filename, ioErr.strerror))
180       
181        # Make client to site B Attribute Authority
182        siteBAA = self._mkSiteBAttributeAuthority()
183   
184        # Make attribute certificate request
185        attCert = siteBAA.getAttCert(holderX509Cert=userX509CertTxt,
186                                     userAttCert=userAttCert)
187        print("Attribute Certificate: \n\n:" + str(attCert))
188       
189        attCert.filePath = xpdVars(thisSection['mappedAttCertFilePath'])
190        attCert.write()
191       
192       
193    def test08GetMappedAttCertStressTest(self):       
194        """test08GetMappedAttCertStressTest: Request mapped attribute
195        certificate from NDG Attribute Authority Web Service."""
196        thisSection = self.cfg['test08GetMappedAttCertStressTest']
197       
198        # Read user Certificate into a string ready for passing via WS
199        try:
200            userX509CertFilePath = xpdVars(thisSection.get(
201                                                    'issuingClntCertFilePath'))
202            userX509CertTxt = open(userX509CertFilePath, 'r').read()
203       
204        except TypeError:
205            # No issuing cert set
206            userX509CertTxt = None
207               
208        except IOError, ioErr:
209            raise Exception("Error reading certificate file \"%s\": %s" % 
210                                    (ioErr.filename, ioErr.strerror))
211
212        # Make client to site B Attribute Authority
213        siteBAA = self._mkSiteBAttributeAuthority()
214
215        acFilePathList = [xpdVars(file) for file in \
216                          thisSection['userAttCertFilePathList'].split()]
217
218        passed = True
219        for acFilePath in acFilePathList:
220            try:
221                userAttCert = AttCert.Read(acFilePath)
222               
223            except IOError, ioErr:
224                raise Exception("Error reading attribute certificate file "
225                                '"%s": %s' % (ioErr.filename, ioErr.strerror))
226       
227            # Make attribute certificate request
228            try:
229                attCert = siteBAA.getAttCert(holderX509Cert=userX509CertTxt,
230                                             userAttCert=userAttCert)
231            except Exception, e:
232                passed = True
233                outFilePfx = 'test08GetMappedAttCertStressTest-%s' % \
234                        os.path.basename(acFilePath)   
235                msgFile = open(outFilePfx+".msg", 'w')
236                msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
237               
238        self.assert_(passed, 
239                     "At least one Attribute Certificate request failed.  "
240                     "Check the .msg files in this directory")
241
242
243from warnings import warn
244from uuid import uuid4
245from datetime import datetime
246from ndg.saml.saml2.core import (Response, Attribute, SAMLVersion, Subject, NameID,
247                             Issuer, AttributeQuery, XSStringAttributeValue, 
248                             Status, StatusMessage, StatusCode)
249from ndg.saml.xml import XMLConstants
250from ndg.security.common.saml_utils.esg import EsgSamlNamespaces
251
252
253class SQLAlchemyAttributeInterfaceTestCase(BaseTestCase):
254    SAML_SUBJECT_SQLQUERY = ("select count(*) from users where openid = "
255                             "'${userId}'")
256   
257    SAML_FIRSTNAME_SQLQUERY = ("select firstname from users where openid = "
258                               "'${userId}'")
259           
260    SAML_LASTNAME_SQLQUERY = ("select lastname from users where openid = "
261                              "'${userId}'")
262       
263    SAML_EMAILADDRESS_SQLQUERY = ("select emailaddress from users where "
264                                  "openid = '${userId}'")
265       
266    SAML_ATTRIBUTES_SQLQUERY = ("select attributename from attributes, users "
267                                "where users.openid = '${userId}' and "
268                                "attributes.username = users.username")
269                               
270    def __init__(self, *arg, **kw):
271        super(SQLAlchemyAttributeInterfaceTestCase, self).__init__(*arg, **kw)
272        self.skipTests = False
273        try:
274            import sqlalchemy
275
276        except NotImplementedError:
277            # Don't proceed with tests because SQLAlchemy is not installed
278            warn("Skipping SQLAlchemyAttributeInterfaceTestCase because "
279                 "SQLAlchemy is not installed")
280            self.skipTests = True
281       
282        if 'NDGSEC_AA_UNITTEST_DIR' not in os.environ:
283            os.environ['NDGSEC_AA_UNITTEST_DIR'
284                       ] = os.path.abspath(os.path.dirname(__file__))
285           
286        self.initDb()
287       
288    def test01TrySamlAttribute2SqlQuery__setattr__(self):
289        if self.skipTests:
290            return
291       
292        attributeInterface = SQLAlchemyAttributeInterface()
293       
294        # Define queries for SAML attribute names
295        attributeInterface.samlAttribute2SqlQuery_firstName = '"%s" "%s"' % (
296            EsgSamlNamespaces.FIRSTNAME_ATTRNAME,                                                               
297            SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY)
298           
299        setattr(attributeInterface, 
300                'samlAttribute2SqlQuery.lastName',
301                "%s %s" % (EsgSamlNamespaces.LASTNAME_ATTRNAME,
302                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY))
303       
304        attributeInterface.samlAttribute2SqlQuery[
305            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME] = (
306                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY)
307       
308        attributeInterface.samlAttribute2SqlQuery[
309            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]] = (
310            SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY)
311       
312    def test02SetProperties(self):
313        # test setProperties interface for instance attribute assignment
314        if self.skipTests:
315            return
316       
317        # samlAttribute2SqlQuery* suffixes have no particular requirement
318        # only that they are unique and start with an underscore or period.
319        properties = {
320            'connectionString': 
321                SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR,
322           
323            'samlSubjectSqlQuery':
324                SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY,
325               
326            'samlAttribute2SqlQuery.firstname': '"%s" "%s"' % (
327                EsgSamlNamespaces.FIRSTNAME_ATTRNAME,
328                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY),
329           
330            'samlAttribute2SqlQuery.blah': '"%s" "%s"' % (
331                EsgSamlNamespaces.LASTNAME_ATTRNAME,
332                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY),
333       
334            'samlAttribute2SqlQuery.3': '%s "%s"' % (
335            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME,
336            SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY),
337       
338            'samlAttribute2SqlQuery_0': '%s %s' % (
339                SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0],
340                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY),
341           
342            'samlValidRequestorDNs': ('/O=STFC/OU=CEDA/CN=AuthorisationService',
343                                      '/O=ESG/OU=NCAR/CN=Gateway'),
344            'samlAssertionLifetime': 86400,
345
346        }
347        attributeInterface = SQLAlchemyAttributeInterface()
348        attributeInterface.setProperties(**properties)
349       
350        self.assert_(
351            attributeInterface.samlAttribute2SqlQuery[
352                EsgSamlNamespaces.FIRSTNAME_ATTRNAME] == \
353            SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY)
354       
355        self.assert_(attributeInterface.connectionString == \
356                     SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR)
357       
358        # Test constructor setting properties
359        attributeInterface2 = SQLAlchemyAttributeInterface(**properties)
360        self.assert_(attributeInterface2.samlAssertionLifetime.days == 1)
361
362    def test03FromConfigFile(self):
363        if self.skipTests:
364            return
365        cfgParser = CaseSensitiveConfigParser()
366        cfgFilePath = mkPath('test_sqlalchemyattributeinterface.cfg')
367        cfgParser.read(cfgFilePath)
368       
369        cfg = dict(cfgParser.items('DEFAULT'))
370        attributeInterface = SQLAlchemyAttributeInterface()
371        attributeInterface.setProperties(prefix='attributeInterface.', **cfg)
372       
373        self.assert_(
374            attributeInterface.samlAttribute2SqlQuery[
375                EsgSamlNamespaces.EMAILADDRESS_ATTRNAME] == \
376            SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY)
377
378    def test04SamlAttributeQuery(self):
379        if self.skipTests:
380            return
381       
382        # Prepare a client query
383        attributeQuery = AttributeQuery()
384        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
385        attributeQuery.id = str(uuid4())
386        attributeQuery.issueInstant = datetime.utcnow()
387       
388        attributeQuery.issuer = Issuer()
389        attributeQuery.issuer.format = Issuer.X509_SUBJECT
390        attributeQuery.issuer.value = '/O=ESG/OU=NCAR/CN=Gateway'
391                       
392                       
393        attributeQuery.subject = Subject() 
394        attributeQuery.subject.nameID = NameID()
395        attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
396        attributeQuery.subject.nameID.value = \
397                                SQLAlchemyAttributeInterfaceTestCase.OPENID_URI
398       
399        fnAttribute = Attribute()
400        fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME
401        fnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
402        fnAttribute.friendlyName = "FirstName"
403
404        attributeQuery.attributes.append(fnAttribute)
405   
406        lnAttribute = Attribute()
407        lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME
408        lnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
409        lnAttribute.friendlyName = "LastName"
410
411        attributeQuery.attributes.append(lnAttribute)
412   
413        emailAddressAttribute = Attribute()
414        emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME
415        emailAddressAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
416        emailAddressAttribute.friendlyName = "EmailAddress"
417
418        attributeQuery.attributes.append(emailAddressAttribute)                                   
419   
420        authzAttribute = Attribute()
421        authzAttribute.name = \
422            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]
423        authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
424        authzAttribute.friendlyName = "authz"
425
426        attributeQuery.attributes.append(authzAttribute)                                   
427       
428        # Add the response - the interface will populate with an assertion as
429        # appropriate
430        samlResponse = Response()
431       
432        samlResponse.issueInstant = datetime.utcnow()
433        samlResponse.id = str(uuid4())
434        samlResponse.issuer = Issuer()
435       
436        # Initialise to success status but reset on error
437        samlResponse.status = Status()
438        samlResponse.status.statusCode = StatusCode()
439        samlResponse.status.statusMessage = StatusMessage()
440        samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI
441       
442        # Nb. SAML 2.0 spec says issuer format must be omitted
443        samlResponse.issuer.value = "CEDA"
444       
445        samlResponse.inResponseTo = attributeQuery.id
446       
447        # Set up the interface object
448       
449        # Define queries for SAML attribute names
450        samlAttribute2SqlQuery = {
451            EsgSamlNamespaces.FIRSTNAME_ATTRNAME: 
452                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY,
453           
454            EsgSamlNamespaces.LASTNAME_ATTRNAME: 
455                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY,
456       
457            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME: 
458                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY,
459       
460            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]: 
461                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY                   
462        }
463       
464        attributeInterface = SQLAlchemyAttributeInterface(
465                                samlAttribute2SqlQuery=samlAttribute2SqlQuery)
466       
467        attributeInterface.connectionString = \
468                        SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR
469               
470        attributeInterface.samlValidRequestorDNs = (
471            '/O=STFC/OU=CEDA/CN=AuthorisationService',
472            '/O=ESG/OU=NCAR/CN=Gateway')
473       
474        attributeInterface.setProperties(samlAssertionLifetime=28800.,
475                                issuerName='/CN=Attribute Authority/O=Site A')
476       
477        attributeInterface.samlSubjectSqlQuery = (
478            SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY)
479       
480        # Make the query
481        attributeInterface.getAttributes(attributeQuery, samlResponse)
482       
483        self.assert_(
484                samlResponse.status.statusCode.value == StatusCode.SUCCESS_URI)
485        self.assert_(samlResponse.inResponseTo == attributeQuery.id)
486        self.assert_(samlResponse.assertions[0].subject.nameID.value == \
487                     attributeQuery.subject.nameID.value)
488        self.assert_(
489            samlResponse.assertions[0].attributeStatements[0].attributes[1
490                ].attributeValues[0].value == 'Kershaw')
491       
492        self.assert_(
493            len(samlResponse.assertions[0].attributeStatements[0].attributes[3
494                ].attributeValues) == \
495                    SQLAlchemyAttributeInterfaceTestCase.N_ATTRIBUTE_VALUES)
496
497    def test04SamlAttributeQuery(self):
498        if self.skipTests:
499            return
500       
501        # Prepare a client query
502        attributeQuery = AttributeQuery()
503        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
504        attributeQuery.id = str(uuid4())
505        attributeQuery.issueInstant = datetime.utcnow()
506       
507        attributeQuery.issuer = Issuer()
508        attributeQuery.issuer.format = Issuer.X509_SUBJECT
509        attributeQuery.issuer.value = '/O=ESG/OU=NCAR/CN=Gateway'
510                       
511                       
512        attributeQuery.subject = Subject() 
513        attributeQuery.subject.nameID = NameID()
514        attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
515        attributeQuery.subject.nameID.value = \
516                                SQLAlchemyAttributeInterfaceTestCase.OPENID_URI
517   
518        emailAddressAttribute = Attribute()
519        emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME
520        emailAddressAttribute.nameFormat = "InvalidFormat"
521        emailAddressAttribute.friendlyName = "EmailAddress"
522
523        attributeQuery.attributes.append(emailAddressAttribute)                                   
524   
525        authzAttribute = Attribute()
526        authzAttribute.name = \
527            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]
528        authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
529        authzAttribute.friendlyName = "authz"
530
531        attributeQuery.attributes.append(authzAttribute)                                   
532       
533        # Add the response - the interface will populate with an assertion as
534        # appropriate
535        samlResponse = Response()
536       
537        samlResponse.issueInstant = datetime.utcnow()
538        samlResponse.id = str(uuid4())
539        samlResponse.issuer = Issuer()
540       
541        # Initialise to success status but reset on error
542        samlResponse.status = Status()
543        samlResponse.status.statusCode = StatusCode()
544        samlResponse.status.statusMessage = StatusMessage()
545        samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI
546       
547        # Nb. SAML 2.0 spec says issuer format must be omitted
548        samlResponse.issuer.value = "CEDA"
549       
550        samlResponse.inResponseTo = attributeQuery.id
551       
552        # Set up the interface object
553       
554        # Define queries for SAML attribute names
555        samlAttribute2SqlQuery = {
556            EsgSamlNamespaces.FIRSTNAME_ATTRNAME: 
557                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY,
558           
559            EsgSamlNamespaces.LASTNAME_ATTRNAME: 
560                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY,
561       
562            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME: 
563                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY,
564       
565            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]: 
566                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY                   
567        }
568       
569        attributeInterface = SQLAlchemyAttributeInterface(
570                                samlAttribute2SqlQuery=samlAttribute2SqlQuery)
571       
572        attributeInterface.connectionString = \
573                        SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR
574               
575        attributeInterface.samlValidRequestorDNs = (
576            '/O=STFC/OU=CEDA/CN=AuthorisationService',
577            '/O=ESG/OU=NCAR/CN=Gateway')
578       
579        attributeInterface.setProperties(samlAssertionLifetime=28800.,
580                                issuerName='/CN=Attribute Authority/O=Site A')
581       
582        attributeInterface.samlSubjectSqlQuery = (
583            SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY)
584       
585        # Make the query
586        try:
587            attributeInterface.getAttributes(attributeQuery, samlResponse)
588        except InvalidAttributeFormat:
589            print("PASSED: caught InvalidAttributeFormat exception")
590        else:
591            self.fail("Expecting InvalidAttributeFormat exception")
592       
593if __name__ == "__main__":
594    unittest.main()
Note: See TracBrowser for help on using the repository browser.