source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/config/attributeauthority/sitea/sitea_attributeinterface.py @ 7698

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/config/attributeauthority/sitea/sitea_attributeinterface.py@7698
Revision 7698, 7.3 KB checked in by pjkersha, 9 years ago (diff)

Integrated SAML ESGF Group/Role? attribute value type into SAML Attribute Authority client unit tests.

  • Property svn:keywords set to Id
Line 
1"""NDG Attribute Authority User Roles class - acts as an interface between
2the data centre's user roles configuration and the Attribute Authority
3                                                                               
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "29/07/05"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id:siteAUserRoles.py 4371 2008-10-29 09:44:51Z pjkersha $'
12
13from datetime import datetime, timedelta
14from uuid import uuid4
15
16from ndg.saml.common.xml import SAMLConstants
17from ndg.saml.saml2.core import (Assertion, Attribute, AttributeStatement, 
18                                 Issuer, SAMLVersion, Subject, NameID, 
19                                 Conditions, XSStringAttributeValue)
20
21from ndg.security.common.saml_utils.esgf import (ESGFSamlNamespaces,
22                                                 ESGFGroupRoleAttributeValue)
23from ndg.security.common.X509 import X500DN
24from ndg.security.server.attributeauthority import (AttributeInterface, 
25                                                    InvalidRequestorId, 
26                                                    AttributeNotKnownError, 
27                                                    AttributeReleaseDenied, 
28                                                    UserIdNotKnown)
29from ndg.security.test.unit import BaseTestCase
30
31
32class TestUserRoles(AttributeInterface):
33    """Test User Roles class dynamic import for Attribute Authority"""
34    ATTRIBUTE_NAMES = BaseTestCase.ATTRIBUTE_NAMES
35    ATTRIBUTE_VALUES = BaseTestCase.ATTRIBUTE_VALUES
36
37    SAML_ATTRIBUTE_NAMES = ATTRIBUTE_NAMES + (
38        ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME,
39        ESGFSamlNamespaces.FIRSTNAME_ATTRNAME, 
40        ESGFSamlNamespaces.LASTNAME_ATTRNAME,
41        'urn:esg:sitea:grouprole'
42    )
43   
44    SAML_ATTRIBUTE_VALUES = (
45        ATTRIBUTE_VALUES,
46        ('p.kershaw@somewhere.ac.uk',),
47        ('Philip',),
48        ('Kershaw',),
49        (('siteagroup', 'default'),)
50    )
51   
52    SAML_ATTRIBUTE_FRIENDLY_NAMES = ('',)*len(ATTRIBUTE_NAMES) + (
53        "EmailAddress",
54        "FirstName",
55        "LastName",
56        "groupRole"
57    )
58    SAML_ATTRIBUTE_FORMATS = (
59        SAMLConstants.XSD_NS+"#"+XSStringAttributeValue.TYPE_LOCAL_NAME,) * (
60        len(SAML_ATTRIBUTE_NAMES)-1) + \
61        (ESGFGroupRoleAttributeValue.TYPE_LOCAL_NAME, )
62   
63    SAML_ATTRIBUTES = []
64   
65    name, val, vals, format, friendlyName = (None, None, None, None, None)
66    for name, vals, format, friendlyName in zip(SAML_ATTRIBUTE_NAMES,
67                                                SAML_ATTRIBUTE_VALUES,
68                                                SAML_ATTRIBUTE_FORMATS,
69                                                SAML_ATTRIBUTE_FRIENDLY_NAMES):
70        SAML_ATTRIBUTES.append(Attribute())
71        SAML_ATTRIBUTES[-1].name = name
72        SAML_ATTRIBUTES[-1].nameFormat = format
73        SAML_ATTRIBUTES[-1].friendlyName = friendlyName
74        for val in vals:
75            if isinstance(val, tuple):
76                SAML_ATTRIBUTES[-1].attributeValues.append(
77                                                ESGFGroupRoleAttributeValue())
78                SAML_ATTRIBUTES[-1].attributeValues[-1].value = val
79            else:
80                SAML_ATTRIBUTES[-1].attributeValues.append(
81                                                XSStringAttributeValue())
82                SAML_ATTRIBUTES[-1].attributeValues[-1].value = val
83
84    del name, val, vals, format, friendlyName
85   
86    # 8 hours validity for issued assertions
87    SAML_ASSERTION_LIFETIME = 8*60*60
88   
89    VALID_USER_IDS = ("https://openid.localhost/philip.kershaw",
90                      BaseTestCase.OPENID_URI)
91    VALID_REQUESTOR_IDS = BaseTestCase.VALID_REQUESTOR_IDS
92   
93    INSUFFICIENT_PRIVILEGES_REQUESTOR_ID = X500DN.fromString(
94                                        "/O=Site B/CN=Authorisation Service")
95   
96    def __init__(self, propertiesFilePath=None):
97        pass
98
99    def getRoles(self, userId):
100        return TestUserRoles.ATTRIBUTE_VALUES
101
102    def getAttributes(self, attributeQuery, response):
103        '''Test Attribute Authority SAML Attribute Query interface'''
104       
105        userId = attributeQuery.subject.nameID.value
106        requestedAttributeNames = [attribute.name
107                                   for attribute in attributeQuery.attributes]
108        if attributeQuery.issuer.format != Issuer.X509_SUBJECT:
109            raise InvalidRequestorId('Requestor issuer format "%s" is invalid' %
110                                     attributeQuery.issuerFormat.value)
111           
112        requestorId = X500DN.fromString(attributeQuery.issuer.value)
113       
114        if userId not in TestUserRoles.VALID_USER_IDS:
115            raise UserIdNotKnown('Subject Id "%s" is not known to this '
116                                 'authority' % userId)
117           
118        if requestorId not in TestUserRoles.VALID_REQUESTOR_IDS:
119            raise InvalidRequestorId('Requestor identity "%s" is invalid' %
120                                     requestorId)
121       
122        unknownAttrNames = [attrName for attrName in requestedAttributeNames
123                            if attrName not in 
124                            TestUserRoles.SAML_ATTRIBUTE_NAMES]
125       
126        if len(unknownAttrNames) > 0:
127            raise AttributeNotKnownError("Unknown attributes requested: %r" %
128                                         unknownAttrNames)
129           
130        if requestorId == TestUserRoles.INSUFFICIENT_PRIVILEGES_REQUESTOR_ID:
131            raise AttributeReleaseDenied("Attribute release denied for the "
132                                         'requestor "%s"' % requestorId)
133       
134        # Create a new assertion to hold the attributes to be returned
135        assertion = Assertion()
136       
137        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
138        assertion.id = str(uuid4())
139        assertion.issueInstant = response.issueInstant
140   
141        assertion.issuer = Issuer()
142        assertion.issuer.value = response.issuer.value
143        assertion.issuer.format = Issuer.X509_SUBJECT
144       
145        assertion.conditions = Conditions()
146        assertion.conditions.notBefore = assertion.issueInstant
147        assertion.conditions.notOnOrAfter = assertion.conditions.notBefore + \
148            timedelta(seconds=TestUserRoles.SAML_ASSERTION_LIFETIME)
149       
150        assertion.subject = Subject() 
151        assertion.subject.nameID = NameID()
152        assertion.subject.nameID.format = attributeQuery.subject.nameID.format
153        assertion.subject.nameID.value = attributeQuery.subject.nameID.value
154
155        attributeStatement = AttributeStatement()
156       
157        # Add test set of attributes
158        for name in requestedAttributeNames:
159            attributeFound = False
160            for attribute in TestUserRoles.SAML_ATTRIBUTES:
161                if attribute.name == name:
162                    attributeFound = True
163                    break
164           
165            if attributeFound:
166                attributeStatement.attributes.append(attribute)
167            else:
168                raise AttributeNotKnownError("Unknown attribute requested: %s"%
169                                             name)
170 
171        assertion.attributeStatements.append(attributeStatement)       
172        response.assertions.append(assertion)
173 
Note: See TracBrowser for help on using the repository browser.