source: TI12-security/branches/ndg-security-1.5.x/ndg_security_test/ndg/security/test/unit/credentialwallet/test_credentialwallet.py @ 6871

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/branches/ndg-security-1.5.x/ndg_security_test/ndg/security/test/unit/credentialwallet/test_credentialwallet.py@6871
Revision 6871, 15.8 KB checked in by pjkersha, 9 years ago (diff)

1.5.4 Release

Includes Pylons login/logout decorators.

Line 
1#!/usr/bin/env python
2"""Unit tests for Credential Wallet classes
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "03/10/08"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
12
13import unittest
14import os, sys, getpass, re
15import traceback
16
17from string import Template
18from cStringIO import StringIO
19import cPickle as pickle
20
21from elementtree import ElementTree
22
23from time import sleep
24from datetime import datetime, timedelta
25from saml.utils import SAMLDateTime
26from saml.xml.etree import AssertionElementTree
27
28from ndg.security.test.unit import BaseTestCase
29
30from ndg.security.common.utils.configfileparsers import (
31                                                    CaseSensitiveConfigParser)
32from ndg.security.common.utils.etree import prettyPrint
33from ndg.security.common.X509 import X509CertParse
34from ndg.security.common.credentialwallet import (NDGCredentialWallet, 
35    CredentialWalletAttributeRequestDenied, SAMLCredentialWallet)
36from ndg.security.server.attributeauthority import AttributeAuthority
37
38from os.path import expandvars as xpdVars
39from os.path import join as jnPath
def mkPath(file):
    """Return the path of a file within the credential wallet unit test
    directory.

    @type file: basestring
    @param file: file name or relative path to be located
    @rtype: basestring
    @return: file joined on to the directory named by the
    NDGSEC_CREDWALLET_UNITTEST_DIR environment variable
    @raise KeyError: NDGSEC_CREDWALLET_UNITTEST_DIR is not set in the
    environment
    """
    # The parameter name is retained from the original lambda so that any
    # caller passing it by keyword continues to work
    return jnPath(os.environ['NDGSEC_CREDWALLET_UNITTEST_DIR'], file)
41
42import logging
# Configure the root logger at import time so that debug-level messages
# logged while the tests run are output
logging.basicConfig(level=logging.DEBUG)
44
45
class NDGCredentialWalletTestCase(BaseTestCase):
    """Unit test case for
    ndg.security.common.credentialwallet.NDGCredentialWallet class.

    Test settings are read from credWalletTest.cfg, located in the directory
    named by the NDGSEC_CREDWALLET_UNITTEST_DIR environment variable.  setUp
    defaults that variable to this module's directory when it is unset.
    """
    THIS_DIR = os.path.dirname(__file__)
    PICKLE_FILENAME = 'NDGCredentialWalletPickle.dat'
    PICKLE_FILEPATH = os.path.join(THIS_DIR, PICKLE_FILENAME)

    def __init__(self, *arg, **kw):
        super(NDGCredentialWalletTestCase, self).__init__(*arg, **kw)

        # Inherited from BaseTestCase; presumably starts the Attribute
        # Authority services which getAttCert() calls - confirm there
        self.startAttributeAuthorities()

    def setUp(self):
        """Read the test configuration file and cache the user X.509
        certificate and private key file paths from its 'setUp' section
        """
        super(NDGCredentialWalletTestCase, self).setUp()

        # Optional hook: drop into the debugger before each test
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        if 'NDGSEC_CREDWALLET_UNITTEST_DIR' not in os.environ:
            os.environ['NDGSEC_CREDWALLET_UNITTEST_DIR'] = \
                os.path.abspath(os.path.dirname(__file__))

        self.cfg = CaseSensitiveConfigParser()
        configFilePath = jnPath(os.environ['NDGSEC_CREDWALLET_UNITTEST_DIR'],
                                "credWalletTest.cfg")
        self.cfg.read(configFilePath)

        self.userX509CertFilePath = self.cfg.get('setUp',
                                                 'userX509CertFilePath')
        self.userPriKeyFilePath = self.cfg.get('setUp', 'userPriKeyFilePath')

    def _readTextFile(self, filePath):
        """Return the content of a file, closing it again afterwards

        @type filePath: basestring
        @param filePath: path to the file; environment variables in it are
        expanded before it is opened
        @rtype: basestring
        @return: the complete file content
        """
        fileObj = open(xpdVars(filePath))
        try:
            return fileObj.read()
        finally:
            fileObj.close()

    def _createWalletFromConfig(self, **kw):
        """Create a wallet from the file set by the 'cfgFilePath' option of
        the test configuration's 'setUp' section

        @type kw: dict
        @param kw: additional keywords passed to the NDGCredentialWallet
        initialiser
        @rtype: NDGCredentialWallet
        @return: new wallet instance
        """
        return NDGCredentialWallet(cfg=self.cfg.get('setUp', 'cfgFilePath'),
                                   **kw)

    def test01ReadOnlyClassVariables(self):
        # self.fail() is called from the else clause: it raises an
        # AssertionError, which the "except Exception" handler would
        # otherwise swallow, making it impossible for this test to fail
        try:
            NDGCredentialWallet.accessDenied = 'yes'
        except Exception:
            print("PASS - accessDenied class variable is read-only")
        else:
            self.fail("accessDenied class variable should be read-only")

        try:
            NDGCredentialWallet.accessGranted = False
        except Exception:
            print("PASS - accessGranted class variable is read-only")
        else:
            self.fail("accessGranted class variable should be read-only")

        # The original values must be unaffected by the attempts above
        self.assertFalse(NDGCredentialWallet.accessDenied)
        self.assertTrue(NDGCredentialWallet.accessGranted)

    def test02SetAttributes(self):
        credWallet = NDGCredentialWallet()
        credWallet.userX509Cert = self._readTextFile(self.userX509CertFilePath)
        print("userX509Cert=%s" % credWallet.userX509Cert)
        credWallet.userId = 'ndg-user'
        print("userId=%s" % credWallet.userId)

        try:
            credWallet.blah = 'blah blah'
        except AttributeError:
            print("PASS - expected AttributeError when setting attribute "
                  "not in __slots__ class variable")
        else:
            self.fail("Attempting to set attribute not in __slots__ class "
                      "variable should fail")

        # The remaining assignments check only that these attributes can be
        # set without an exception being raised
        credWallet.caCertFilePathList = None
        credWallet.attributeAuthorityURI = 'http://localhost/AttributeAuthority'

        credWallet.attributeAuthority = None
        credWallet._credentialRepository = None
        credWallet.mapFromTrustedHosts = False
        credWallet.rtnExtAttCertList = True
        credWallet.attCertRefreshElapse = 7200

    def test03GetAttCertWithUserId(self):
        credWallet = self._createWalletFromConfig()
        attCert = credWallet.getAttCert()

        # No user X.509 cert is set so the resulting Attribute Certificate
        # user ID should be the same as that set for the wallet
        self.assertEqual(attCert.userId, credWallet.userId)
        print("Attribute Certificate:\n%s" % attCert)

    def test04GetAttCertWithUserX509Cert(self):
        credWallet = self._createWalletFromConfig()

        # Set a test individual user certificate to override the client
        # cert. and private key in WS-Security settings in the config file
        credWallet.userX509Cert = self._readTextFile(self.userX509CertFilePath)
        credWallet.userPriKey = self._readTextFile(self.userPriKeyFilePath)
        attCert = credWallet.getAttCert()

        # A user X.509 cert. was set so this cert's DN should be set in the
        # userId field of the resulting Attribute Certificate
        self.assertEqual(attCert.userId, str(credWallet.userX509Cert.dn))
        print("Attribute Certificate:\n%s" % attCert)

    def test05GetAttCertRefusedWithUserX509Cert(self):
        # Keyword mapFromTrustedHosts overrides any setting in the config file
        # This flag prevents role mapping from a trusted AA and so in this case
        # forces refusal of the request
        credWallet = self._createWalletFromConfig(mapFromTrustedHosts=False)
        credWallet.userX509CertFilePath = self.userX509CertFilePath
        credWallet.userPriKeyFilePath = self.userPriKeyFilePath

        # Set AA URI AFTER user PKI settings so that these are picked in the
        # implicit call to create a new AA Client when the URI is set
        credWallet.attributeAuthorityURI = self.cfg.get(
                                                    'setUp',
                                                    'attributeAuthorityURI')
        try:
            credWallet.getAttCert()
        except CredentialWalletAttributeRequestDenied:
            # The exception is retrieved with sys.exc_info() rather than
            # bound in the except clause so that the syntax is accepted by
            # every Python version
            print("ok - obtained expected result: %s" % sys.exc_info()[1])
        else:
            self.fail("Request allowed from Attribute Authority where user "
                      "is NOT registered!")

    def test06GetMappedAttCertWithUserId(self):
        # Call Site A Attribute Authority where user is registered.  The
        # return value is not needed here, only the copy kept by the wallet
        credWallet = self._createWalletFromConfig()
        credWallet.getAttCert()

        # Use Attribute Certificate cached in wallet to get a mapped
        # Attribute Certificate from Site B's Attribute Authority
        siteBURI = self.cfg.get('setUp', 'attributeAuthorityURI')
        attCert = credWallet.getAttCert(attributeAuthorityURI=siteBURI)

        print("Mapped Attribute Certificate from Site B Attribute "
              "Authority:\n%s" % attCert)

    def test07GetAttCertFromLocalAAInstance(self):
        thisSection = 'test07GetAttCertFromLocalAAInstance'
        aaPropFilePath = self.cfg.get(thisSection,
                                      'attributeAuthorityPropFilePath')

        # Assign an Attribute Authority object directly to the wallet in
        # place of the attributeAuthorityURI setting used by the other tests
        credWallet = self._createWalletFromConfig()
        credWallet.attributeAuthority = AttributeAuthority.fromPropertyFile(
                                            propFilePath=aaPropFilePath)
        attCert = credWallet.getAttCert()

        # No user X.509 cert is set so the resulting Attribute Certificate
        # user ID should be the same as that set for the wallet
        self.assertEqual(attCert.userId, credWallet.userId)

    def test08Pickle(self):
        credWallet = self._createWalletFromConfig()

        # Pickle data is a byte stream so the file is opened in binary mode,
        # and closed even if pickling raises an exception
        outFile = open(NDGCredentialWalletTestCase.PICKLE_FILEPATH, 'wb')
        try:
            pickle.dump(credWallet, outFile)
        finally:
            outFile.close()

        inFile = open(NDGCredentialWalletTestCase.PICKLE_FILEPATH, 'rb')
        try:
            unpickledCredWallet = pickle.load(inFile)
        finally:
            inFile.close()

        self.assertEqual(unpickledCredWallet.userId, credWallet.userId)
213       
214
class SAMLCredentialWalletTestCase(BaseTestCase):
    """Unit test case for
    ndg.security.common.credentialwallet.SAMLCredentialWallet class.

    Each test is given a SAML assertion, self.assertion, built from the
    ASSERTION_STR template by setUp.
    """
    THIS_DIR = os.path.dirname(__file__)
    CONFIG_FILENAME = 'test_samlcredentialwallet.cfg'
    CONFIG_FILEPATH = os.path.join(THIS_DIR, CONFIG_FILENAME)
    PICKLE_FILENAME = 'SAMLCredentialWalletPickle.dat'
    PICKLE_FILEPATH = os.path.join(THIS_DIR, PICKLE_FILENAME)

    # string.Template for a SAML assertion: $timeNow, $timeExpires and
    # $issuerName are substituted by _createAssertion
    ASSERTION_STR = (
"""<saml:Assertion ID="192c67d9-f9cd-457a-9242-999e7b943166" IssueInstant="$timeNow" Version="2.0" xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion">
   <saml:Issuer Format="urn:esg:issuer">$issuerName</saml:Issuer>
   <saml:Subject>
      <saml:NameID Format="urn:esg:openid">https://esg.prototype.ucar.edu/myopenid/testUser</saml:NameID>
   </saml:Subject>
   <saml:Conditions NotBefore="$timeNow" NotOnOrAfter="$timeExpires" />
   <saml:AttributeStatement>
      <saml:Attribute FriendlyName="FirstName" Name="urn:esg:first:name" NameFormat="http://www.w3.org/2001/XMLSchema#string">
         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">Test</saml:AttributeValue>
      </saml:Attribute>
      <saml:Attribute FriendlyName="LastName" Name="urn:esg:last:name" NameFormat="http://www.w3.org/2001/XMLSchema#string">
         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">User</saml:AttributeValue>
      </saml:Attribute>
      <saml:Attribute FriendlyName="EmailAddress" Name="urn:esg:first:email:address" NameFormat="http://www.w3.org/2001/XMLSchema#string">
         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">test@sitea.ac.uk</saml:AttributeValue>
      </saml:Attribute>
   </saml:AttributeStatement>
