source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/config/attributeauthority/sitea/siteAUserRoles.py @ 6062

  • Working SamlPIPMiddleware - the Policy Information Point with an interface to the SAML Attribute Authority. The PIP retrieves user credential information for the PDP.
  • Fixed ndg.security.test.unit.wsgi.authz.test_authz unit tests for the above.
"""NDG Attribute Authority User Roles class - acts as an interface between
the data centre's user roles configuration and the Attribute Authority.

This module is the Site A test fixture: the roles and identity attributes it
serves are fixed class constants rather than data read from a configuration
source.

NERC Data Grid Project
"""
__author__ = "P J Kershaw"
__date__ = "29/07/05"
__copyright__ = "(C) 2009 Science and Technology Facilities Council"
__license__ = "BSD - see LICENSE file in top-level directory"
__contact__ = "Philip.Kershaw@stfc.ac.uk"
__revision__ = '$Id:siteAUserRoles.py 4371 2008-10-29 09:44:51Z pjkersha $'
12
13from datetime import datetime, timedelta
14from uuid import uuid4
15
16from saml.common.xml import SAMLConstants
17from saml.saml2.core import (Assertion, Attribute, AttributeStatement, Issuer,
18                             SAMLVersion, Subject, NameID, Conditions,
19                             XSStringAttributeValue)
20
21from ndg.security.common.X509 import X500DN
22from ndg.security.server.attributeauthority import (AttributeInterface, 
23                                                    InvalidRequestorId, 
24                                                    AttributeNotKnownError, 
25                                                    AttributeReleaseDenied, 
26                                                    UserIdNotKnown)
27
28
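class TestUserRoles(AttributeInterface):
    """Test User Roles class dynamic import for Attribute Authority

    Fixture implementation of the Attribute Authority's AttributeInterface:
    one known subject, a fixed list of permitted requestors and a fixed set of
    SAML attributes stand in for a site's user roles database.  Everything is
    held in class-level constants; nothing is read from configuration.
    """
    ATTRIBUTE_NAMES = (
        "urn:siteA:security:authz:1.0:attr",
    )

    ATTRIBUTE_VALUES = (
        'urn:siteA:security:authz:1.0:attr:postdoc',
        'urn:siteA:security:authz:1.0:attr:staff',
        'urn:siteA:security:authz:1.0:attr:undergrad',
        'urn:siteA:security:authz:1.0:attr:coapec',
        'urn:siteA:security:authz:1.0:attr:rapid'
    )

    # Full set of attribute names this authority will answer queries for: the
    # site role attribute plus the ESG identity attributes
    SAML_ATTRIBUTE_NAMES = ATTRIBUTE_NAMES + (
        'urn:esg:email:address',
        'urn:esg:first:name',
        'urn:esg:last:name'
    )

    # One tuple of values per entry in SAML_ATTRIBUTE_NAMES, in the same order
    SAML_ATTRIBUTE_VALUES = (
        ATTRIBUTE_VALUES,
        ('p.kershaw@somewhere.ac.uk',),
        ('Philip',),
        ('Kershaw',)
    )

    # Role attributes carry no friendly name; the identity attributes do
    SAML_ATTRIBUTE_FRIENDLY_NAMES = ('',)*len(ATTRIBUTE_NAMES) + (
        "emailAddress",
        "FirstName",
        "LastName"
    )

    # Every attribute is typed as an XML Schema string
    SAML_ATTRIBUTE_FORMATS = (SAMLConstants.XSD_NS+"#"+\
                            XSStringAttributeValue.TYPE_LOCAL_NAME,) * \
                            len(SAML_ATTRIBUTE_NAMES)

    # Pre-built SAML Attribute objects, one per entry in SAML_ATTRIBUTE_NAMES.
    # They are shared by every response this class produces (see
    # getAttributes).
    SAML_ATTRIBUTES = []

    # A plain for loop rather than a comprehension so that the class-level
    # names above are in scope; 'attrFormat' avoids shadowing the builtin
    for attrName, attrValues, attrFormat, attrFriendlyName in zip(
                                               SAML_ATTRIBUTE_NAMES,
                                               SAML_ATTRIBUTE_VALUES,
                                               SAML_ATTRIBUTE_FORMATS,
                                               SAML_ATTRIBUTE_FRIENDLY_NAMES):
        samlAttribute = Attribute()
        samlAttribute.name = attrName
        samlAttribute.nameFormat = attrFormat
        samlAttribute.friendlyName = attrFriendlyName

        for attrValue in attrValues:
            samlAttributeValue = XSStringAttributeValue()
            samlAttributeValue.value = attrValue
            samlAttribute.attributeValues.append(samlAttributeValue)

        SAML_ATTRIBUTES.append(samlAttribute)

    # Loop temporaries would otherwise persist as class attributes
    del (attrName, attrValues, attrFormat, attrFriendlyName, attrValue,
         samlAttribute, samlAttributeValue)

    # 8 hours validity for issued assertions
    SAML_ASSERTION_LIFETIME = 8*60*60

    # The only subject this fixture knows about
    VALID_USER_IDS = ("https://openid.localhost/philip.kershaw",)

    # Requestors (query issuers) permitted to query this authority at all.
    # The DNs are round-tripped through X500DN, presumably so that comparison
    # is on a canonical string form - confirm against X500DN.__str__
    VALID_REQUESTOR_IDS = (
        str(X500DN.fromString("/O=Site A/CN=Authorisation Service")),
        str(X500DN.fromString("/O=Site B/CN=Authorisation Service")),
        str(X500DN.fromString('/CN=test/O=NDG/OU=BADC'))
    )

    ISSUER_NAME = "/O=Site A/CN=Attribute Authority"

    # A recognised requestor that is nonetheless refused attribute release -
    # exercises the AttributeReleaseDenied path
    INSUFFICIENT_PRIVILEGES_REQUESTOR_ID = str(
                    X500DN.fromString("/O=Site B/CN=Authorisation Service"))

    def __init__(self, propertiesFilePath=None):
        '''No set-up is required: all data is held in class constants

        @type propertiesFilePath: basestring / None
        @param propertiesFilePath: accepted for interface compatibility with
        the Attribute Authority's dynamic import and ignored
        '''
        pass

    def getRoles(self, userId):
        '''Return the fixed set of site role attribute values

        @type userId: basestring
        @param userId: ignored - every user receives the same roles
        @rtype: tuple
        @return: TestUserRoles.ATTRIBUTE_VALUES
        '''
        return TestUserRoles.ATTRIBUTE_VALUES

    def getAttributes(self, attributeQuery, response):
        '''Test Attribute Authority SAML Attribute Query interface

        Validates the query and, if it is acceptable, appends a SAML Assertion
        carrying the requested attributes to the response.  Checks are made in
        the order: requestor known, subject known, attribute names known,
        requestor entitled to release.

        @type attributeQuery: saml.saml2.core.AttributeQuery
        @param attributeQuery: query giving the subject, the issuer (the
        requestor) and the attributes wanted
        @type response: saml.saml2.core.Response
        @param response: response under construction; its issueInstant is used
        as the assertion's issue instant and the new assertion is appended to
        response.assertions
        @raise InvalidRequestorId: issuer is not in VALID_REQUESTOR_IDS
        @raise UserIdNotKnown: subject is not in VALID_USER_IDS
        @raise AttributeNotKnownError: a requested attribute name is not in
        SAML_ATTRIBUTE_NAMES
        @raise AttributeReleaseDenied: requestor is recognised but not entitled
        to the attributes
        '''
        userId = attributeQuery.subject.nameID.value
        requestorId = attributeQuery.issuer.value
        requestedAttributeNames = [attribute.name
                                   for attribute in attributeQuery.attributes]

        # Check the requestor before anything about the subject is disclosed:
        # an unrecognised requestor must not be able to tell, from the error
        # it receives, whether a given subject id is known to this authority
        if requestorId not in TestUserRoles.VALID_REQUESTOR_IDS:
            raise InvalidRequestorId('Requestor identity "%s" is invalid' %
                                     requestorId)

        if userId not in TestUserRoles.VALID_USER_IDS:
            raise UserIdNotKnown('Subject Id "%s" is not known to this '
                                 'authority' % userId)

        unknownAttrNames = [attrName for attrName in requestedAttributeNames
                            if attrName not in
                            TestUserRoles.SAML_ATTRIBUTE_NAMES]
        if unknownAttrNames:
            raise AttributeNotKnownError("Unknown attributes requested: %r" %
                                         unknownAttrNames)

        if requestorId == TestUserRoles.INSUFFICIENT_PRIVILEGES_REQUESTOR_ID:
            raise AttributeReleaseDenied("Attribute release denied for the "
                                         'requestor "%s"' % requestorId)

        # Create a new assertion to hold the attributes to be returned
        assertion = Assertion()
        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
        assertion.id = str(uuid4())
        assertion.issueInstant = response.issueInstant

        assertion.issuer = Issuer()
        assertion.issuer.value = TestUserRoles.ISSUER_NAME
        assertion.issuer.format = Issuer.X509_SUBJECT

        # Validity window opens at the issue instant and lasts
        # SAML_ASSERTION_LIFETIME seconds
        assertion.conditions = Conditions()
        assertion.conditions.notBefore = assertion.issueInstant
        assertion.conditions.notOnOrAfter = assertion.conditions.notBefore + \
            timedelta(seconds=TestUserRoles.SAML_ASSERTION_LIFETIME)

        # Echo the queried subject back with its original name id format
        assertion.subject = Subject()
        assertion.subject.nameID = NameID()
        assertion.subject.nameID.format = attributeQuery.subject.nameID.format
        assertion.subject.nameID.value = attributeQuery.subject.nameID.value

        attributeStatement = AttributeStatement()

        # Look each requested attribute up by name.  SAML_ATTRIBUTES is built
        # from SAML_ATTRIBUTE_NAMES and every requested name was validated
        # against that tuple above, so a miss here is not normally possible;
        # the error is kept as a guard against the two drifting apart.
        # NOTE: the class-level Attribute objects are appended by reference,
        # not copied - a consumer that mutates them alters every later response
        attributesByName = dict((attribute.name, attribute)
                                for attribute in TestUserRoles.SAML_ATTRIBUTES)
        for name in requestedAttributeNames:
            attribute = attributesByName.get(name)
            if attribute is None:
                raise AttributeNotKnownError("Unknown attribute requested: %s"%
                                             name)

            attributeStatement.attributes.append(attribute)

        assertion.attributeStatements.append(attributeStatement)
        response.assertions.append(assertion)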
168 