source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py @ 6686

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py@6686
Revision 6686, 17.0 KB checked in by pjkersha, 10 years ago (diff)

Refactoring Attribute Authority to remove NDG Attribute Certificate and role mapping code.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/12/08"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
12
13import unittest
14import os
15import sys
16import getpass
17import re
18import logging
19logging.basicConfig(level=logging.DEBUG)
20
21from warnings import warn
22from uuid import uuid4
23from datetime import datetime
24from os import path
25
26from ndg.security.test.unit import BaseTestCase
27
28from ndg.security.common.utils.configfileparsers import (
29    CaseSensitiveConfigParser)
30from ndg.security.server.attributeauthority import (AttributeAuthority, 
31    SQLAlchemyAttributeInterface, InvalidAttributeFormat)
32
33from ndg.saml.saml2.core import (Response, Attribute, SAMLVersion, Subject, 
34                                 NameID, Issuer, AttributeQuery, 
35                                 XSStringAttributeValue, Status, StatusMessage, 
36                                 StatusCode)
37from ndg.saml.xml import XMLConstants
38from ndg.security.common.saml_utils.esg import EsgSamlNamespaces
39
40THIS_DIR = path.dirname(__file__)
41
42
43class AttributeAuthorityTestCase(BaseTestCase):
44    THIS_DIR = THIS_DIR
45    PROPERTIES_FILENAME = 'test_attributeauthority.cfg'
46    PROPERTIES_FILEPATH = path.join(THIS_DIR, PROPERTIES_FILENAME)
47    ISSUER_NAME = '/O=My Organisation/OU=Centre/CN=Attribute Authority'
48   
49    def test01ParsePropertiesFile(self):
50        cls = AttributeAuthorityTestCase
51        aa = AttributeAuthority.fromPropertyFile(cls.PROPERTIES_FILEPATH)
52        self.assert_(aa)
53        self.assert_(aa.assertionLifetime == 3600)
54        self.assert_(aa.issuerName == cls.ISSUER_NAME)
55       
56    def test02FromPropertis(self):
57       
58        # Casts from string to float
59        assertionLifetime = "86400"
60        issuerName = 'My issuer'
61        aa = AttributeAuthority.fromProperties(issuerName=issuerName,
62                                            assertionLifetime=assertionLifetime)
63        self.assert_(aa)
64        self.assert_(aa.assertionLifetime == float(assertionLifetime))
65        self.assert_(aa.issuerName == issuerName)
66
67
68class SQLAlchemyAttributeInterfaceTestCase(BaseTestCase):
69    THIS_DIR = THIS_DIR
70    PROPERTIES_FILENAME = 'test_sqlalchemyattributeinterface.cfg'
71    PROPERTIES_FILEPATH = path.join(THIS_DIR, PROPERTIES_FILENAME)
72   
73    SAML_SUBJECT_SQLQUERY = ("select count(*) from users where openid = "
74                             "'${userId}'")
75   
76    SAML_FIRSTNAME_SQLQUERY = ("select firstname from users where openid = "
77                               "'${userId}'")
78           
79    SAML_LASTNAME_SQLQUERY = ("select lastname from users where openid = "
80                              "'${userId}'")
81       
82    SAML_EMAILADDRESS_SQLQUERY = ("select emailaddress from users where "
83                                  "openid = '${userId}'")
84       
85    SAML_ATTRIBUTES_SQLQUERY = ("select attributename from attributes, users "
86                                "where users.openid = '${userId}' and "
87                                "attributes.username = users.username")
88                               
89    def __init__(self, *arg, **kw):
90        super(SQLAlchemyAttributeInterfaceTestCase, self).__init__(*arg, **kw)
91        self.skipTests = False
92        try:
93            import sqlalchemy
94
95        except NotImplementedError:
96            # Don't proceed with tests because SQLAlchemy is not installed
97            warn("Skipping SQLAlchemyAttributeInterfaceTestCase because "
98                 "SQLAlchemy is not installed")
99            self.skipTests = True
100       
101        if 'NDGSEC_AA_UNITTEST_DIR' not in os.environ:
102            os.environ['NDGSEC_AA_UNITTEST_DIR'
103                       ] = os.path.abspath(os.path.dirname(__file__))
104           
105        self.initDb()
106       
107    def test01TrySamlAttribute2SqlQuery__setattr__(self):
108        if self.skipTests:
109            return
110       
111        attributeInterface = SQLAlchemyAttributeInterface()
112       
113        # Define queries for SAML attribute names
114        attributeInterface.samlAttribute2SqlQuery_firstName = '"%s" "%s"' % (
115            EsgSamlNamespaces.FIRSTNAME_ATTRNAME,                                                               
116            SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY)
117           
118        setattr(attributeInterface, 
119                'samlAttribute2SqlQuery.lastName',
120                "%s %s" % (EsgSamlNamespaces.LASTNAME_ATTRNAME,
121                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY))
122       
123        attributeInterface.samlAttribute2SqlQuery[
124            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME] = (
125                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY)
126       
127        attributeInterface.samlAttribute2SqlQuery[
128            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]] = (
129            SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY)
130       
131    def test02SetProperties(self):
132        # test setProperties interface for instance attribute assignment
133        if self.skipTests:
134            return
135       
136        # samlAttribute2SqlQuery* suffixes have no particular requirement
137        # only that they are unique and start with an underscore or period.
138        properties = {
139            'connectionString': 
140                SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR,
141           
142            'samlSubjectSqlQuery':
143                SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY,
144               
145            'samlAttribute2SqlQuery.firstname': '"%s" "%s"' % (
146                EsgSamlNamespaces.FIRSTNAME_ATTRNAME,
147                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY),
148           
149            'samlAttribute2SqlQuery.blah': '"%s" "%s"' % (
150                EsgSamlNamespaces.LASTNAME_ATTRNAME,
151                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY),
152       
153            'samlAttribute2SqlQuery.3': '%s "%s"' % (
154            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME,
155            SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY),
156       
157            'samlAttribute2SqlQuery_0': '%s %s' % (
158                SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0],
159                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY),
160           
161            'samlValidRequestorDNs': ('/O=STFC/OU=CEDA/CN=AuthorisationService',
162                                      '/O=ESG/OU=NCAR/CN=Gateway'),
163            'samlAssertionLifetime': 86400,
164
165        }
166        attributeInterface = SQLAlchemyAttributeInterface()
167        attributeInterface.setProperties(**properties)
168       
169        self.assert_(
170            attributeInterface.samlAttribute2SqlQuery[
171                EsgSamlNamespaces.FIRSTNAME_ATTRNAME] == \
172            SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY)
173       
174        self.assert_(attributeInterface.connectionString == \
175                     SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR)
176       
177        # Test constructor setting properties
178        attributeInterface2 = SQLAlchemyAttributeInterface(**properties)
179        self.assert_(attributeInterface2.samlAssertionLifetime.days == 1)
180
181    def test03FromConfigFile(self):
182        if self.skipTests:
183            return
184        cfgParser = CaseSensitiveConfigParser()
185        cls = SQLAlchemyAttributeInterfaceTestCase
186        cfgFilePath = cls.PROPERTIES_FILEPATH
187        cfgParser.read(cfgFilePath)
188       
189        cfg = dict(cfgParser.items('DEFAULT'))
190        attributeInterface = SQLAlchemyAttributeInterface()
191        attributeInterface.setProperties(prefix='attributeInterface.', **cfg)
192       
193        self.assert_(
194            attributeInterface.samlAttribute2SqlQuery[
195                EsgSamlNamespaces.EMAILADDRESS_ATTRNAME] == \
196            SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY)
197
198    def test04SamlAttributeQuery(self):
199        if self.skipTests:
200            return
201       
202        # Prepare a client query
203        attributeQuery = AttributeQuery()
204        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
205        attributeQuery.id = str(uuid4())
206        attributeQuery.issueInstant = datetime.utcnow()
207       
208        attributeQuery.issuer = Issuer()
209        attributeQuery.issuer.format = Issuer.X509_SUBJECT
210        attributeQuery.issuer.value = '/O=ESG/OU=NCAR/CN=Gateway'
211                       
212                       
213        attributeQuery.subject = Subject() 
214        attributeQuery.subject.nameID = NameID()
215        attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
216        attributeQuery.subject.nameID.value = \
217                                SQLAlchemyAttributeInterfaceTestCase.OPENID_URI
218       
219        fnAttribute = Attribute()
220        fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME
221        fnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
222        fnAttribute.friendlyName = "FirstName"
223
224        attributeQuery.attributes.append(fnAttribute)
225   
226        lnAttribute = Attribute()
227        lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME
228        lnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
229        lnAttribute.friendlyName = "LastName"
230
231        attributeQuery.attributes.append(lnAttribute)
232   
233        emailAddressAttribute = Attribute()
234        emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME
235        emailAddressAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
236        emailAddressAttribute.friendlyName = "EmailAddress"
237
238        attributeQuery.attributes.append(emailAddressAttribute)                                   
239   
240        authzAttribute = Attribute()
241        authzAttribute.name = \
242            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]
243        authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
244        authzAttribute.friendlyName = "authz"
245
246        attributeQuery.attributes.append(authzAttribute)                                   
247       
248        # Add the response - the interface will populate with an assertion as
249        # appropriate
250        samlResponse = Response()
251       
252        samlResponse.issueInstant = datetime.utcnow()
253        samlResponse.id = str(uuid4())
254        samlResponse.issuer = Issuer()
255       
256        # Initialise to success status but reset on error
257        samlResponse.status = Status()
258        samlResponse.status.statusCode = StatusCode()
259        samlResponse.status.statusMessage = StatusMessage()
260        samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI
261       
262        # Nb. SAML 2.0 spec says issuer format must be omitted
263        samlResponse.issuer.value = "CEDA"
264       
265        samlResponse.inResponseTo = attributeQuery.id
266       
267        # Set up the interface object
268       
269        # Define queries for SAML attribute names
270        samlAttribute2SqlQuery = {
271            EsgSamlNamespaces.FIRSTNAME_ATTRNAME: 
272                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY,
273           
274            EsgSamlNamespaces.LASTNAME_ATTRNAME: 
275                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY,
276       
277            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME: 
278                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY,
279       
280            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]: 
281                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY                   
282        }
283       
284        attributeInterface = SQLAlchemyAttributeInterface(
285                                samlAttribute2SqlQuery=samlAttribute2SqlQuery)
286       
287        attributeInterface.connectionString = \
288                        SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR
289               
290        attributeInterface.samlValidRequestorDNs = (
291            '/O=STFC/OU=CEDA/CN=AuthorisationService',
292            '/O=ESG/OU=NCAR/CN=Gateway')
293       
294        attributeInterface.setProperties(samlAssertionLifetime=28800.,
295                                issuerName='/CN=Attribute Authority/O=Site A')
296       
297        attributeInterface.samlSubjectSqlQuery = (
298            SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY)
299       
300        # Make the query
301        attributeInterface.getAttributes(attributeQuery, samlResponse)
302       
303        self.assert_(
304                samlResponse.status.statusCode.value == StatusCode.SUCCESS_URI)
305        self.assert_(samlResponse.inResponseTo == attributeQuery.id)
306        self.assert_(samlResponse.assertions[0].subject.nameID.value == \
307                     attributeQuery.subject.nameID.value)
308        self.assert_(
309            samlResponse.assertions[0].attributeStatements[0].attributes[1
310                ].attributeValues[0].value == 'Kershaw')
311       
312        self.assert_(
313            len(samlResponse.assertions[0].attributeStatements[0].attributes[3
314                ].attributeValues) == \
315                    SQLAlchemyAttributeInterfaceTestCase.N_ATTRIBUTE_VALUES)
316
317    def test04SamlAttributeQuery(self):
318        if self.skipTests:
319            return
320       
321        # Prepare a client query
322        attributeQuery = AttributeQuery()
323        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
324        attributeQuery.id = str(uuid4())
325        attributeQuery.issueInstant = datetime.utcnow()
326       
327        attributeQuery.issuer = Issuer()
328        attributeQuery.issuer.format = Issuer.X509_SUBJECT
329        attributeQuery.issuer.value = '/O=ESG/OU=NCAR/CN=Gateway'
330                       
331                       
332        attributeQuery.subject = Subject() 
333        attributeQuery.subject.nameID = NameID()
334        attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
335        attributeQuery.subject.nameID.value = \
336                                SQLAlchemyAttributeInterfaceTestCase.OPENID_URI
337   
338        emailAddressAttribute = Attribute()
339        emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME
340        emailAddressAttribute.nameFormat = "InvalidFormat"
341        emailAddressAttribute.friendlyName = "EmailAddress"
342
343        attributeQuery.attributes.append(emailAddressAttribute)                                   
344   
345        authzAttribute = Attribute()
346        authzAttribute.name = \
347            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]
348        authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
349        authzAttribute.friendlyName = "authz"
350
351        attributeQuery.attributes.append(authzAttribute)                                   
352       
353        # Add the response - the interface will populate with an assertion as
354        # appropriate
355        samlResponse = Response()
356       
357        samlResponse.issueInstant = datetime.utcnow()
358        samlResponse.id = str(uuid4())
359        samlResponse.issuer = Issuer()
360       
361        # Initialise to success status but reset on error
362        samlResponse.status = Status()
363        samlResponse.status.statusCode = StatusCode()
364        samlResponse.status.statusMessage = StatusMessage()
365        samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI
366       
367        # Nb. SAML 2.0 spec says issuer format must be omitted
368        samlResponse.issuer.value = "CEDA"
369       
370        samlResponse.inResponseTo = attributeQuery.id
371       
372        # Set up the interface object
373       
374        # Define queries for SAML attribute names
375        samlAttribute2SqlQuery = {
376            EsgSamlNamespaces.FIRSTNAME_ATTRNAME: 
377                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY,
378           
379            EsgSamlNamespaces.LASTNAME_ATTRNAME: 
380                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY,
381       
382            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME: 
383                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY,
384       
385            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]: 
386                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY                   
387        }
388       
389        attributeInterface = SQLAlchemyAttributeInterface(
390                                samlAttribute2SqlQuery=samlAttribute2SqlQuery)
391       
392        attributeInterface.connectionString = \
393                        SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR
394               
395        attributeInterface.samlValidRequestorDNs = (
396            '/O=STFC/OU=CEDA/CN=AuthorisationService',
397            '/O=ESG/OU=NCAR/CN=Gateway')
398       
399        attributeInterface.setProperties(samlAssertionLifetime=28800.,
400                                issuerName='/CN=Attribute Authority/O=Site A')
401       
402        attributeInterface.samlSubjectSqlQuery = (
403            SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY)
404       
405        # Make the query
406        try:
407            attributeInterface.getAttributes(attributeQuery, samlResponse)
408        except InvalidAttributeFormat:
409            print("PASSED: caught InvalidAttributeFormat exception")
410        else:
411            self.fail("Expecting InvalidAttributeFormat exception")
412       
413if __name__ == "__main__":
414    unittest.main()
Note: See TracBrowser for help on using the repository browser.