source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/config/attributeauthority/sitea/sitea_attributeinterface.py @ 7077

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/config/attributeauthority/sitea/sitea_attributeinterface.py@7077
Revision 7077, 6.6 KB checked in by pjkersha, 9 years ago (diff)
  • Property svn:keywords set to Id
Line 
1"""NDG Attribute Authority User Roles class - acts as an interface between
2the data centre's user roles configuration and the Attribute Authority
3                                                                               
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "29/07/05"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id:siteAUserRoles.py 4371 2008-10-29 09:44:51Z pjkersha $'
12
13from datetime import datetime, timedelta
14from uuid import uuid4
15
16from ndg.saml.common.xml import SAMLConstants
17from ndg.saml.saml2.core import (Assertion, Attribute, AttributeStatement, Issuer,
18                             SAMLVersion, Subject, NameID, Conditions,
19                             XSStringAttributeValue)
20
21from ndg.security.common.X509 import X500DN
22from ndg.security.server.attributeauthority import (AttributeInterface, 
23                                                    InvalidRequestorId, 
24                                                    AttributeNotKnownError, 
25                                                    AttributeReleaseDenied, 
26                                                    UserIdNotKnown)
27from ndg.security.test.unit import BaseTestCase
28
29
30class TestUserRoles(AttributeInterface):
31    """Test User Roles class dynamic import for Attribute Authority"""
32    ATTRIBUTE_NAMES = BaseTestCase.ATTRIBUTE_NAMES
33    ATTRIBUTE_VALUES = BaseTestCase.ATTRIBUTE_VALUES
34
35    SAML_ATTRIBUTE_NAMES = ATTRIBUTE_NAMES + (
36        'urn:esg:email:address',
37        'urn:esg:first:name', 
38        'urn:esg:last:name'
39    )
40   
41    SAML_ATTRIBUTE_VALUES = (
42        ATTRIBUTE_VALUES,
43        ('p.kershaw@somewhere.ac.uk',),
44        ('Philip',),
45        ('Kershaw',)
46    )
47   
48    SAML_ATTRIBUTE_FRIENDLY_NAMES = ('',)*len(ATTRIBUTE_NAMES) + (
49        "EmailAddress",
50        "FirstName",
51        "LastName"
52    )
53    SAML_ATTRIBUTE_FORMATS = (SAMLConstants.XSD_NS+"#"+\
54                              XSStringAttributeValue.TYPE_LOCAL_NAME,) * \
55                              len(SAML_ATTRIBUTE_NAMES)
56    SAML_ATTRIBUTES = []
57   
58    for name, vals, format, friendlyName in zip(SAML_ATTRIBUTE_NAMES,
59                                               SAML_ATTRIBUTE_VALUES,
60                                               SAML_ATTRIBUTE_FORMATS,
61                                               SAML_ATTRIBUTE_FRIENDLY_NAMES):
62        SAML_ATTRIBUTES.append(Attribute())
63        SAML_ATTRIBUTES[-1].name = name
64        SAML_ATTRIBUTES[-1].nameFormat = format
65        SAML_ATTRIBUTES[-1].friendlyName = friendlyName
66        for val in vals:
67            SAML_ATTRIBUTES[-1].attributeValues.append(XSStringAttributeValue())
68            SAML_ATTRIBUTES[-1].attributeValues[-1].value = val
69
70    del name, val, vals, format, friendlyName
71   
72    # 8 hours validity for issued assertions
73    SAML_ASSERTION_LIFETIME = 8*60*60
74   
75    VALID_USER_IDS = ("https://openid.localhost/philip.kershaw",
76                      BaseTestCase.OPENID_URI)
77    VALID_REQUESTOR_IDS = BaseTestCase.VALID_REQUESTOR_IDS
78   
79    INSUFFICIENT_PRIVILEGES_REQUESTOR_ID = X500DN.fromString(
80                                        "/O=Site B/CN=Authorisation Service")
81   
82    def __init__(self, propertiesFilePath=None):
83        pass
84
85    def getRoles(self, userId):
86        return TestUserRoles.ATTRIBUTE_VALUES
87
88    def getAttributes(self, attributeQuery, response):
89        '''Test Attribute Authority SAML Attribute Query interface'''
90       
91        userId = attributeQuery.subject.nameID.value
92        requestedAttributeNames = [attribute.name
93                                   for attribute in attributeQuery.attributes]
94        if attributeQuery.issuer.format != Issuer.X509_SUBJECT:
95            raise InvalidRequestorId('Requestor issuer format "%s" is invalid' %
96                                     attributeQuery.issuerFormat.value)
97           
98        requestorId = X500DN.fromString(attributeQuery.issuer.value)
99       
100        if userId not in TestUserRoles.VALID_USER_IDS:
101            raise UserIdNotKnown('Subject Id "%s" is not known to this '
102                                 'authority' % userId)
103           
104        if requestorId not in TestUserRoles.VALID_REQUESTOR_IDS:
105            raise InvalidRequestorId('Requestor identity "%s" is invalid' %
106                                     requestorId)
107       
108        unknownAttrNames = [attrName for attrName in requestedAttributeNames
109                            if attrName not in 
110                            TestUserRoles.SAML_ATTRIBUTE_NAMES]
111       
112        if len(unknownAttrNames) > 0:
113            raise AttributeNotKnownError("Unknown attributes requested: %r" %
114                                         unknownAttrNames)
115           
116        if requestorId == TestUserRoles.INSUFFICIENT_PRIVILEGES_REQUESTOR_ID:
117            raise AttributeReleaseDenied("Attribute release denied for the "
118                                         'requestor "%s"' % requestorId)
119       
120        # Create a new assertion to hold the attributes to be returned
121        assertion = Assertion()
122       
123        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
124        assertion.id = str(uuid4())
125        assertion.issueInstant = response.issueInstant
126   
127        assertion.issuer = Issuer()
128        assertion.issuer.value = response.issuer.value
129        assertion.issuer.format = Issuer.X509_SUBJECT
130       
131        assertion.conditions = Conditions()
132        assertion.conditions.notBefore = assertion.issueInstant
133        assertion.conditions.notOnOrAfter = assertion.conditions.notBefore + \
134            timedelta(seconds=TestUserRoles.SAML_ASSERTION_LIFETIME)
135       
136        assertion.subject = Subject() 
137        assertion.subject.nameID = NameID()
138        assertion.subject.nameID.format = attributeQuery.subject.nameID.format
139        assertion.subject.nameID.value = attributeQuery.subject.nameID.value
140
141        attributeStatement = AttributeStatement()
142       
143        # Add test set of attributes
144        for name in requestedAttributeNames:
145            attributeFound = False
146            for attribute in TestUserRoles.SAML_ATTRIBUTES:
147                if attribute.name == name:
148                    attributeFound = True
149                    break
150           
151            if attributeFound:
152                attributeStatement.attributes.append(attribute)
153            else:
154                raise AttributeNotKnownError("Unknown attribute requested: %s"%
155                                             name)
156 
157        assertion.attributeStatements.append(attributeStatement)       
158        response.assertions.append(assertion)
159 
Note: See TracBrowser for help on using the repository browser.