source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py @ 6719

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py@6719
Revision 6719, 17.3 KB checked in by pjkersha, 10 years ago (diff)

Refactoring Attribute Authority to remove NDG Attribute Certificate and role mapping code.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/12/08"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
12
13import unittest
14import os
15import sys
16import getpass
17import re
18import logging
19logging.basicConfig(level=logging.DEBUG)
20
21from warnings import warn
22from uuid import uuid4
23from datetime import datetime
24from os import path
25
26from ndg.security.test.unit import BaseTestCase
27
28from ndg.security.common.utils.configfileparsers import (
29    CaseSensitiveConfigParser)
30from ndg.security.server.attributeauthority import (AttributeAuthority, 
31    SQLAlchemyAttributeInterface, InvalidAttributeFormat, AttributeInterface)
32
33from ndg.saml.saml2.core import (Response, Attribute, SAMLVersion, Subject, 
34                                 NameID, Issuer, AttributeQuery, 
35                                 XSStringAttributeValue, Status, StatusMessage, 
36                                 StatusCode)
37from ndg.saml.xml import XMLConstants
38from ndg.security.common.saml_utils.esg import EsgSamlNamespaces
39
40THIS_DIR = path.dirname(__file__)
41
42
43class AttributeAuthorityTestCase(BaseTestCase):
44    THIS_DIR = THIS_DIR
45    PROPERTIES_FILENAME = 'test_attributeauthority.cfg'
46    PROPERTIES_FILEPATH = path.join(THIS_DIR, PROPERTIES_FILENAME)
47    ISSUER_NAME = '/O=My Organisation/OU=Centre/CN=Attribute Authority'
48   
49    def test01ParsePropertiesFile(self):
50        cls = AttributeAuthorityTestCase
51        aa = AttributeAuthority.fromPropertyFile(cls.PROPERTIES_FILEPATH)
52        self.assert_(aa)
53        self.assert_(aa.assertionLifetime == 3600)
54        self.assert_(aa.issuerName == cls.ISSUER_NAME)
55       
56    def test02FromProperties(self):
57       
58        # Casts from string to float
59        assertionLifetime = "86400"
60        issuerName = 'My issuer'
61        attributeInterfaceClassName = ('ndg.security.server.attributeauthority.'
62                                       'AttributeInterface')
63       
64        aa = AttributeAuthority.fromProperties(issuerName=issuerName,
65                    assertionLifetime=assertionLifetime,
66                    attributeInterface_className=attributeInterfaceClassName)
67       
68        self.assert_(aa)
69        self.assert_(aa.assertionLifetime == float(assertionLifetime))
70        self.assert_(aa.issuerName == issuerName)
71        self.assert_(isinstance(aa.attributeInterface, AttributeInterface))
72
73
74class SQLAlchemyAttributeInterfaceTestCase(BaseTestCase):
75    THIS_DIR = THIS_DIR
76    PROPERTIES_FILENAME = 'test_sqlalchemyattributeinterface.cfg'
77    PROPERTIES_FILEPATH = path.join(THIS_DIR, PROPERTIES_FILENAME)
78   
79    SAML_SUBJECT_SQLQUERY = ("select count(*) from users where openid = "
80                             "'${userId}'")
81   
82    SAML_FIRSTNAME_SQLQUERY = ("select firstname from users where openid = "
83                               "'${userId}'")
84           
85    SAML_LASTNAME_SQLQUERY = ("select lastname from users where openid = "
86                              "'${userId}'")
87       
88    SAML_EMAILADDRESS_SQLQUERY = ("select emailaddress from users where "
89                                  "openid = '${userId}'")
90       
91    SAML_ATTRIBUTES_SQLQUERY = ("select attributename from attributes, users "
92                                "where users.openid = '${userId}' and "
93                                "attributes.username = users.username")
94                               
95    def __init__(self, *arg, **kw):
96        super(SQLAlchemyAttributeInterfaceTestCase, self).__init__(*arg, **kw)
97        self.skipTests = False
98        try:
99            import sqlalchemy
100
101        except NotImplementedError:
102            # Don't proceed with tests because SQLAlchemy is not installed
103            warn("Skipping SQLAlchemyAttributeInterfaceTestCase because "
104                 "SQLAlchemy is not installed")
105            self.skipTests = True
106       
107        if 'NDGSEC_AA_UNITTEST_DIR' not in os.environ:
108            os.environ['NDGSEC_AA_UNITTEST_DIR'
109                       ] = os.path.abspath(os.path.dirname(__file__))
110           
111        self.initDb()
112       
113    def test01TrySamlAttribute2SqlQuery__setattr__(self):
114        if self.skipTests:
115            return
116       
117        attributeInterface = SQLAlchemyAttributeInterface()
118       
119        # Define queries for SAML attribute names
120        attributeInterface.samlAttribute2SqlQuery_firstName = '"%s" "%s"' % (
121            EsgSamlNamespaces.FIRSTNAME_ATTRNAME,                                                               
122            SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY)
123           
124        setattr(attributeInterface, 
125                'samlAttribute2SqlQuery.lastName',
126                "%s %s" % (EsgSamlNamespaces.LASTNAME_ATTRNAME,
127                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY))
128       
129        attributeInterface.samlAttribute2SqlQuery[
130            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME] = (
131                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY)
132       
133        attributeInterface.samlAttribute2SqlQuery[
134            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]] = (
135            SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY)
136       
137    def test02SetProperties(self):
138        # test setProperties interface for instance attribute assignment
139        if self.skipTests:
140            return
141       
142        # samlAttribute2SqlQuery* suffixes have no particular requirement
143        # only that they are unique and start with an underscore or period.
144        properties = {
145            'connectionString': 
146                SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR,
147           
148            'samlSubjectSqlQuery':
149                SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY,
150               
151            'samlAttribute2SqlQuery.firstname': '"%s" "%s"' % (
152                EsgSamlNamespaces.FIRSTNAME_ATTRNAME,
153                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY),
154           
155            'samlAttribute2SqlQuery.blah': '"%s" "%s"' % (
156                EsgSamlNamespaces.LASTNAME_ATTRNAME,
157                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY),
158       
159            'samlAttribute2SqlQuery.3': '%s "%s"' % (
160            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME,
161            SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY),
162       
163            'samlAttribute2SqlQuery_0': '%s %s' % (
164                SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0],
165                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY),
166           
167            'samlValidRequestorDNs': ('/O=STFC/OU=CEDA/CN=AuthorisationService',
168                                      '/O=ESG/OU=NCAR/CN=Gateway'),
169            'samlAssertionLifetime': 86400,
170
171        }
172        attributeInterface = SQLAlchemyAttributeInterface()
173        attributeInterface.setProperties(**properties)
174       
175        self.assert_(
176            attributeInterface.samlAttribute2SqlQuery[
177                EsgSamlNamespaces.FIRSTNAME_ATTRNAME] == \
178            SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY)
179       
180        self.assert_(attributeInterface.connectionString == \
181                     SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR)
182       
183        # Test constructor setting properties
184        attributeInterface2 = SQLAlchemyAttributeInterface(**properties)
185        self.assert_(attributeInterface2.samlAssertionLifetime.days == 1)
186
187    def test03FromConfigFile(self):
188        if self.skipTests:
189            return
190        cfgParser = CaseSensitiveConfigParser()
191        cls = SQLAlchemyAttributeInterfaceTestCase
192        cfgFilePath = cls.PROPERTIES_FILEPATH
193        cfgParser.read(cfgFilePath)
194       
195        cfg = dict(cfgParser.items('DEFAULT'))
196        attributeInterface = SQLAlchemyAttributeInterface()
197        attributeInterface.setProperties(prefix='attributeInterface.', **cfg)
198       
199        self.assert_(
200            attributeInterface.samlAttribute2SqlQuery[
201                EsgSamlNamespaces.EMAILADDRESS_ATTRNAME] == \
202            SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY)
203
204    def test04SamlAttributeQuery(self):
205        if self.skipTests:
206            return
207       
208        # Prepare a client query
209        attributeQuery = AttributeQuery()
210        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
211        attributeQuery.id = str(uuid4())
212        attributeQuery.issueInstant = datetime.utcnow()
213       
214        attributeQuery.issuer = Issuer()
215        attributeQuery.issuer.format = Issuer.X509_SUBJECT
216        attributeQuery.issuer.value = '/O=ESG/OU=NCAR/CN=Gateway'
217                       
218                       
219        attributeQuery.subject = Subject() 
220        attributeQuery.subject.nameID = NameID()
221        attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
222        attributeQuery.subject.nameID.value = \
223                                SQLAlchemyAttributeInterfaceTestCase.OPENID_URI
224       
225        fnAttribute = Attribute()
226        fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME
227        fnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
228        fnAttribute.friendlyName = "FirstName"
229
230        attributeQuery.attributes.append(fnAttribute)
231   
232        lnAttribute = Attribute()
233        lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME
234        lnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
235        lnAttribute.friendlyName = "LastName"
236
237        attributeQuery.attributes.append(lnAttribute)
238   
239        emailAddressAttribute = Attribute()
240        emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME
241        emailAddressAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
242        emailAddressAttribute.friendlyName = "EmailAddress"
243
244        attributeQuery.attributes.append(emailAddressAttribute)                                   
245   
246        authzAttribute = Attribute()
247        authzAttribute.name = \
248            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]
249        authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
250        authzAttribute.friendlyName = "authz"
251
252        attributeQuery.attributes.append(authzAttribute)                                   
253       
254        # Add the response - the interface will populate with an assertion as
255        # appropriate
256        samlResponse = Response()
257       
258        samlResponse.issueInstant = datetime.utcnow()
259        samlResponse.id = str(uuid4())
260        samlResponse.issuer = Issuer()
261       
262        # Initialise to success status but reset on error
263        samlResponse.status = Status()
264        samlResponse.status.statusCode = StatusCode()
265        samlResponse.status.statusMessage = StatusMessage()
266        samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI
267       
268        # Nb. SAML 2.0 spec says issuer format must be omitted
269        samlResponse.issuer.value = "CEDA"
270       
271        samlResponse.inResponseTo = attributeQuery.id
272       
273        # Set up the interface object
274       
275        # Define queries for SAML attribute names
276        samlAttribute2SqlQuery = {
277            EsgSamlNamespaces.FIRSTNAME_ATTRNAME: 
278                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY,
279           
280            EsgSamlNamespaces.LASTNAME_ATTRNAME: 
281                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY,
282       
283            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME: 
284                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY,
285       
286            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]: 
287                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY                   
288        }
289       
290        attributeInterface = SQLAlchemyAttributeInterface(
291                                samlAttribute2SqlQuery=samlAttribute2SqlQuery)
292       
293        attributeInterface.connectionString = \
294                        SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR
295               
296        attributeInterface.samlValidRequestorDNs = (
297            '/O=STFC/OU=CEDA/CN=AuthorisationService',
298            '/O=ESG/OU=NCAR/CN=Gateway')
299       
300        attributeInterface.setProperties(samlAssertionLifetime=28800.,
301                                issuerName='/CN=Attribute Authority/O=Site A')
302       
303        attributeInterface.samlSubjectSqlQuery = (
304            SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY)
305       
306        # Make the query
307        attributeInterface.getAttributes(attributeQuery, samlResponse)
308       
309        self.assert_(
310                samlResponse.status.statusCode.value == StatusCode.SUCCESS_URI)
311        self.assert_(samlResponse.inResponseTo == attributeQuery.id)
312        self.assert_(samlResponse.assertions[0].subject.nameID.value == \
313                     attributeQuery.subject.nameID.value)
314        self.assert_(
315            samlResponse.assertions[0].attributeStatements[0].attributes[1
316                ].attributeValues[0].value == 'Kershaw')
317       
318        self.assert_(
319            len(samlResponse.assertions[0].attributeStatements[0].attributes[3
320                ].attributeValues) == \
321                    SQLAlchemyAttributeInterfaceTestCase.N_ATTRIBUTE_VALUES)
322
323    def test04SamlAttributeQuery(self):
324        if self.skipTests:
325            return
326       
327        # Prepare a client query
328        attributeQuery = AttributeQuery()
329        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
330        attributeQuery.id = str(uuid4())
331        attributeQuery.issueInstant = datetime.utcnow()
332       
333        attributeQuery.issuer = Issuer()
334        attributeQuery.issuer.format = Issuer.X509_SUBJECT
335        attributeQuery.issuer.value = '/O=ESG/OU=NCAR/CN=Gateway'
336                       
337                       
338        attributeQuery.subject = Subject() 
339        attributeQuery.subject.nameID = NameID()
340        attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
341        attributeQuery.subject.nameID.value = \
342                                SQLAlchemyAttributeInterfaceTestCase.OPENID_URI
343   
344        emailAddressAttribute = Attribute()
345        emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME
346        emailAddressAttribute.nameFormat = "InvalidFormat"
347        emailAddressAttribute.friendlyName = "EmailAddress"
348
349        attributeQuery.attributes.append(emailAddressAttribute)                                   
350   
351        authzAttribute = Attribute()
352        authzAttribute.name = \
353            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]
354        authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
355        authzAttribute.friendlyName = "authz"
356
357        attributeQuery.attributes.append(authzAttribute)                                   
358       
359        # Add the response - the interface will populate with an assertion as
360        # appropriate
361        samlResponse = Response()
362       
363        samlResponse.issueInstant = datetime.utcnow()
364        samlResponse.id = str(uuid4())
365        samlResponse.issuer = Issuer()
366       
367        # Initialise to success status but reset on error
368        samlResponse.status = Status()
369        samlResponse.status.statusCode = StatusCode()
370        samlResponse.status.statusMessage = StatusMessage()
371        samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI
372       
373        # Nb. SAML 2.0 spec says issuer format must be omitted
374        samlResponse.issuer.value = "CEDA"
375       
376        samlResponse.inResponseTo = attributeQuery.id
377       
378        # Set up the interface object
379       
380        # Define queries for SAML attribute names
381        samlAttribute2SqlQuery = {
382            EsgSamlNamespaces.FIRSTNAME_ATTRNAME: 
383                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY,
384           
385            EsgSamlNamespaces.LASTNAME_ATTRNAME: 
386                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY,
387       
388            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME: 
389                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY,
390       
391            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]: 
392                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY                   
393        }
394       
395        attributeInterface = SQLAlchemyAttributeInterface(
396                                samlAttribute2SqlQuery=samlAttribute2SqlQuery)
397       
398        attributeInterface.connectionString = \
399                        SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR
400               
401        attributeInterface.samlValidRequestorDNs = (
402            '/O=STFC/OU=CEDA/CN=AuthorisationService',
403            '/O=ESG/OU=NCAR/CN=Gateway')
404       
405        attributeInterface.setProperties(samlAssertionLifetime=28800.,
406                                issuerName='/CN=Attribute Authority/O=Site A')
407       
408        attributeInterface.samlSubjectSqlQuery = (
409            SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY)
410       
411        # Make the query
412        try:
413            attributeInterface.getAttributes(attributeQuery, samlResponse)
414        except InvalidAttributeFormat:
415            print("PASSED: caught InvalidAttributeFormat exception")
416        else:
417            self.fail("Expecting InvalidAttributeFormat exception")
418       
419if __name__ == "__main__":
420    unittest.main()
Note: See TracBrowser for help on using the repository browser.