source: TI12-security/trunk/python/ndg_security_server/ndg/security/server/myproxy/certificate_extapp/saml_attribute_assertion.py @ 5907

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_server/ndg/security/server/myproxy/certificate_extapp/saml_attribute_assertion.py@5907
Revision 5907, 6.7 KB checked in by pjkersha, 11 years ago (diff)

Move MyProxy? certificate callout application into ndg.security.server and ndg.security.test. Too much trouble as separate egg.

Line 
1"""X.509 certificate extension application for adding SAML assertions into
2certificates issued by MyProxy
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "29/10/09"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
12import logging
13log = logging.getLogger(__name__)
14
15from datetime import datetime
16from uuid import uuid4
17
18from saml.common.xml import SAMLConstants
19from saml.saml2.core import (
20    Assertion, Attribute, 
21    SAMLVersion, Subject, NameID, Issuer, AttributeQuery, 
22    XSStringAttributeValue, 
23    StatusCode)
24from saml.xml.etree import ResponseElementTree
25   
26from ndg.security.common.saml.bindings import SOAPBinding as SamlSoapBinding
27
28
29class SamlAssertionMyProxyCertExtApp(object):
30    """Application to create a X.509 certificate extension containing a SAML
31    assertion for inclusion by MyProxy into an issued certificate
32   
33    @type DEFAULT_ATTR_DESCR: tuple
34    @cvar DEFAULT_ATTR_DESCR: a tuple of tuples describing the default
35    SAML attributes to be queried from the Attribute Authority.  The format is,
36   
37    ((<name>, <friendlyName>, <format>), (...), ...)
38   
39    FriendlyName can be defaulted to None in which case it will be omitted from
40    the query
41    """
42    XSSTRING_NS = "%s#%s" % (
43        SAMLConstants.XSD_NS,
44        XSStringAttributeValue.TYPE_LOCAL_NAME
45    )
46    N_ATTR_DESCR_ELEM_ITEMS = 3
47    DEFAULT_ATTR_DESCR = (
48        ("urn:esg:first:name", "FirstName", XSSTRING_NS),
49        ("urn:esg:last:name", "LastName", XSSTRING_NS),
50        ("urn:esg:email:address", "emailAddress", XSSTRING_NS),
51    )
52   
53    def __init__(self):
54        self.__attributeAuthorityURI = None
55        self.__userOpenID = None
56        self.__issuerName = None
57       
58        # Use property here in case DEFAULT_ATTR_DESCR has been altered
59        self.attributeDescr = SamlAssertionMyProxyCertExtApp.DEFAULT_ATTR_DESCR
60
61    def _getAttributeDescr(self):
62        return self.__attributeDescr
63
64    def _setAttributeDescr(self, value):
65        if not isinstance(value, tuple):
66            raise TypeError('Expecting tuple type for "attributeDescr";'
67                            ' got %r instead' % type(value))
68           
69        for i in value:
70            if not isinstance(value, tuple):
71                raise TypeError('Expecting tuple type for "attributeDescr" '
72                                'tuple sub-elements; got %r instead' % 
73                                type(value))
74            if len(i) != SamlAssertionMyProxyCertExtApp.N_ATTR_DESCR_ELEM_ITEMS:
75                raise TypeError('Expecting %d element tuple for '
76                    '"attributeDescr" sub-elements; got %d elements instead' % 
77                    (SamlAssertionMyProxyCertExtApp.N_ATTR_DESCR_ELEM_ITEMS,
78                    len(i)))
79               
80        self.__attributeDescr = value
81
82    def _getAttributeAuthorityURI(self):
83        return self.__attributeAuthorityURI
84
85    def _setAttributeAuthorityURI(self, value):
86        if not isinstance(value, basestring):
87            raise TypeError('Expecting string type for "attributeAuthorityURI";'
88                            ' got %r instead' % type(value))
89        self.__attributeAuthorityURI = value
90
91    def _getUserOpenID(self):
92        return self.__userOpenID
93
94    def _setUserOpenID(self, value):
95        if not isinstance(value, basestring):
96            raise TypeError('Expecting string type for "userOpenID";'
97                            ' got %r instead' % type(value))
98        self.__userOpenID = value
99
100    def _getIssuerName(self):
101        return self.__issuerName
102
103    def _setIssuerName(self, value):
104        if not isinstance(value, basestring):
105            raise TypeError('Expecting string type for "issuerName";'
106                            ' got %r instead' % type(value))
107        self.__issuerName = value
108
109    attributeAuthorityURI = property(_getAttributeAuthorityURI,
110                                     _setAttributeAuthorityURI, 
111                                     doc="AttributeAuthorityURI's Docstring")
112
113    userOpenID = property(_getUserOpenID, _setUserOpenID, 
114                          doc="OpenID corresponding to user certificate to "
115                              "be issued")
116
117    issuerName = property(_getIssuerName, _setIssuerName, 
118                          doc="Name of issuer of SAML Attribute Query to "
119                              "Attribute Authority")
120   
121    attributeDescr = property(_getAttributeDescr, 
122                              _setAttributeDescr, 
123                              doc="List of name, friendly name, format tuples "
124                                  "determining attributes to query from the "
125                                  "Attribute Authority")
126       
127    def attributeQuery(self):
128        """Query an Attribute Authority to retrieve an assertion for the
129        given user"""
130               
131        # Create a SAML attribute query
132        attributeQuery = AttributeQuery()
133        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
134        attributeQuery.id = str(uuid4())
135        attributeQuery.issueInstant = datetime.utcnow()
136       
137        attributeQuery.issuer = Issuer()
138        attributeQuery.issuer.format = "urn:esg:issuer"
139        attributeQuery.issuer.value = self.issuerName 
140                       
141        attributeQuery.subject = Subject() 
142        attributeQuery.subject.nameID = NameID()
143        attributeQuery.subject.nameID.format = "urn:esg:openid"
144        attributeQuery.subject.nameID.value = self.userOpenID
145                 
146        # Add list of attributes to query                     
147        for name, friendlyName, format in self.attributeDescr:
148            attribute = Attribute()
149            attribute.name = name
150            attribute.nameFormat = format
151            attribute.friendlyName = friendlyName
152   
153            attributeQuery.attributes.append(attribute)
154
155        # Make query over SOAP interface to remote service
156        binding = SamlSoapBinding()
157        response = binding.attributeQuery(attributeQuery, 
158                                          self.attributeAuthorityURI)
159       
160        assert(response.status.statusCode.value==StatusCode.SUCCESS_URI)
161       
162        # Check Query ID matches the query ID the service received
163        assert(response.inResponseTo == attributeQuery.id)
164       
165        now = datetime.utcnow()
166        assert(response.issueInstant < now)
167        assert(response.assertions[-1].issueInstant < now)       
168        assert(response.assertions[-1].conditions.notBefore < now) 
169        assert(response.assertions[-1].conditions.notOnOrAfter > now)
170         
171        samlResponseElem = ResponseElementTree.toXML(response)
172       
173        return
Note: See TracBrowser for help on using the repository browser.