source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/config/attributeauthority/sitea/siteAUserRoles.py @ 6039

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/config/attributeauthority/sitea/siteAUserRoles.py@6039
Revision 6039, 6.5 KB checked in by pjkersha, 10 years ago (diff)

Unit tested SamlCredentialWallet?.

Line 
1"""NDG Attribute Authority User Roles class - acts as an interface between
2the data centre's user roles configuration and the Attribute Authority
3                                                                               
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "29/07/05"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id:siteAUserRoles.py 4371 2008-10-29 09:44:51Z pjkersha $'
12
13from datetime import datetime, timedelta
14from uuid import uuid4
15
16
17from ndg.security.server.attributeauthority import (AttributeInterface, 
18                                                    InvalidRequestorId, 
19                                                    AttributeNotKnownError, 
20                                                    AttributeReleaseDenied, 
21                                                    UserIdNotKnown)
22from saml.common.xml import SAMLConstants
23from saml.saml2.core import (Assertion, Attribute, AttributeStatement, Issuer,
24                             SAMLVersion, Subject, NameID, Conditions,
25                             XSStringAttributeValue)
26
27
28class TestUserRoles(AttributeInterface):
29    """Test User Roles class dynamic import for Attribute Authority"""
30    ATTRIBUTE_NAMES = (
31        "urn:siteA:security:authz:1.0:attr",
32    )
33   
34    ATTRIBUTE_VALUES = (
35        'urn:siteA:security:authz:1.0:attr:postdoc',
36        'urn:siteA:security:authz:1.0:attr:staff', 
37        'urn:siteA:security:authz:1.0:attr:undergrad', 
38        'urn:siteA:security:authz:1.0:attr:coapec',
39        'urn:siteA:security:authz:1.0:attr:rapid'
40    )
41
42    SAML_ATTRIBUTE_NAMES = ATTRIBUTE_NAMES + (
43        'urn:esg:email:address',
44        'urn:esg:first:name', 
45        'urn:esg:last:name'
46    )
47   
48    SAML_ATTRIBUTE_VALUES = (
49        ATTRIBUTE_VALUES,
50        ('p.kershaw@somewhere.ac.uk',),
51        ('Philip',),
52        ('Kershaw',)
53    )
54   
55    SAML_ATTRIBUTE_FRIENDLY_NAMES = ('',)*len(ATTRIBUTE_NAMES) + (
56        "emailAddress",
57        "FirstName",
58        "LastName"
59    )
60    SAML_ATTRIBUTE_FORMATS = (SAMLConstants.XSD_NS+"#"+\
61                            XSStringAttributeValue.TYPE_LOCAL_NAME,) * \
62                            len(SAML_ATTRIBUTE_NAMES)
63    SAML_ATTRIBUTES = []
64   
65    for name, vals, format, friendlyName in zip(SAML_ATTRIBUTE_NAMES,
66                                               SAML_ATTRIBUTE_VALUES,
67                                               SAML_ATTRIBUTE_FORMATS,
68                                               SAML_ATTRIBUTE_FRIENDLY_NAMES):
69        SAML_ATTRIBUTES.append(Attribute())
70        SAML_ATTRIBUTES[-1].name = name
71        SAML_ATTRIBUTES[-1].nameFormat = format
72        SAML_ATTRIBUTES[-1].friendlyName = friendlyName
73        for val in vals:
74            SAML_ATTRIBUTES[-1].attributeValues.append(XSStringAttributeValue())
75            SAML_ATTRIBUTES[-1].attributeValues[-1].value = val
76
77    del name, val, vals, format, friendlyName
78   
79    # 8 hours validity for issued assertions
80    SAML_ASSERTION_LIFETIME = 8*60*60
81   
82    VALID_USER_IDS = ("https://openid.localhost/philip.kershaw",)
83    VALID_REQUESTOR_IDS = ("/O=Site A/CN=Authorisation Service", 
84                           "/O=Site B/CN=Authorisation Service")
85   
86    ISSUER_NAME = "/O=Site A/CN=Attribute Authority"
87   
88    INSUFFICIENT_PRIVILEGES_REQUESTOR_ID = "/O=Site B/CN=Authorisation Service"
89   
90    def __init__(self, propertiesFilePath=None):
91        pass
92
93    def getRoles(self, userId):
94        return TestUserRoles.ATTRIBUTE_VALUES
95
96    def getAttributes(self, attributeQuery, response):
97        '''Test Attribute Authority SAML Attribute Query interface'''
98       
99        userId = attributeQuery.subject.nameID.value
100        requestedAttributeNames = [attribute.name
101                                   for attribute in attributeQuery.attributes]
102        requestorId = attributeQuery.issuer.value
103       
104        if userId not in TestUserRoles.VALID_USER_IDS:
105            raise UserIdNotKnown('Subject Id "%s" is not known to this '
106                                 'authority' % userId)
107           
108        if requestorId not in TestUserRoles.VALID_REQUESTOR_IDS:
109            raise InvalidRequestorId('Requestor identity "%s" is invalid' %
110                                     requestorId)
111       
112        unknownAttrNames = [attrName for attrName in requestedAttributeNames
113                            if attrName not in 
114                            TestUserRoles.SAML_ATTRIBUTE_NAMES]
115       
116        if len(unknownAttrNames) > 0:
117            raise AttributeNotKnownError("Unknown attributes requested: %r" %
118                                         unknownAttrNames)
119           
120        if requestorId == TestUserRoles.INSUFFICIENT_PRIVILEGES_REQUESTOR_ID:
121            raise AttributeReleaseDenied("Attribute release denied for the "
122                                         'requestor "%s"' % requestorId)
123       
124        # Create a new assertion to hold the attributes to be returned
125        assertion = Assertion()
126       
127        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
128        assertion.id = str(uuid4())
129        assertion.issueInstant = response.issueInstant
130   
131        assertion.issuer = Issuer()
132        assertion.issuer.value = TestUserRoles.ISSUER_NAME
133        assertion.issuer.format = Issuer.X509_SUBJECT
134       
135        assertion.conditions = Conditions()
136        assertion.conditions.notBefore = assertion.issueInstant
137        assertion.conditions.notOnOrAfter = assertion.conditions.notBefore + \
138            timedelta(seconds=TestUserRoles.SAML_ASSERTION_LIFETIME)
139       
140        assertion.subject = Subject() 
141        assertion.subject.nameID = NameID()
142        assertion.subject.nameID.format = attributeQuery.subject.nameID.format
143        assertion.subject.nameID.value = attributeQuery.subject.nameID.value
144
145        attributeStatement = AttributeStatement()
146       
147        # Add test set of attributes
148        for name in requestedAttributeNames:
149            attributeFound = False
150            for attribute in TestUserRoles.SAML_ATTRIBUTES:
151                if attribute.name == name:
152                    attributeFound = True
153                    break
154           
155            if attributeFound:
156                attributeStatement.attributes.append(attribute)
157            else:
158                raise AttributeNotKnownError("Unknown attribute requested: %s"%
159                                             name)
160 
161        assertion.attributeStatements.append(attributeStatement)       
162        response.assertions.append(assertion)
163 
Note: See TracBrowser for help on using the repository browser.