</saml:Assertion>
"""
    )

    def __init__(self, *arg, **kw):
        super(SAMLCredentialWalletTestCase, self).__init__(*arg, **kw)

    def setUp(self):
        """Create a new assertion, valid from the current time, for each test
        """
        self.assertion = self._createAssertion()

    def _createAssertion(self, timeNow=None, validityDuration=60*60*8,
                         issuerName=BaseTestCase.SITEA_SAML_ISSUER_NAME):
        """Create a SAML assertion object from the ASSERTION_STR template

        @type timeNow: datetime.datetime / None
        @param timeNow: time to set for the assertion's IssueInstant and
        NotBefore attributes.  Defaults to the current UTC time
        @type validityDuration: int
        @param validityDuration: number of seconds after timeNow at which
        the NotOnOrAfter attribute is set
        @type issuerName: basestring
        @param issuerName: content for the assertion's Issuer element
        @return: the result of passing the parsed assertion XML to
        AssertionElementTree.fromXML
        """
        if timeNow is None:
            timeNow = datetime.utcnow()

        timeExpires = timeNow + timedelta(seconds=validityDuration)
        assertionTmpl = Template(SAMLCredentialWalletTestCase.ASSERTION_STR)
        assertionStr = assertionTmpl.substitute(
                            issuerName=issuerName,
                            timeNow=SAMLDateTime.toString(timeNow),
                            timeExpires=SAMLDateTime.toString(timeExpires))

        # Parse from an in-memory stream initialised with the XML text
        assertionStream = StringIO(assertionStr)
        assertionElem = ElementTree.parse(assertionStream).getroot()
        return AssertionElementTree.fromXML(assertionElem)

    def _addCredential(self):
        """Create a wallet holding self.assertion, added together with the
        Site A Attribute Authority URI

        @rtype: SAMLCredentialWallet
        @return: new wallet containing the one credential
        """
        wallet = SAMLCredentialWallet()
        wallet.addCredential(
            self.assertion,
            attributeAuthorityURI=\
                SAMLCredentialWalletTestCase.SITEA_ATTRIBUTEAUTHORITY_SAML_URI)
        return wallet

    def test01AddCredential(self):
        wallet = self._addCredential()

        # The credential should be retrievable both by Attribute Authority
        # URI and by issuer name
        self.assertEqual(len(wallet.credentials), 1)
        self.assertTrue(
            SAMLCredentialWalletTestCase.SITEA_ATTRIBUTEAUTHORITY_SAML_URI in
            wallet.credentialsKeyedByURI)
        self.assertTrue(SAMLCredentialWalletTestCase.SITEA_SAML_ISSUER_NAME in
                        wallet.credentials)

        assertion = wallet.credentials[
            SAMLCredentialWalletTestCase.SITEA_SAML_ISSUER_NAME
        ].credential

        print("SAML Assertion:\n%s" %
              prettyPrint(AssertionElementTree.toXML(assertion)))

    def test02VerifyCredential(self):
        wallet = SAMLCredentialWallet()
        self.assertTrue(wallet.isValidCredential(self.assertion))

        # Issued 24 hours ago with the default 8 hour validity: its
        # NotOnOrAfter time has already passed
        expiredAssertion = self._createAssertion(
                                timeNow=datetime.utcnow() - timedelta(hours=24))

        self.assertFalse(wallet.isValidCredential(expiredAssertion))

        # NotBefore time is set 24 hours from now: not yet valid
        futureAssertion = self._createAssertion(
                                timeNow=datetime.utcnow() + timedelta(hours=24))

        self.assertFalse(wallet.isValidCredential(futureAssertion))

    def test03AuditCredential(self):
        # Add a short lived credential and ensure it's removed when an audit
        # is carried out to prune expired credentials
        shortExpiryAssertion = self._createAssertion(validityDuration=1)
        wallet = SAMLCredentialWallet()
        wallet.addCredential(shortExpiryAssertion)

        self.assertEqual(len(wallet.credentials), 1)

        # Wait until the one second validity period has passed
        sleep(2)
        wallet.audit()
        self.assertEqual(len(wallet.credentials), 0)

    def test04ClockSkewTolerance(self):
        # Add a short lived credential but with the wallet set to allow for
        # a clock skew, so that the credential is retained by an audit made
        # after its expiry time
        shortExpiryAssertion = self._createAssertion(validityDuration=1)
        wallet = SAMLCredentialWallet()

        # NOTE(review): the comment here originally read "five seconds" but
        # 5.*60*60 is 18000, i.e. five hours if the value is taken to be in
        # seconds - confirm which tolerance is intended
        wallet.clockSkewTolerance = 5.*60*60
        wallet.addCredential(shortExpiryAssertion)

        self.assertEqual(len(wallet.credentials), 1)
        sleep(2)
        wallet.audit()
        self.assertEqual(len(wallet.credentials), 1)

    def test05ReplaceCredential(self):
        # Replace an existing credential from a given institution with a more
        # up to date one
        wallet = self._addCredential()
        self.assertEqual(len(wallet.credentials), 1)

        newAssertion = self._createAssertion()

        wallet.addCredential(newAssertion)
        self.assertEqual(len(wallet.credentials), 1)

        storedAssertion = wallet.credentials[
            SAMLCredentialWalletTestCase.SITEA_SAML_ISSUER_NAME
        ].credential
        self.assertEqual(newAssertion.conditions.notOnOrAfter,
                         storedAssertion.conditions.notOnOrAfter)

    def test06CredentialsFromSeparateSites(self):
        # An assertion with a different issuer name should be held alongside
        # the existing credential rather than replacing it
        wallet = self._addCredential()
        wallet.addCredential(self._createAssertion(issuerName="MySite"))
        self.assertEqual(len(wallet.credentials), 2)

    def test07Pickle(self):
        wallet = self._addCredential()

        # Pickle data is a byte stream so the file is opened in binary mode,
        # and closed even if pickling raises an exception
        outFile = open(SAMLCredentialWalletTestCase.PICKLE_FILEPATH, 'wb')
        try:
            pickle.dump(wallet, outFile)
        finally:
            outFile.close()

        inFile = open(SAMLCredentialWalletTestCase.PICKLE_FILEPATH, 'rb')
        try:
            unpickledWallet = pickle.load(inFile)
        finally:
            inFile.close()

        self.assertTrue(unpickledWallet.credentialsKeyedByURI.get(
            SAMLCredentialWalletTestCase.SITEA_ATTRIBUTEAUTHORITY_SAML_URI))

        # list() so that indexing also works where items() returns a view
        # rather than a list
        firstCredential = list(unpickledWallet.credentials.items())[0][1]
        self.assertEqual(firstCredential.issuerName,
                         BaseTestCase.SITEA_SAML_ISSUER_NAME)

    def test08CreateFromConfig(self):
        # The expected values are those set in the configuration file
        wallet = SAMLCredentialWallet.fromConfig(
                                SAMLCredentialWalletTestCase.CONFIG_FILEPATH)
        self.assertEqual(wallet.clockSkewTolerance, timedelta(seconds=0.01))
        self.assertEqual(wallet.userId,
                         'https://openid.localhost/philip.kershaw')
376       
if __name__ == "__main__":
    # Run all the test cases in this module when it is executed as a script
    unittest.main()
Note: See TracBrowser for help on using the repository browser.