source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/credentialwallet/test_credentialwallet.py @ 6063

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/credentialwallet/test_credentialwallet.py@6063
Revision 6063, 14.8 KB checked in by pjkersha, 10 years ago (diff)

Working authz lite integration tests, with the SAML Attribute Authority interface integrated into the authz middleware; the old NDG Attribute Authority SOAP/WSDL interface has been completely removed as a dependency.

  • Major fixes to the NDGCredentialWallet and SAMLCredentialWallet classes in ndg.security.common.credentialwallet: __slots__ handling and the pickling capability needed for beaker.session. NDGCredentialWallet is kept for the moment for backwards compatibility.
Line 
1#!/usr/bin/env python
2"""Unit tests for Credential Wallet classes
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "03/10/08"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12
13import unittest
14import os, sys, getpass, re
15import traceback
16
17from string import Template
18from cStringIO import StringIO
19import cPickle as pickle
20
21from elementtree import ElementTree
22
23from time import sleep
24from datetime import datetime, timedelta
25from saml.utils import SAMLDateTime
26from saml.xml.etree import AssertionElementTree
27
28from ndg.security.test.unit import BaseTestCase
29
30from ndg.security.common.utils.configfileparsers import (
31                                                    CaseSensitiveConfigParser)
32from ndg.security.common.utils.etree import prettyPrint
33from ndg.security.common.X509 import X509CertParse
34from ndg.security.common.credentialwallet import (NDGCredentialWallet, 
35    CredentialWalletAttributeRequestDenied, SAMLCredentialWallet)
36from ndg.security.server.attributeauthority import AttributeAuthority
37
38from os.path import expandvars as xpdVars
39from os.path import join as jnPath
def mkPath(file):
    """Return ``file`` joined onto the credential wallet unit test directory.

    The directory is read from the NDGSEC_CREDWALLET_UNITTEST_DIR environment
    variable each time the function is called.

    @param file: file name or relative path to append to the directory.  The
    parameter name is kept as-is (even though it shadows the Python 2 ``file``
    builtin) so that any keyword callers keep working
    @return: the joined path
    @raise KeyError: if NDGSEC_CREDWALLET_UNITTEST_DIR is not set
    """
    return jnPath(os.environ['NDGSEC_CREDWALLET_UNITTEST_DIR'], file)
41
42import logging
# Configure the root logger at DEBUG level as soon as this module is imported
# so that debug output from the code under test is shown during the test run.
logging.basicConfig(level=logging.DEBUG)
44
45
class NDGCredentialWalletTestCase(BaseTestCase):
    """Unit test case for
    ndg.security.common.credentialwallet.NDGCredentialWallet class.

    Test settings are read in setUp() from credWalletTest.cfg, located in the
    directory named by the NDGSEC_CREDWALLET_UNITTEST_DIR environment
    variable (defaulting to this module's directory).
    """
    THIS_DIR = os.path.dirname(__file__)
    PICKLE_FILENAME = 'NDGCredentialWalletPickle.dat'
    PICKLE_FILEPATH = os.path.join(THIS_DIR, PICKLE_FILENAME)

    def __init__(self, *arg, **kw):
        """Initialise the test case and call the inherited
        startAttributeAuthorities() hook.

        NOTE(review): presumably this launches the Attribute Authority
        services that the getAttCert tests talk to - confirm against
        BaseTestCase.
        """
        super(NDGCredentialWalletTestCase, self).__init__(*arg, **kw)
        self.startAttributeAuthorities()

    @staticmethod
    def _readFile(filePath):
        """Return the content of a file, making sure the file is closed.

        @param filePath: path to read; environment variables in it are
        expanded first
        @return: the whole file content
        """
        fileObj = open(xpdVars(filePath))
        try:
            return fileObj.read()
        finally:
            fileObj.close()

    def setUp(self):
        """Read the test configuration and pick out the user certificate and
        private key file paths.

        If NDGSEC_INT_DEBUG is set in the environment, execution drops into
        the pdb debugger first.
        """
        super(NDGCredentialWalletTestCase, self).setUp()

        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        if 'NDGSEC_CREDWALLET_UNITTEST_DIR' not in os.environ:
            os.environ['NDGSEC_CREDWALLET_UNITTEST_DIR'] = \
                os.path.abspath(os.path.dirname(__file__))

        self.cfg = CaseSensitiveConfigParser()
        configFilePath = jnPath(os.environ['NDGSEC_CREDWALLET_UNITTEST_DIR'],
                                "credWalletTest.cfg")
        self.cfg.read(configFilePath)

        self.userX509CertFilePath = self.cfg.get('setUp',
                                                 'userX509CertFilePath')
        self.userPriKeyFilePath = self.cfg.get('setUp', 'userPriKeyFilePath')

    def test01ReadOnlyClassVariables(self):
        """accessDenied and accessGranted must reject assignment on the class
        and keep their original truth values.
        """
        # self.fail() goes in the else clause: it raises AssertionError, which
        # an "except Exception" around it would swallow and report as a pass.
        try:
            NDGCredentialWallet.accessDenied = 'yes'
        except Exception:
            print("PASS - accessDenied class variable is read-only")
        else:
            self.fail("accessDenied class variable should be read-only")

        try:
            NDGCredentialWallet.accessGranted = False
        except Exception:
            print("PASS - accessGranted class variable is read-only")
        else:
            self.fail("accessGranted class variable should be read-only")

        # TestCase assertions rather than assert statements so that the checks
        # still run when Python is started with -O
        self.assertTrue(not NDGCredentialWallet.accessDenied)
        self.assertTrue(NDGCredentialWallet.accessGranted)

    def test02SetAttributes(self):
        """Attributes known to the wallet can be assigned; an unknown
        attribute name must raise AttributeError.
        """
        credWallet = NDGCredentialWallet()
        credWallet.userX509Cert = self._readFile(self.userX509CertFilePath)
        print("userX509Cert=%s" % credWallet.userX509Cert)
        credWallet.userId = 'ndg-user'
        print("userId=%s" % credWallet.userId)

        try:
            credWallet.blah = 'blah blah'
        except AttributeError:
            print("PASS - expected AttributeError when setting attribute "
                  "not in __slots__ class variable")
        else:
            self.fail("Attempting to set attribute not in __slots__ class "
                      "variable should fail")

        credWallet.caCertFilePathList = None
        credWallet.attributeAuthorityURI = 'http://localhost/AttributeAuthority'

        credWallet.attributeAuthority = None
        credWallet._credentialRepository = None
        credWallet.mapFromTrustedHosts = False
        credWallet.rtnExtAttCertList = True
        credWallet.attCertRefreshElapse = 7200

    def test03GetAttCertWithUserId(self):
        """Get an Attribute Certificate for a wallet configured from the
        config file only.
        """
        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'))
        attCert = credWallet.getAttCert()

        # No user X.509 cert is set so the resulting Attribute Certificate
        # user ID should be the same as that set for the wallet
        self.assertEqual(attCert.userId, credWallet.userId)
        print("Attribute Certificate:\n%s" % attCert)

    def test04GetAttCertWithUserX509Cert(self):
        """Get an Attribute Certificate after setting an individual user
        certificate and private key on the wallet.
        """
        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'))

        # Set a test individual user certificate to override the client
        # cert. and private key in WS-Security settings in the config file
        credWallet.userX509Cert = self._readFile(self.userX509CertFilePath)
        credWallet.userPriKey = self._readFile(self.userPriKeyFilePath)
        attCert = credWallet.getAttCert()

        # A user X.509 cert. was set so this cert's DN should be set in the
        # userId field of the resulting Attribute Certificate
        self.assertEqual(attCert.userId, str(credWallet.userX509Cert.dn))
        print("Attribute Certificate:\n%s" % attCert)

    def test05GetAttCertRefusedWithUserX509Cert(self):
        """getAttCert() must raise CredentialWalletAttributeRequestDenied
        when role mapping is switched off.
        """
        # Keyword mapFromTrustedHosts overrides any setting in the config file
        # This flag prevents role mapping from a trusted AA and so in this case
        # forces refusal of the request
        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'),
                                         mapFromTrustedHosts=False)
        credWallet.userX509CertFilePath = self.userX509CertFilePath
        credWallet.userPriKeyFilePath = self.userPriKeyFilePath

        # Set AA URI AFTER user PKI settings so that these are picked in the
        # implicit call to create a new AA Client when the URI is set
        credWallet.attributeAuthorityURI = self.cfg.get('setUp',
                                                        'attributeAuthorityURI')
        try:
            credWallet.getAttCert()
        except CredentialWalletAttributeRequestDenied:
            # sys.exc_info() instead of binding the exception in the except
            # clause: the binding syntax differs between Python 2 and 3
            print("ok - obtained expected result: %s" % sys.exc_info()[1])
        else:
            self.fail("Request allowed from Attribute Authority where user is "
                      "NOT registered!")

    def test06GetMappedAttCertWithUserId(self):
        """Get an Attribute Certificate from Site A, then a mapped one from
        Site B's Attribute Authority.
        """
        # Call Site A Attribute Authority where user is registered
        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'))
        credWallet.getAttCert()

        # Use Attribute Certificate cached in wallet to get a mapped
        # Attribute Certificate from Site B's Attribute Authority
        siteBURI = self.cfg.get('setUp', 'attributeAuthorityURI')
        attCert = credWallet.getAttCert(attributeAuthorityURI=siteBURI)

        print("Mapped Attribute Certificate from Site B Attribute "
              "Authority:\n%s" % attCert)

    def test07GetAttCertFromLocalAAInstance(self):
        """Get an Attribute Certificate from an AttributeAuthority object
        built from a property file and attached directly to the wallet.
        """
        thisSection = 'test07GetAttCertFromLocalAAInstance'
        aaPropFilePath = self.cfg.get(thisSection,
                                      'attributeAuthorityPropFilePath')

        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'))
        credWallet.attributeAuthority = AttributeAuthority.fromPropertyFile(
                                            propFilePath=aaPropFilePath)
        attCert = credWallet.getAttCert()

        # No user X.509 cert is set so the resulting Attribute Certificate
        # user ID should be the same as that set for the wallet
        self.assertEqual(attCert.userId, credWallet.userId)
        print("Attribute Certificate:\n%s" % attCert)

    def test08Pickle(self):
        """A wallet written to PICKLE_FILEPATH with pickle and read back must
        keep its userId.
        """
        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'))

        # Binary mode for both handles, as recommended for pickle files, and
        # both are closed even if dump/load raises
        outFile = open(NDGCredentialWalletTestCase.PICKLE_FILEPATH, 'wb')
        try:
            pickle.dump(credWallet, outFile)
        finally:
            outFile.close()

        inFile = open(NDGCredentialWalletTestCase.PICKLE_FILEPATH, 'rb')
        try:
            unpickledCredWallet = pickle.load(inFile)
        finally:
            inFile.close()

        self.assertEqual(unpickledCredWallet.userId, credWallet.userId)

216
class SAMLCredentialWalletTestCase(BaseTestCase):
    """Unit test case for
    ndg.security.common.credentialwallet.SAMLCredentialWallet class.

    Each test works on SAML assertions generated from the ASSERTION_STR
    template; setUp() stores a freshly generated one in self.assertion.
    """
    THIS_DIR = os.path.dirname(__file__)
    CONFIG_FILENAME = 'test_samlcredentialwallet.cfg'
    CONFIG_FILEPATH = os.path.join(THIS_DIR, CONFIG_FILENAME)
    PICKLE_FILENAME = 'SAMLCredentialWalletPickle.dat'
    PICKLE_FILEPATH = os.path.join(THIS_DIR, PICKLE_FILENAME)

    # string.Template source for a test assertion; $timeNow, $timeExpires and
    # $issuerName are filled in by _createAssertion()
    ASSERTION_STR = (
"""<saml:Assertion ID="192c67d9-f9cd-457a-9242-999e7b943166" IssueInstant="$timeNow" Version="2.0" xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion">
   <saml:Issuer Format="urn:esg:issuer">$issuerName</saml:Issuer>
   <saml:Subject>
      <saml:NameID Format="urn:esg:openid">https://esg.prototype.ucar.edu/myopenid/testUser</saml:NameID>
   </saml:Subject>
   <saml:Conditions NotBefore="$timeNow" NotOnOrAfter="$timeExpires" />
   <saml:AttributeStatement>
      <saml:Attribute FriendlyName="FirstName" Name="urn:esg:first:name" NameFormat="http://www.w3.org/2001/XMLSchema#string">
         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">Test</saml:AttributeValue>
      </saml:Attribute>
      <saml:Attribute FriendlyName="LastName" Name="urn:esg:last:name" NameFormat="http://www.w3.org/2001/XMLSchema#string">
         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">User</saml:AttributeValue>
      </saml:Attribute>
      <saml:Attribute FriendlyName="EmailAddress" Name="urn:esg:first:email:address" NameFormat="http://www.w3.org/2001/XMLSchema#string">
         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">test@sitea.ac.uk</saml:AttributeValue>
      </saml:Attribute>
   </saml:AttributeStatement>
</saml:Assertion>
"""
    )

    def __init__(self, *arg, **kw):
        """Initialise via the base class only; nothing extra is set up here."""
        super(SAMLCredentialWalletTestCase, self).__init__(*arg, **kw)

    def setUp(self):
        """Create a default assertion for the test about to run."""
        self.assertion = self._createAssertion()

    def _createAssertion(self, timeNow=None, validityDuration=60*60*8,
                         issuerName=BaseTestCase.SITEA_SAML_ISSUER_NAME):
        """Build a SAML assertion object from the ASSERTION_STR template.

        @param timeNow: datetime used for IssueInstant and NotBefore;
        defaults to the current UTC time
        @param validityDuration: seconds from timeNow until NotOnOrAfter
        @param issuerName: text of the Issuer element
        @return: result of AssertionElementTree.fromXML for the parsed XML
        """
        if timeNow is None:
            timeNow = datetime.utcnow()

        timeExpires = timeNow + timedelta(seconds=validityDuration)
        substitutions = dict(
            issuerName=issuerName,
            timeNow=SAMLDateTime.toString(timeNow),
            timeExpires=SAMLDateTime.toString(timeExpires)
        )
        assertionStr = Template(
            SAMLCredentialWalletTestCase.ASSERTION_STR).substitute(
                substitutions)

        assertionStream = StringIO()
        assertionStream.write(assertionStr)
        assertionStream.seek(0)
        assertionElem = ElementTree.parse(assertionStream).getroot()
        return AssertionElementTree.fromXML(assertionElem)

    def _addCredential(self):
        """Return a new wallet holding self.assertion, registered against
        the Site A Attribute Authority URI.
        """
        wallet = SAMLCredentialWallet()
        wallet.addCredential(
            self.assertion,
            attributeAuthorityURI=\
                SAMLCredentialWalletTestCase.SITEA_ATTRIBUTEAUTHORITY_SAML_URI)
        return wallet

    def test01AddCredential(self):
        """An added credential is retrievable by Attribute Authority URI and
        by issuer name.
        """
        wallet = self._addCredential()

        self.assertEqual(len(wallet.credentials), 1)
        self.assertTrue(
            SAMLCredentialWalletTestCase.SITEA_ATTRIBUTEAUTHORITY_SAML_URI in
            wallet.credentialsKeyedByURI)
        self.assertTrue(SAMLCredentialWalletTestCase.SITEA_SAML_ISSUER_NAME in
                        wallet.credentials)

        assertion = wallet.credentials[
            SAMLCredentialWalletTestCase.SITEA_SAML_ISSUER_NAME
        ].credential

        print("SAML Assertion:\n%s" %
              prettyPrint(AssertionElementTree.toXML(assertion)))

    def test02VerifyCredential(self):
        """isValidCredential accepts a current assertion and rejects ones
        whose validity window lies entirely in the past or in the future.
        """
        wallet = SAMLCredentialWallet()
        self.assertTrue(wallet.isValidCredential(self.assertion))

        # Issued 24 hours ago with the default 8 hour validity
        expiredAssertion = self._createAssertion(
                                timeNow=datetime.utcnow() - timedelta(hours=24))

        self.assertTrue(not wallet.isValidCredential(expiredAssertion))

        # NotBefore is 24 hours from now
        futureAssertion = self._createAssertion(
                                timeNow=datetime.utcnow() + timedelta(hours=24))

        self.assertTrue(not wallet.isValidCredential(futureAssertion))

    def test03AuditCredential(self):
        """audit() removes a credential once it has expired."""
        # Add a short lived credential and ensure it's removed when an audit
        # is carried to prune expired credentials
        shortExpiryAssertion = self._createAssertion(validityDuration=1)
        wallet = SAMLCredentialWallet()
        wallet.addCredential(shortExpiryAssertion)

        self.assertEqual(len(wallet.credentials), 1)

        # Wait until the one second validity period has passed
        sleep(2)
        wallet.audit()
        self.assertEqual(len(wallet.credentials), 0)

    def test04ReplaceCredential(self):
        """Adding a second assertion from the same issuer replaces the first
        instead of adding another entry.
        """
        # Replace an existing credential from a given institution with a more
        # up to date one
        wallet = self._addCredential()
        self.assertEqual(len(wallet.credentials), 1)

        newAssertion = self._createAssertion()

        wallet.addCredential(newAssertion)
        self.assertEqual(len(wallet.credentials), 1)
        self.assertEqual(
            newAssertion.conditions.notOnOrAfter,
            wallet.credentials[
                SAMLCredentialWalletTestCase.SITEA_SAML_ISSUER_NAME
            ].credential.conditions.notOnOrAfter)

    def test05CredentialsFromSeparateSites(self):
        """Assertions from different issuers are stored side by side."""
        wallet = self._addCredential()
        wallet.addCredential(self._createAssertion(issuerName="MySite"))
        self.assertEqual(len(wallet.credentials), 2)

    def test06Pickle(self):
        """A wallet written to PICKLE_FILEPATH with pickle and read back
        still holds the credential keyed by the Site A URI.
        """
        wallet = self._addCredential()

        # Binary mode for both handles, as recommended for pickle files, and
        # both are closed even if dump/load raises
        outFile = open(SAMLCredentialWalletTestCase.PICKLE_FILEPATH, 'wb')
        try:
            pickle.dump(wallet, outFile)
        finally:
            outFile.close()

        inFile = open(SAMLCredentialWalletTestCase.PICKLE_FILEPATH, 'rb')
        try:
            unpickledWallet = pickle.load(inFile)
        finally:
            inFile.close()

        self.assertTrue(unpickledWallet.credentialsKeyedByURI.get(
            SAMLCredentialWalletTestCase.SITEA_ATTRIBUTEAUTHORITY_SAML_URI))

355       
# Run every test case in this module when the file is executed as a script
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.