source: TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/credentialwallet/test_credentialwallet.py @ 6062

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/credentialwallet/test_credentialwallet.py@6062
Revision 6062, 13.6 KB checked in by pjkersha, 10 years ago (diff)
  • Working SamlPIPMiddleware - the Policy Information Point with an interface to the SAML Attribute Authority. The PIP retrieves user credential information for the PDP.
  • Fixed ndg.security.test.unit.wsgi.authz.test_authz unit tests for the above.
Line 
1#!/usr/bin/env python
2"""Unit tests for Credential Wallet classes
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "03/10/08"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12
13import unittest
14import os, sys, getpass, re
15import traceback
16
17from string import Template
18from cStringIO import StringIO
19from elementtree import ElementTree
20
21from time import sleep
22from datetime import datetime, timedelta
23from saml.utils import SAMLDateTime
24from saml.xml.etree import AssertionElementTree
25
26from ndg.security.test.unit import BaseTestCase
27
28from ndg.security.common.utils.configfileparsers import (
29                                                    CaseSensitiveConfigParser)
30from ndg.security.common.utils.etree import prettyPrint
31from ndg.security.common.X509 import X509CertParse
32from ndg.security.common.credentialwallet import (NDGCredentialWallet, 
33    CredentialWalletAttributeRequestDenied, SAMLCredentialWallet)
34from ndg.security.server.attributeauthority import AttributeAuthority
35
36from os.path import expandvars as xpdVars
37from os.path import join as jnPath
def mkPath(file):
    """Return ``file`` joined onto the credential wallet unit test directory.

    The directory is read from the NDGSEC_CREDWALLET_UNITTEST_DIR environment
    variable.  When that variable is not set, the directory containing this
    module is used instead - the same default that
    NDGCredentialWalletTestCase.setUp applies - so the helper no longer raises
    KeyError if it is called before any test has run.

    The parameter keeps its original name ``file`` (even though it shadows
    the Python 2 builtin) so that existing keyword callers keep working.
    """
    testDir = os.environ.get('NDGSEC_CREDWALLET_UNITTEST_DIR')
    if testDir is None:
        testDir = os.path.abspath(os.path.dirname(__file__))

    return jnPath(testDir, file)
39
40import logging
41logging.basicConfig(level=logging.DEBUG)
42
43
class NDGCredentialWalletTestCase(BaseTestCase):
    """Unit test case for
    ndg.security.common.credentialwallet.NDGCredentialWallet class.

    Test settings are read in setUp from credWalletTest.cfg, located in the
    directory named by the NDGSEC_CREDWALLET_UNITTEST_DIR environment
    variable (defaulting to the directory containing this module).
    """

    def __init__(self, *arg, **kw):
        """Initialise the base test case and start the Attribute Authorities.

        NOTE(review): startAttributeAuthorities is not defined in this
        module - presumably it is inherited from BaseTestCase; confirm.
        """
        super(NDGCredentialWalletTestCase, self).__init__(*arg, **kw)
        self.startAttributeAuthorities()

    def setUp(self):
        """Load the test configuration and the user credential file paths."""
        super(NDGCredentialWalletTestCase, self).setUp()

        # Drop into the debugger when interactive debugging is requested
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        # Default the test directory to the one containing this module
        if 'NDGSEC_CREDWALLET_UNITTEST_DIR' not in os.environ:
            os.environ['NDGSEC_CREDWALLET_UNITTEST_DIR'] = \
                os.path.abspath(os.path.dirname(__file__))

        self.cfg = CaseSensitiveConfigParser()
        configFilePath = jnPath(os.environ['NDGSEC_CREDWALLET_UNITTEST_DIR'],
                                "credWalletTest.cfg")
        self.cfg.read(configFilePath)

        self.userX509CertFilePath = self.cfg.get('setUp',
                                                 'userX509CertFilePath')
        self.userPriKeyFilePath = self.cfg.get('setUp', 'userPriKeyFilePath')

    @staticmethod
    def _readFile(filePath):
        """Return the content of filePath, expanding any environment
        variables in the path and always closing the file handle."""
        fileObj = open(xpdVars(filePath))
        try:
            return fileObj.read()
        finally:
            fileObj.close()

    def test01ReadOnlyClassVariables(self):
        """Assigning to accessDenied / accessGranted on the class must fail
        and leave the original values in place."""
        # self.fail() lives in the else clause: it raises AssertionError,
        # which "except Exception" would otherwise swallow, turning a genuine
        # failure into a reported pass.
        try:
            NDGCredentialWallet.accessDenied = 'yes'
        except Exception:
            print("PASS - accessDenied class variable is read-only")
        else:
            self.fail("accessDenied class variable should be read-only")

        try:
            NDGCredentialWallet.accessGranted = False
        except Exception:
            print("PASS - accessGranted class variable is read-only")
        else:
            self.fail("accessGranted class variable should be read-only")

        self.assertFalse(NDGCredentialWallet.accessDenied)
        self.assertTrue(NDGCredentialWallet.accessGranted)

    def test02SetAttributes(self):
        """Exercise the attribute setters and check that an attribute name
        unknown to the wallet is rejected with AttributeError."""
        credWallet = NDGCredentialWallet()
        credWallet.userX509Cert = self._readFile(self.userX509CertFilePath)
        print("userX509Cert=%s" % credWallet.userX509Cert)
        credWallet.userId = 'ndg-user'
        print("userId=%s" % credWallet.userId)

        # self.fail() raises AssertionError, which the AttributeError handler
        # below does not catch, so a missing error is still reported
        try:
            credWallet.blah = 'blah blah'
            self.fail("Attempting to set attribute not in __slots__ class "
                      "variable should fail")
        except AttributeError:
            print("PASS - expected AttributeError when setting attribute "
                  "not in __slots__ class variable")

        credWallet.caCertFilePathList = None
        credWallet.attributeAuthorityURI = 'http://localhost/AttributeAuthority'

        credWallet.attributeAuthority = None
        credWallet.credentialRepository = None
        credWallet.mapFromTrustedHosts = False
        credWallet.rtnExtAttCertList = True
        credWallet.attCertRefreshElapse = 7200

    def test03GetAttCertWithUserId(self):
        """Get an Attribute Certificate using the wallet's user ID only."""
        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'))
        attCert = credWallet.getAttCert()

        # No user X.509 cert is set so the resulting Attribute Certificate
        # user ID should be the same as that set for the wallet
        self.assertEqual(attCert.userId, credWallet.userId)
        print("Attribute Certificate:\n%s" % attCert)

    def test04GetAttCertWithUserX509Cert(self):
        """Get an Attribute Certificate with a user X.509 cert. set."""
        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'))

        # Set a test individual user certificate to override the client
        # cert. and private key in WS-Security settings in the config file
        credWallet.userX509Cert = self._readFile(self.userX509CertFilePath)
        credWallet.userPriKey = self._readFile(self.userPriKeyFilePath)
        attCert = credWallet.getAttCert()

        # A user X.509 cert. was set so this cert's DN should be set in the
        # userId field of the resulting Attribute Certificate
        self.assertEqual(attCert.userId, str(credWallet.userX509Cert.dn))
        print("Attribute Certificate:\n%s" % attCert)

    def test05GetAttCertRefusedWithUserX509Cert(self):
        """A request to an Attribute Authority where the user is not
        registered must raise CredentialWalletAttributeRequestDenied."""
        # Keyword mapFromTrustedHosts overrides any setting in the config file
        # This flag prevents role mapping from a trusted AA and so in this case
        # forces refusal of the request
        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'),
                                         mapFromTrustedHosts=False)

        # NOTE(review): these paths are passed without the xpdVars() expansion
        # applied where the files are read directly in the other tests -
        # confirm the wallet expands environment variables itself.
        credWallet.userX509CertFilePath = self.userX509CertFilePath
        credWallet.userPriKeyFilePath = self.userPriKeyFilePath

        # Set AA URI AFTER user PKI settings so that these are picked in the
        # implicit call to create a new AA Client when the URI is set
        credWallet.attributeAuthorityURI = self.cfg.get(
                                                    'setUp',
                                                    'attributeAuthorityURI')
        try:
            credWallet.getAttCert()
        except CredentialWalletAttributeRequestDenied:
            # sys.exc_info() is used instead of binding the exception in the
            # except clause so that the statement parses under both the old
            # and the new except syntax
            print("SUCCESS - obtained expected result: %s" %
                  sys.exc_info()[1])
            return

        self.fail("Request allowed from Attribute Authority where user is NOT "
                  "registered!")

    def test06GetMappedAttCertWithUserId(self):
        """Get a mapped Attribute Certificate from a second Attribute
        Authority after first obtaining one from Site A."""
        # Call Site A Attribute Authority where user is registered
        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'))
        credWallet.getAttCert()

        # Use Attribute Certificate cached in wallet to get a mapped
        # Attribute Certificate from Site B's Attribute Authority
        siteBURI = self.cfg.get('setUp', 'attributeAuthorityURI')
        attCert = credWallet.getAttCert(attributeAuthorityURI=siteBURI)

        print("Mapped Attribute Certificate from Site B Attribute "
              "Authority:\n%s" % attCert)

    def test07GetAttCertFromLocalAAInstance(self):
        """Get an Attribute Certificate from an AttributeAuthority object
        assigned directly to the wallet."""
        thisSection = 'test07GetAttCertFromLocalAAInstance'
        aaPropFilePath = self.cfg.get(thisSection,
                                      'attributeAuthorityPropFilePath')

        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'))
        credWallet.attributeAuthority = AttributeAuthority.fromPropertyFile(
                                            propFilePath=aaPropFilePath)
        attCert = credWallet.getAttCert()

        # No user X.509 cert is set so the resulting Attribute Certificate
        # user ID should be the same as that set for the wallet
        self.assertEqual(attCert.userId, credWallet.userId)
        print("Attribute Certificate:\n%s" % attCert)
