source: TI12-security/trunk/python/ndg_security_server/ndg/security/server/myproxy/certificate_extapp/saml_attribute_assertion.py @ 5924

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_server/ndg/security/server/myproxy/certificate_extapp/saml_attribute_assertion.py@5924
Revision 5924, 7.7 KB checked in by pjkersha, 10 years ago (diff)

Initial unit tests for MyProxy callout app

Line 
1"""X.509 certificate extension application for adding SAML assertions into
2certificates issued by MyProxy
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "29/10/09"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
12import logging
13log = logging.getLogger(__name__)
14
15from datetime import datetime
16from uuid import uuid4
17
18from saml.common.xml import SAMLConstants
19from saml.saml2.core import (
20    Assertion, Attribute, 
21    SAMLVersion, Subject, NameID, Issuer, AttributeQuery, 
22    XSStringAttributeValue, 
23    StatusCode)
24from saml.xml.etree import ResponseElementTree
25   
26from ndg.security.common.saml.bindings import SOAPBinding as SamlSoapBinding
27from ndg.security.common.X509 import X500DN
28
29
class SamlAssertionMyProxyCertExtApp(object):
    """Application to create a X.509 certificate extension containing a SAML
    assertion for inclusion by MyProxy into an issued certificate

    @type XSSTRING_NS: string
    @cvar XSSTRING_NS: attribute format identifier, built by joining
    SAMLConstants.XSD_NS and XSStringAttributeValue.TYPE_LOCAL_NAME with '#'

    @type N_ATTR_DESCR_ELEM_ITEMS: int
    @cvar N_ATTR_DESCR_ELEM_ITEMS: number of items every attribute description
    tuple must contain

    @type DEFAULT_ATTR_DESCR: tuple
    @cvar DEFAULT_ATTR_DESCR: a tuple of tuples describing the default
    SAML attributes to be queried from the Attribute Authority.  The format is,

    ((name, friendlyName, format), (...), ...)

    FriendlyName can be defaulted to None in which case it will be omitted from
    the query
    """
    XSSTRING_NS = "%s#%s" % (
        SAMLConstants.XSD_NS,
        XSStringAttributeValue.TYPE_LOCAL_NAME
    )
    N_ATTR_DESCR_ELEM_ITEMS = 3
    DEFAULT_ATTR_DESCR = (
        ("urn:esg:first:name", "FirstName", XSSTRING_NS),
        ("urn:esg:last:name", "LastName", XSSTRING_NS),
        ("urn:esg:email:address", "emailAddress", XSSTRING_NS),
    )

    # Only the name-mangled storage attributes are slots.  The public names
    # (attributeAuthorityURI, userOpenID, issuerDN, attributeDescr) are
    # properties defined below; listing them here as well makes Python 3
    # reject the class ("... in __slots__ conflicts with class variable") and
    # otherwise only reserves slot storage that the properties then hide.
    __slots__ = (
        '_SamlAssertionMyProxyCertExtApp__attributeAuthorityURI',
        '_SamlAssertionMyProxyCertExtApp__userOpenID',
        '_SamlAssertionMyProxyCertExtApp__issuerDN',
        '_SamlAssertionMyProxyCertExtApp__attributeDescr',
    )

    def __init__(self):
        """Initialise all settings to None except attributeDescr, which
        takes the class default DEFAULT_ATTR_DESCR"""
        self.__attributeAuthorityURI = None
        self.__userOpenID = None
        self.__issuerDN = None

        # Assign through the property so that the class default is validated
        # in case DEFAULT_ATTR_DESCR has been altered
        self.attributeDescr = SamlAssertionMyProxyCertExtApp.DEFAULT_ATTR_DESCR

    def _getAttributeDescr(self):
        """@rtype: tuple
        @return: the attribute descriptions currently set"""
        return self.__attributeDescr

    def _setAttributeDescr(self, value):
        """Validate and store the attribute descriptions

        @type value: tuple
        @param value: tuple of N_ATTR_DESCR_ELEM_ITEMS length tuples
        @raise TypeError: value is not a tuple, one of its elements is not a
        tuple, or an element has the wrong number of items
        """
        if not isinstance(value, tuple):
            raise TypeError('Expecting tuple type for "attributeDescr";'
                            ' got %r instead' % type(value))

        nItems = SamlAssertionMyProxyCertExtApp.N_ATTR_DESCR_ELEM_ITEMS
        for item in value:
            # Check the element itself, not the enclosing tuple which has
            # already been verified above
            if not isinstance(item, tuple):
                raise TypeError('Expecting tuple type for "attributeDescr" '
                                'tuple sub-elements; got %r instead' %
                                type(item))
            if len(item) != nItems:
                raise TypeError('Expecting %d element tuple for '
                    '"attributeDescr" sub-elements; got %d elements instead' %
                    (nItems, len(item)))

        self.__attributeDescr = value

    attributeDescr = property(_getAttributeDescr,
                              _setAttributeDescr,
                              doc="List of name, friendly name, format tuples "
                                  "determining attributes to query from the "
                                  "Attribute Authority")

    def _getAttributeAuthorityURI(self):
        """@return: the Attribute Authority URI or None if not yet set"""
        return self.__attributeAuthorityURI

    def _setAttributeAuthorityURI(self, value):
        """@type value: basestring
        @param value: URI passed to the SOAP binding by attributeQuery
        @raise TypeError: value is not a string type
        """
        if not isinstance(value, basestring):
            raise TypeError('Expecting string type for "attributeAuthorityURI";'
                            ' got %r instead' % type(value))
        self.__attributeAuthorityURI = value

    attributeAuthorityURI = property(_getAttributeAuthorityURI,
                                     _setAttributeAuthorityURI,
                                     doc="Attribute Authority SOAP SAML URI")

    def _getUserOpenID(self):
        """@return: the user's OpenID or None if not yet set"""
        return self.__userOpenID

    def _setUserOpenID(self, value):
        """@type value: basestring
        @param value: OpenID used as the subject name ID in attributeQuery
        @raise TypeError: value is not a string type
        """
        if not isinstance(value, basestring):
            raise TypeError('Expecting string type for "userOpenID"; got %r '
                            'instead' % type(value))
        self.__userOpenID = value

    userOpenID = property(_getUserOpenID, _setUserOpenID,
                          doc="OpenID corresponding to user certificate to "
                              "be issued")

    def _getIssuerDN(self):
        """@return: the issuer DN as stored by the setter (an X500DN), or
        None if not yet set"""
        return self.__issuerDN

    def _setIssuerDN(self, value):
        """Store the issuer DN, parsing string input into an X500DN

        @type value: basestring / X500DN
        @param value: issuer distinguished name; a string is converted with
        X500DN.fromString, an X500DN instance is stored unchanged
        @raise TypeError: value is neither a string nor an X500DN
        """
        if isinstance(value, basestring):
            self.__issuerDN = X500DN.fromString(value)

        elif isinstance(value, X500DN):
            self.__issuerDN = value
        else:
            raise TypeError('Expecting string or X500DN type for "issuerDN"; '
                            'got %r instead' % type(value))
        # No further assignment here: re-assigning the raw input would throw
        # away the X500DN parsed from a string above

    issuerDN = property(_getIssuerDN, _setIssuerDN,
                        doc="Distinguished Name of issuer of SAML Attribute "
                            "Query to Attribute Authority")

    def __getstate__(self):
        '''Specific implementation needed with __slots__

        @rtype: dict
        @return: slot name to value mapping for every entry in __slots__
        '''
        return dict((attrName, getattr(self, attrName))
                    for attrName in SamlAssertionMyProxyCertExtApp.__slots__)

    def __setstate__(self, attrDict):
        '''Specific implementation needed with __slots__

        Keys that name a property are applied through its validating setter,
        so state written with the public names is still accepted.

        @type attrDict: dict
        @param attrDict: attribute name to value mapping to restore
        '''
        for attr, val in attrDict.items():
            setattr(self, attr, val)

    def attributeQuery(self):
        """Query an Attribute Authority to retrieve an assertion for the
        given user

        Builds a SAML 2.0 AttributeQuery from issuerDN, userOpenID and
        attributeDescr, sends it to attributeAuthorityURI through the SOAP
        binding and checks the response.

        @raise AssertionError: the response status is not success, its
        inResponseTo does not match the query ID, or the issue instant /
        validity conditions of the response and its last assertion are not
        consistent with the current UTC time
        @return: None
        """
        # Create a SAML attribute query
        query = AttributeQuery()
        query.version = SAMLVersion(SAMLVersion.VERSION_20)
        query.id = str(uuid4())
        query.issueInstant = datetime.utcnow()

        query.issuer = Issuer()
        query.issuer.format = Issuer.X509_SUBJECT
        # NOTE(review): issuerDN is an X500DN instance (or None if unset);
        # confirm Issuer.value accepts that type or convert to its string form
        query.issuer.value = self.issuerDN

        query.subject = Subject()
        query.subject.nameID = NameID()
        query.subject.nameID.format = "urn:esg:openid"
        query.subject.nameID.value = self.userOpenID

        # Add list of attributes to query
        for name, friendlyName, nameFormat in self.attributeDescr:
            attribute = Attribute()
            attribute.name = name
            attribute.nameFormat = nameFormat
            attribute.friendlyName = friendlyName

            query.attributes.append(attribute)

        # Make query over SOAP interface to remote service
        binding = SamlSoapBinding()
        response = binding.attributeQuery(query, self.attributeAuthorityURI)

        # Explicit checks rather than assert statements so that validation of
        # the remote response is not stripped when Python runs with -O.
        # AssertionError is kept as the exception type for existing callers.
        statusValue = response.status.statusCode.value
        if not (statusValue == StatusCode.SUCCESS_URI):
            raise AssertionError('Attribute query response status is %r; '
                                 'expecting %r' %
                                 (statusValue, StatusCode.SUCCESS_URI))

        # Check Query ID matches the query ID the service received
        if not (response.inResponseTo == query.id):
            raise AssertionError('Response "inResponseTo" %r does not match '
                                 'attribute query ID %r' %
                                 (response.inResponseTo, query.id))

        now = datetime.utcnow()
        if not (response.issueInstant < now):
            raise AssertionError('Response issue instant %r is not before '
                                 'the current time %r' %
                                 (response.issueInstant, now))

        # Only the last assertion in the response is checked
        assertion = response.assertions[-1]
        if not (assertion.issueInstant < now):
            raise AssertionError('Assertion issue instant %r is not before '
                                 'the current time %r' %
                                 (assertion.issueInstant, now))

        if not (assertion.conditions.notBefore < now):
            raise AssertionError('Assertion "notBefore" condition %r is not '
                                 'before the current time %r' %
                                 (assertion.conditions.notBefore, now))

        if not (assertion.conditions.notOnOrAfter > now):
            raise AssertionError('Assertion "notOnOrAfter" condition %r is '
                                 'not after the current time %r' %
                                 (assertion.conditions.notOnOrAfter, now))

        # NOTE(review): the serialised response is built but not yet used or
        # returned; the call is kept so any serialisation error still surfaces
        ResponseElementTree.toXML(response)

        return
Note: See TracBrowser for help on using the repository browser.