source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/config/attributeauthority/sitea/siteAUserRoles.py @ 6052

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/config/attributeauthority/sitea/siteAUserRoles.py@6052
Revision 6052, 6.7 KB checked in by pjkersha, 10 years ago (diff)

Updated MyProxy? Cert extension app for use with improved SAML Attribute Query interface class AttributeQuerySslSOAPBinding

Line 
1"""NDG Attribute Authority User Roles class - acts as an interface between
2the data centre's user roles configuration and the Attribute Authority
3                                                                               
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "29/07/05"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id:siteAUserRoles.py 4371 2008-10-29 09:44:51Z pjkersha $'
12
13from datetime import datetime, timedelta
14from uuid import uuid4
15
16from saml.common.xml import SAMLConstants
17from saml.saml2.core import (Assertion, Attribute, AttributeStatement, Issuer,
18                             SAMLVersion, Subject, NameID, Conditions,
19                             XSStringAttributeValue)
20
21from ndg.security.common.X509 import X500DN
22from ndg.security.server.attributeauthority import (AttributeInterface, 
23                                                    InvalidRequestorId, 
24                                                    AttributeNotKnownError, 
25                                                    AttributeReleaseDenied, 
26                                                    UserIdNotKnown)
27
28
29class TestUserRoles(AttributeInterface):
30    """Test User Roles class dynamic import for Attribute Authority"""
31    ATTRIBUTE_NAMES = (
32        "urn:siteA:security:authz:1.0:attr",
33    )
34   
35    ATTRIBUTE_VALUES = (
36        'urn:siteA:security:authz:1.0:attr:postdoc',
37        'urn:siteA:security:authz:1.0:attr:staff', 
38        'urn:siteA:security:authz:1.0:attr:undergrad', 
39        'urn:siteA:security:authz:1.0:attr:coapec',
40        'urn:siteA:security:authz:1.0:attr:rapid'
41    )
42
43    SAML_ATTRIBUTE_NAMES = ATTRIBUTE_NAMES + (
44        'urn:esg:email:address',
45        'urn:esg:first:name', 
46        'urn:esg:last:name'
47    )
48   
49    SAML_ATTRIBUTE_VALUES = (
50        ATTRIBUTE_VALUES,
51        ('p.kershaw@somewhere.ac.uk',),
52        ('Philip',),
53        ('Kershaw',)
54    )
55   
56    SAML_ATTRIBUTE_FRIENDLY_NAMES = ('',)*len(ATTRIBUTE_NAMES) + (
57        "emailAddress",
58        "FirstName",
59        "LastName"
60    )
61    SAML_ATTRIBUTE_FORMATS = (SAMLConstants.XSD_NS+"#"+\
62                            XSStringAttributeValue.TYPE_LOCAL_NAME,) * \
63                            len(SAML_ATTRIBUTE_NAMES)
64    SAML_ATTRIBUTES = []
65   
66    for name, vals, format, friendlyName in zip(SAML_ATTRIBUTE_NAMES,
67                                               SAML_ATTRIBUTE_VALUES,
68                                               SAML_ATTRIBUTE_FORMATS,
69                                               SAML_ATTRIBUTE_FRIENDLY_NAMES):
70        SAML_ATTRIBUTES.append(Attribute())
71        SAML_ATTRIBUTES[-1].name = name
72        SAML_ATTRIBUTES[-1].nameFormat = format
73        SAML_ATTRIBUTES[-1].friendlyName = friendlyName
74        for val in vals:
75            SAML_ATTRIBUTES[-1].attributeValues.append(XSStringAttributeValue())
76            SAML_ATTRIBUTES[-1].attributeValues[-1].value = val
77
78    del name, val, vals, format, friendlyName
79   
80    # 8 hours validity for issued assertions
81    SAML_ASSERTION_LIFETIME = 8*60*60
82   
83    VALID_USER_IDS = ("https://openid.localhost/philip.kershaw",)
84    VALID_REQUESTOR_IDS = (
85        str(X500DN.fromString("/O=Site A/CN=Authorisation Service")), 
86        str(X500DN.fromString("/O=Site B/CN=Authorisation Service"))
87    )
88   
89    ISSUER_NAME = "/O=Site A/CN=Attribute Authority"
90   
91    INSUFFICIENT_PRIVILEGES_REQUESTOR_ID = str(
92                    X500DN.fromString("/O=Site B/CN=Authorisation Service"))
93   
94    def __init__(self, propertiesFilePath=None):
95        pass
96
97    def getRoles(self, userId):
98        return TestUserRoles.ATTRIBUTE_VALUES
99
100    def getAttributes(self, attributeQuery, response):
101        '''Test Attribute Authority SAML Attribute Query interface'''
102       
103        userId = attributeQuery.subject.nameID.value
104        requestedAttributeNames = [attribute.name
105                                   for attribute in attributeQuery.attributes]
106        requestorId = attributeQuery.issuer.value
107       
108        if userId not in TestUserRoles.VALID_USER_IDS:
109            raise UserIdNotKnown('Subject Id "%s" is not known to this '
110                                 'authority' % userId)
111           
112        if requestorId not in TestUserRoles.VALID_REQUESTOR_IDS:
113            raise InvalidRequestorId('Requestor identity "%s" is invalid' %
114                                     requestorId)
115       
116        unknownAttrNames = [attrName for attrName in requestedAttributeNames
117                            if attrName not in 
118                            TestUserRoles.SAML_ATTRIBUTE_NAMES]
119       
120        if len(unknownAttrNames) > 0:
121            raise AttributeNotKnownError("Unknown attributes requested: %r" %
122                                         unknownAttrNames)
123           
124        if requestorId == TestUserRoles.INSUFFICIENT_PRIVILEGES_REQUESTOR_ID:
125            raise AttributeReleaseDenied("Attribute release denied for the "
126                                         'requestor "%s"' % requestorId)
127       
128        # Create a new assertion to hold the attributes to be returned
129        assertion = Assertion()
130       
131        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
132        assertion.id = str(uuid4())
133        assertion.issueInstant = response.issueInstant
134   
135        assertion.issuer = Issuer()
136        assertion.issuer.value = TestUserRoles.ISSUER_NAME
137        assertion.issuer.format = Issuer.X509_SUBJECT
138       
139        assertion.conditions = Conditions()
140        assertion.conditions.notBefore = assertion.issueInstant
141        assertion.conditions.notOnOrAfter = assertion.conditions.notBefore + \
142            timedelta(seconds=TestUserRoles.SAML_ASSERTION_LIFETIME)
143       
144        assertion.subject = Subject() 
145        assertion.subject.nameID = NameID()
146        assertion.subject.nameID.format = attributeQuery.subject.nameID.format
147        assertion.subject.nameID.value = attributeQuery.subject.nameID.value
148
149        attributeStatement = AttributeStatement()
150       
151        # Add test set of attributes
152        for name in requestedAttributeNames:
153            attributeFound = False
154            for attribute in TestUserRoles.SAML_ATTRIBUTES:
155                if attribute.name == name:
156                    attributeFound = True
157                    break
158           
159            if attributeFound:
160                attributeStatement.attributes.append(attribute)
161            else:
162                raise AttributeNotKnownError("Unknown attribute requested: %s"%
163                                             name)
164 
165        assertion.attributeStatements.append(attributeStatement)       
166        response.assertions.append(assertion)
167 
Note: See TracBrowser for help on using the repository browser.