197
198
class SAMLCredentialWalletTestCase(BaseTestCase):
    """Unit test case for
    ndg.security.common.credentialwallet.SAMLCredentialWallet class.

    Each test works on SAML assertions generated from the ASSERTION_STR
    template with the issue time, expiry time and issuer name substituted in.
    """
    THIS_DIR = os.path.dirname(__file__)
    CONFIG_FILENAME = 'test_samlcredentialwallet.cfg'
    CONFIG_FILEPATH = os.path.join(THIS_DIR, CONFIG_FILENAME)

    # string.Template source: $timeNow, $timeExpires and $issuerName are
    # filled in by _createAssertion
    ASSERTION_STR = (
"""<saml:Assertion ID="192c67d9-f9cd-457a-9242-999e7b943166" IssueInstant="$timeNow" Version="2.0" xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion">
   <saml:Issuer Format="urn:esg:issuer">$issuerName</saml:Issuer>
   <saml:Subject>
      <saml:NameID Format="urn:esg:openid">https://esg.prototype.ucar.edu/myopenid/testUser</saml:NameID>
   </saml:Subject>
   <saml:Conditions NotBefore="$timeNow" NotOnOrAfter="$timeExpires" />
   <saml:AttributeStatement>
      <saml:Attribute FriendlyName="FirstName" Name="urn:esg:first:name" NameFormat="http://www.w3.org/2001/XMLSchema#string">
         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">Test</saml:AttributeValue>
      </saml:Attribute>
      <saml:Attribute FriendlyName="LastName" Name="urn:esg:last:name" NameFormat="http://www.w3.org/2001/XMLSchema#string">
         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">User</saml:AttributeValue>
      </saml:Attribute>
      <saml:Attribute FriendlyName="EmailAddress" Name="urn:esg:first:email:address" NameFormat="http://www.w3.org/2001/XMLSchema#string">
         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">test@sitea.ac.uk</saml:AttributeValue>
      </saml:Attribute>
   </saml:AttributeStatement>
</saml:Assertion>
"""
    )

    def setUp(self):
        """Create a fresh assertion, valid from now, for each test."""
        self.assertion = self._createAssertion()

    def _createAssertion(self, timeNow=None, validityDuration=60*60*8,
                         issuerName=BaseTestCase.SITEA_SAML_ISSUER_NAME):
        """Build a SAML assertion object from the ASSERTION_STR template.

        @param timeNow: datetime used for the IssueInstant and NotBefore
        values; defaults to the current UTC time
        @param validityDuration: lifetime in seconds, added to timeNow to
        give the NotOnOrAfter value
        @param issuerName: text for the Issuer element
        @return: result of AssertionElementTree.fromXML for the parsed
        template
        """
        if timeNow is None:
            timeNow = datetime.utcnow()

        timeExpires = timeNow + timedelta(seconds=validityDuration)
        substitutions = dict(issuerName=issuerName,
                             timeNow=SAMLDateTime.toString(timeNow),
                             timeExpires=SAMLDateTime.toString(timeExpires))
        assertionStr = Template(
            SAMLCredentialWalletTestCase.ASSERTION_STR).substitute(
                                                                substitutions)

        # Parse from an in-memory stream rewound to its start
        assertionStream = StringIO()
        assertionStream.write(assertionStr)
        assertionStream.seek(0)
        assertionElem = ElementTree.parse(assertionStream).getroot()
        return AssertionElementTree.fromXML(assertionElem)

    def _addCredential(self):
        """Return a new wallet holding self.assertion, registered against
        the Site A Attribute Authority SAML URI."""
        siteAURI = \
            SAMLCredentialWalletTestCase.SITEA_ATTRIBUTEAUTHORITY_SAML_URI
        wallet = SAMLCredentialWallet()
        wallet.addCredential(self.assertion, attributeAuthorityURI=siteAURI)
        return wallet

    def test01AddCredential(self):
        """An added credential is retrievable by issuer name and by URI."""
        wallet = self._addCredential()
        issuerName = SAMLCredentialWalletTestCase.SITEA_SAML_ISSUER_NAME

        self.assertEqual(len(wallet.credentials), 1)
        self.assertTrue(
            SAMLCredentialWalletTestCase.SITEA_ATTRIBUTEAUTHORITY_SAML_URI in
            wallet.credentialsKeyedByURI)
        self.assertTrue(issuerName in wallet.credentials)

        assertion = wallet.credentials[issuerName].credential

        print("SAML Assertion:\n%s" %
              prettyPrint(AssertionElementTree.toXML(assertion)))

    def test02VerifyCredential(self):
        """Only an assertion whose validity window includes the current time
        is reported as valid."""
        wallet = SAMLCredentialWallet()
        self.assertTrue(wallet.isValidCredential(self.assertion))

        # Issued 24 hours ago with the default 8 hour lifetime: expired
        expiredAssertion = self._createAssertion(
                                timeNow=datetime.utcnow() - timedelta(hours=24))
        self.assertFalse(wallet.isValidCredential(expiredAssertion))

        # Not valid until 24 hours from now
        futureAssertion = self._createAssertion(
                                timeNow=datetime.utcnow() + timedelta(hours=24))
        self.assertFalse(wallet.isValidCredential(futureAssertion))

    def test03AuditCredential(self):
        """An expired credential is removed when the wallet is audited."""
        # Add a short lived credential and ensure it's removed when an audit
        # is carried to prune expired credentials
        shortExpiryAssertion = self._createAssertion(validityDuration=1)
        wallet = SAMLCredentialWallet()
        wallet.addCredential(shortExpiryAssertion)

        self.assertEqual(len(wallet.credentials), 1)

        # Wait past the one second lifetime before auditing
        sleep(2)
        wallet.audit()
        self.assertEqual(len(wallet.credentials), 0)

    def test04ReplaceCredential(self):
        """A newer credential from the same issuer replaces the existing
        one instead of being stored alongside it."""
        # Replace an existing credential from a given institution with a more
        # up to date one
        wallet = self._addCredential()
        self.assertEqual(len(wallet.credentials), 1)

        newAssertion = self._createAssertion()

        wallet.addCredential(newAssertion)
        self.assertEqual(len(wallet.credentials), 1)

        storedAssertion = wallet.credentials[
            SAMLCredentialWalletTestCase.SITEA_SAML_ISSUER_NAME
        ].credential
        self.assertEqual(newAssertion.conditions.notOnOrAfter,
                         storedAssertion.conditions.notOnOrAfter)

    def test05CredentialsFromSeparateSites(self):
        """Credentials from different issuers are held as separate entries."""
        wallet = self._addCredential()
        wallet.addCredential(self._createAssertion(issuerName="MySite"))
        self.assertEqual(len(wallet.credentials), 2)
323       
324       
325if __name__ == "__main__":
326    unittest.main()       
Note: See TracBrowser for help on using the repository browser.