source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py @ 7698

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py@7698
Revision 7698, 17.7 KB checked in by pjkersha, 10 years ago (diff)

Integrated SAML ESGF Group/Role? attribute value type into SAML Attribute Authority client unit tests.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/12/08"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12
13import unittest
14import os
15import sys
16import getpass
17import re
18import logging
19logging.basicConfig(level=logging.DEBUG)
20
21from warnings import warn
22from uuid import uuid4
23from datetime import datetime
24from os import path
25import pickle
26
27from ndg.security.test.unit import BaseTestCase
28
29from ndg.security.common.utils.configfileparsers import (
30    CaseSensitiveConfigParser)
31from ndg.security.server.attributeauthority import (AttributeAuthority, 
32    SQLAlchemyAttributeInterface, InvalidAttributeFormat, AttributeInterface)
33
34from ndg.saml.saml2.core import (Response, Attribute, SAMLVersion, Subject, 
35                                 NameID, Issuer, AttributeQuery, 
36                                 XSStringAttributeValue, Status, StatusMessage, 
37                                 StatusCode)
38from ndg.saml.xml import XMLConstants
39from ndg.security.common.saml_utils.esgf import ESGFSamlNamespaces
40
41THIS_DIR = path.dirname(__file__)
42
43
44class AttributeAuthorityTestCase(BaseTestCase):
45    THIS_DIR = THIS_DIR
46    PROPERTIES_FILENAME = 'test_attributeauthority.cfg'
47    PROPERTIES_FILEPATH = path.join(THIS_DIR, PROPERTIES_FILENAME)
48    ASSERTION_LIFETIME = "86400"
49
50    def test01ParsePropertiesFile(self):
51        cls = AttributeAuthorityTestCase
52        aa = AttributeAuthority.fromPropertyFile(cls.PROPERTIES_FILEPATH)
53        self.assert_(aa)
54        self.assert_(aa.assertionLifetime == 3600)
55       
56    def _createAttributeAuthorityHelper(self):
57        """Helper method to creat an Attribute Authority instance for use with
58        tests
59        """
60       
61        cls = AttributeAuthorityTestCase
62       
63        attributeInterfaceClassName = ('ndg.security.server.attributeauthority.'
64                                       'AttributeInterface')
65       
66        aa = AttributeAuthority.fromProperties(
67                    assertionLifetime=cls.ASSERTION_LIFETIME,
68                    attributeInterface_className=attributeInterfaceClassName)
69       
70        return aa
71           
72    def test02FromProperties(self):
73       
74        cls = AttributeAuthorityTestCase
75        aa = self._createAttributeAuthorityHelper()
76       
77        self.assert_(aa)
78       
79        # Check lifetime property converted from string input to float
80        self.assert_(aa.assertionLifetime == float(cls.ASSERTION_LIFETIME))
81        self.assert_(isinstance(aa.attributeInterface, AttributeInterface))
82
83    def test03Pickle(self):
84        # Test pickling with __slots__
85        aa = self._createAttributeAuthorityHelper()       
86        jar = pickle.dumps(aa)
87        aa2 = pickle.loads(jar)
88       
89        self.assert_(aa2)
90        self.assert_(aa2.assertionLifetime == aa.assertionLifetime)
91        self.assert_(isinstance(aa2.attributeInterface, AttributeInterface))
92   
93       
94class SQLAlchemyAttributeInterfaceTestCase(BaseTestCase):
95    THIS_DIR = THIS_DIR
96    PROPERTIES_FILENAME = 'test_sqlalchemyattributeinterface.cfg'
97    PROPERTIES_FILEPATH = path.join(THIS_DIR, PROPERTIES_FILENAME)
98   
99    SAML_SUBJECT_SQLQUERY = ("select count(*) from users where openid = "
100                             "'${userId}'")
101   
102    SAML_FIRSTNAME_SQLQUERY = ("select firstname from users where openid = "
103                               "'${userId}'")
104           
105    SAML_LASTNAME_SQLQUERY = ("select lastname from users where openid = "
106                              "'${userId}'")
107       
108    SAML_EMAILADDRESS_SQLQUERY = ("select emailaddress from users where "
109                                  "openid = '${userId}'")
110       
111    SAML_ATTRIBUTES_SQLQUERY = ("select attributename from attributes, users "
112                                "where users.openid = '${userId}' and "
113                                "attributes.username = users.username")
114                               
115    def __init__(self, *arg, **kw):
116        super(SQLAlchemyAttributeInterfaceTestCase, self).__init__(*arg, **kw)
117        self.skipTests = False
118        try:
119            import sqlalchemy
120
121        except NotImplementedError:
122            # Don't proceed with tests because SQLAlchemy is not installed
123            warn("Skipping SQLAlchemyAttributeInterfaceTestCase because "
124                 "SQLAlchemy is not installed")
125            self.skipTests = True
126       
127        if 'NDGSEC_AA_UNITTEST_DIR' not in os.environ:
128            os.environ['NDGSEC_AA_UNITTEST_DIR'
129                       ] = os.path.abspath(os.path.dirname(__file__))
130           
131        self.initDb()
132       
133    def test01TrySamlAttribute2SqlQuery__setattr__(self):
134        if self.skipTests:
135            return
136       
137        attributeInterface = SQLAlchemyAttributeInterface()
138       
139        # Define queries for SAML attribute names
140        attributeInterface.samlAttribute2SqlQuery_firstName = '"%s" "%s"' % (
141            ESGFSamlNamespaces.FIRSTNAME_ATTRNAME,                                                               
142            SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY)
143           
144        setattr(attributeInterface, 
145                'samlAttribute2SqlQuery.lastName',
146                "%s %s" % (ESGFSamlNamespaces.LASTNAME_ATTRNAME,
147                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY))
148       
149        attributeInterface.samlAttribute2SqlQuery[
150            ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME] = (
151                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY)
152       
153        attributeInterface.samlAttribute2SqlQuery[
154            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]] = (
155            SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY)
156       
157    def test02SetProperties(self):
158        # test setProperties interface for instance attribute assignment
159        if self.skipTests:
160            return
161       
162        # samlAttribute2SqlQuery* suffixes have no particular requirement
163        # only that they are unique and start with an underscore or period.
164        properties = {
165            'connectionString': 
166                SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR,
167           
168            'samlSubjectSqlQuery':
169                SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY,
170               
171            'samlAttribute2SqlQuery.firstname': '"%s" "%s"' % (
172                ESGFSamlNamespaces.FIRSTNAME_ATTRNAME,
173                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY),
174           
175            'samlAttribute2SqlQuery.blah': '"%s" "%s"' % (
176                ESGFSamlNamespaces.LASTNAME_ATTRNAME,
177                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY),
178       
179            'samlAttribute2SqlQuery.3': '%s "%s"' % (
180            ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME,
181            SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY),
182       
183            'samlAttribute2SqlQuery_0': '%s %s' % (
184                SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0],
185                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY),
186           
187            'samlValidRequestorDNs': ('/O=STFC/OU=CEDA/CN=AuthorisationService',
188                                      '/O=ESG/OU=NCAR/CN=Gateway'),
189            'samlAssertionLifetime': 86400,
190
191        }
192        attributeInterface = SQLAlchemyAttributeInterface()
193        attributeInterface.setProperties(**properties)
194       
195        self.assert_(
196            attributeInterface.samlAttribute2SqlQuery[
197                ESGFSamlNamespaces.FIRSTNAME_ATTRNAME] == \
198            SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY)
199       
200        self.assert_(attributeInterface.connectionString == \
201                     SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR)
202       
203        # Test constructor setting properties
204        attributeInterface2 = SQLAlchemyAttributeInterface(**properties)
205        self.assert_(attributeInterface2.samlAssertionLifetime.days == 1)
206
207    def test03FromConfigFile(self):
208        if self.skipTests:
209            return
210        cfgParser = CaseSensitiveConfigParser()
211        cls = SQLAlchemyAttributeInterfaceTestCase
212        cfgFilePath = cls.PROPERTIES_FILEPATH
213        cfgParser.read(cfgFilePath)
214       
215        cfg = dict(cfgParser.items('DEFAULT'))
216        attributeInterface = SQLAlchemyAttributeInterface()
217        attributeInterface.setProperties(prefix='attributeInterface.', **cfg)
218       
219        self.assert_(
220            attributeInterface.samlAttribute2SqlQuery[
221                ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME] == \
222            SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY)
223
224    def test04SamlAttributeQuery(self):
225        if self.skipTests:
226            return
227       
228        # Prepare a client query
229        attributeQuery = AttributeQuery()
230        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
231        attributeQuery.id = str(uuid4())
232        attributeQuery.issueInstant = datetime.utcnow()
233       
234        attributeQuery.issuer = Issuer()
235        attributeQuery.issuer.format = Issuer.X509_SUBJECT
236        attributeQuery.issuer.value = '/O=ESG/OU=NCAR/CN=Gateway'
237                       
238                       
239        attributeQuery.subject = Subject() 
240        attributeQuery.subject.nameID = NameID()
241        attributeQuery.subject.nameID.format = ESGFSamlNamespaces.NAMEID_FORMAT
242        attributeQuery.subject.nameID.value = \
243                                SQLAlchemyAttributeInterfaceTestCase.OPENID_URI
244       
245        fnAttribute = Attribute()
246        fnAttribute.name = ESGFSamlNamespaces.FIRSTNAME_ATTRNAME
247        fnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
248        fnAttribute.friendlyName = "FirstName"
249
250        attributeQuery.attributes.append(fnAttribute)
251   
252        lnAttribute = Attribute()
253        lnAttribute.name = ESGFSamlNamespaces.LASTNAME_ATTRNAME
254        lnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
255        lnAttribute.friendlyName = "LastName"
256
257        attributeQuery.attributes.append(lnAttribute)
258   
259        emailAddressAttribute = Attribute()
260        emailAddressAttribute.name = ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME
261        emailAddressAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
262        emailAddressAttribute.friendlyName = "EmailAddress"
263
264        attributeQuery.attributes.append(emailAddressAttribute)                                   
265   
266        authzAttribute = Attribute()
267        authzAttribute.name = \
268            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]
269        authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
270        authzAttribute.friendlyName = "authz"
271
272        attributeQuery.attributes.append(authzAttribute)                                   
273       
274        # Add the response - the interface will populate with an assertion as
275        # appropriate
276        samlResponse = Response()
277       
278        samlResponse.issueInstant = datetime.utcnow()
279        samlResponse.id = str(uuid4())
280        samlResponse.issuer = Issuer()
281       
282        # Initialise to success status but reset on error
283        samlResponse.status = Status()
284        samlResponse.status.statusCode = StatusCode()
285        samlResponse.status.statusMessage = StatusMessage()
286        samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI
287       
288        # Nb. SAML 2.0 spec says issuer format must be omitted
289        samlResponse.issuer.value = "CEDA"
290       
291        samlResponse.inResponseTo = attributeQuery.id
292       
293        # Set up the interface object
294       
295        # Define queries for SAML attribute names
296        samlAttribute2SqlQuery = {
297            ESGFSamlNamespaces.FIRSTNAME_ATTRNAME: 
298                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY,
299           
300            ESGFSamlNamespaces.LASTNAME_ATTRNAME: 
301                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY,
302       
303            ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME: 
304                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY,
305       
306            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]: 
307                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY                   
308        }
309       
310        attributeInterface = SQLAlchemyAttributeInterface(
311                                samlAttribute2SqlQuery=samlAttribute2SqlQuery)
312       
313        attributeInterface.connectionString = \
314                        SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR
315               
316        attributeInterface.samlValidRequestorDNs = (
317            '/O=STFC/OU=CEDA/CN=AuthorisationService',
318            '/O=ESG/OU=NCAR/CN=Gateway')
319       
320        attributeInterface.setProperties(samlAssertionLifetime=28800.)
321       
322        attributeInterface.samlSubjectSqlQuery = (
323            SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY)
324       
325        # Make the query
326        attributeInterface.getAttributes(attributeQuery, samlResponse)
327       
328        self.assert_(
329                samlResponse.status.statusCode.value == StatusCode.SUCCESS_URI)
330        self.assert_(samlResponse.inResponseTo == attributeQuery.id)
331        self.assert_(samlResponse.assertions[0].subject.nameID.value == \
332                     attributeQuery.subject.nameID.value)
333        self.assert_(
334            samlResponse.assertions[0].attributeStatements[0].attributes[1
335                ].attributeValues[0].value == 'Kershaw')
336       
337        self.assert_(
338            len(samlResponse.assertions[0].attributeStatements[0].attributes[3
339                ].attributeValues) == \
340                    SQLAlchemyAttributeInterfaceTestCase.N_ATTRIBUTE_VALUES)
341
342    def test04SamlAttributeQuery(self):
343        if self.skipTests:
344            return
345       
346        # Prepare a client query
347        attributeQuery = AttributeQuery()
348        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
349        attributeQuery.id = str(uuid4())
350        attributeQuery.issueInstant = datetime.utcnow()
351       
352        attributeQuery.issuer = Issuer()
353        attributeQuery.issuer.format = Issuer.X509_SUBJECT
354        attributeQuery.issuer.value = '/O=ESG/OU=NCAR/CN=Gateway'
355                       
356                       
357        attributeQuery.subject = Subject() 
358        attributeQuery.subject.nameID = NameID()
359        attributeQuery.subject.nameID.format = ESGFSamlNamespaces.NAMEID_FORMAT
360        attributeQuery.subject.nameID.value = \
361                                SQLAlchemyAttributeInterfaceTestCase.OPENID_URI
362   
363        emailAddressAttribute = Attribute()
364        emailAddressAttribute.name = ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME
365        emailAddressAttribute.nameFormat = "InvalidFormat"
366        emailAddressAttribute.friendlyName = "EmailAddress"
367
368        attributeQuery.attributes.append(emailAddressAttribute)                                   
369   
370        authzAttribute = Attribute()
371        authzAttribute.name = \
372            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]
373        authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
374        authzAttribute.friendlyName = "authz"
375
376        attributeQuery.attributes.append(authzAttribute)                                   
377       
378        # Add the response - the interface will populate with an assertion as
379        # appropriate
380        samlResponse = Response()
381       
382        samlResponse.issueInstant = datetime.utcnow()
383        samlResponse.id = str(uuid4())
384        samlResponse.issuer = Issuer()
385       
386        # Initialise to success status but reset on error
387        samlResponse.status = Status()
388        samlResponse.status.statusCode = StatusCode()
389        samlResponse.status.statusMessage = StatusMessage()
390        samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI
391       
392        # Nb. SAML 2.0 spec says issuer format must be omitted
393        samlResponse.issuer.value = "CEDA"
394       
395        samlResponse.inResponseTo = attributeQuery.id
396       
397        # Set up the interface object
398       
399        # Define queries for SAML attribute names
400        samlAttribute2SqlQuery = {
401            ESGFSamlNamespaces.FIRSTNAME_ATTRNAME: 
402                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY,
403           
404            ESGFSamlNamespaces.LASTNAME_ATTRNAME: 
405                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY,
406       
407            ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME: 
408                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY,
409       
410            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]: 
411                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY
412        }
413       
414        attributeInterface = SQLAlchemyAttributeInterface(
415                                samlAttribute2SqlQuery=samlAttribute2SqlQuery)
416       
417        attributeInterface.connectionString = \
418                        SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR
419               
420        attributeInterface.samlValidRequestorDNs = (
421            '/O=STFC/OU=CEDA/CN=AuthorisationService',
422            '/O=ESG/OU=NCAR/CN=Gateway')
423       
424        attributeInterface.setProperties(samlAssertionLifetime=28800.)
425       
426        attributeInterface.samlSubjectSqlQuery = (
427            SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY)
428       
429        # Make the query
430        try:
431            attributeInterface.getAttributes(attributeQuery, samlResponse)
432        except InvalidAttributeFormat:
433            print("PASSED: caught InvalidAttributeFormat exception")
434        else:
435            self.fail("Expecting InvalidAttributeFormat exception")
436       
437if __name__ == "__main__":
438    unittest.main()
Note: See TracBrowser for help on using the repository browser.