source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/credentialwallet/test_credentialwallet.py @ 7356

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/credentialwallet/test_credentialwallet.py@7356
Revision 7356, 13.5 KB checked in by pjkersha, 9 years ago (diff)

Incomplete - task 2: XACML-Security Integration

  • implemented caching of authorisation decision statements in the PEP to cut down on calls to authorisation service.
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Unit tests for Credential Wallet class
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "03/10/08"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13logging.basicConfig(level=logging.DEBUG)
14
15import unittest
16import os
17
18from string import Template
19from cStringIO import StringIO
20import cPickle as pickle
21
22from elementtree import ElementTree
23
24from time import sleep
25from datetime import datetime, timedelta
26
27from ndg.saml.utils import SAMLDateTime
28from ndg.saml.xml.etree import AssertionElementTree
29
30from ndg.security.test.unit import BaseTestCase
31from ndg.security.common.utils.etree import prettyPrint
32from ndg.security.common.credentialwallet import (SAMLAttributeWallet,
33                                                  SAMLAuthzDecisionWallet)
34
35
class CredentialWalletBaseTestCase(BaseTestCase):
    """Common base class for the credential wallet test cases.

    Provides the file system locations shared by the wallet tests: the
    directory holding this module and the wallet configuration file that
    sits alongside it.
    """
    # Directory containing this test module
    THIS_DIR = os.path.dirname(__file__)

    # Wallet configuration file read by the "create from config" tests
    CONFIG_FILENAME = 'test_samlcredentialwallet.cfg'
    CONFIG_FILEPATH = os.path.join(THIS_DIR, CONFIG_FILENAME)
40   
41   
class SAMLAttributeWalletTestCase(CredentialWalletBaseTestCase):
    """Test wallet for caching SAML assertions containing attribute
    statements
    """
    # File used by test07Pickle for the pickle round trip
    PICKLE_FILENAME = 'SAMLAttributeWalletPickle.dat'
    PICKLE_FILEPATH = os.path.join(CredentialWalletBaseTestCase.THIS_DIR,
                                   PICKLE_FILENAME)

    # string.Template source for the test assertion.  Placeholders:
    # $timeNow, $timeExpires and $issuerName
    ASSERTION_STR = (
"""<saml:Assertion ID="192c67d9-f9cd-457a-9242-999e7b943166" IssueInstant="$timeNow" Version="2.0" xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion">
   <saml:Issuer Format="urn:esg:issuer">$issuerName</saml:Issuer>
   <saml:Subject>
      <saml:NameID Format="urn:esg:openid">https://esg.prototype.ucar.edu/myopenid/testUser</saml:NameID>
   </saml:Subject>
   <saml:Conditions NotBefore="$timeNow" NotOnOrAfter="$timeExpires" />
   <saml:AttributeStatement>
      <saml:Attribute FriendlyName="FirstName" Name="urn:esg:first:name" NameFormat="http://www.w3.org/2001/XMLSchema#string">
         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">Test</saml:AttributeValue>
      </saml:Attribute>
      <saml:Attribute FriendlyName="LastName" Name="urn:esg:last:name" NameFormat="http://www.w3.org/2001/XMLSchema#string">
         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">User</saml:AttributeValue>
      </saml:Attribute>
      <saml:Attribute FriendlyName="EmailAddress" Name="urn:esg:first:email:address" NameFormat="http://www.w3.org/2001/XMLSchema#string">
         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">test@sitea.ac.uk</saml:AttributeValue>
      </saml:Attribute>
   </saml:AttributeStatement>
</saml:Assertion>
"""
    )

    def __init__(self, *arg, **kw):
        super(SAMLAttributeWalletTestCase, self).__init__(*arg, **kw)

    def setUp(self):
        """Create a fresh, currently valid assertion for each test"""
        self.assertion = self._createAssertion()

    def _createAssertion(self, timeNow=None, validityDuration=60*60*8,
                         issuerName=BaseTestCase.SITEA_SAML_ISSUER_NAME):
        """Build a SAML assertion object from the ASSERTION_STR template

        @param timeNow: time substituted for the IssueInstant and NotBefore
        values; defaults to the current UTC time
        @param validityDuration: offset in seconds from timeNow used for
        the NotOnOrAfter value
        @param issuerName: text substituted into the Issuer element
        @return: result of AssertionElementTree.fromXML for the parsed
        template
        """
        if timeNow is None:
            timeNow = datetime.utcnow()

        timeExpires = timeNow + timedelta(seconds=validityDuration)
        assertionStr = Template(self.__class__.ASSERTION_STR).substitute(
            issuerName=issuerName,
            timeNow=SAMLDateTime.toString(timeNow),
            timeExpires=SAMLDateTime.toString(timeExpires)
        )

        # Parse via a file-like object and hand the root element to the
        # SAML ElementTree binding
        assertionStream = StringIO()
        assertionStream.write(assertionStr)
        assertionStream.seek(0)
        assertionElem = ElementTree.parse(assertionStream).getroot()
        return AssertionElementTree.fromXML(assertionElem)

    def _addCredential(self):
        """Make a new wallet holding the assertion created in setUp

        @return: wallet with the single credential added against the Site A
        Attribute Authority URI
        """
        wallet = SAMLAttributeWallet()
        wallet.addCredential(
            self.assertion,
            issuerEndpoint=self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI)
        return wallet

    def test01AddCredential(self):
        """Credential is retrievable by issuer name and by issuer URI"""
        wallet = self._addCredential()

        self.assertEqual(len(wallet.credentials), 1)
        self.assertTrue(self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI in
                        wallet.credentialsKeyedByURI)
        self.assertTrue(self.__class__.SITEA_SAML_ISSUER_NAME in
                        wallet.credentials)

        assertion = wallet.credentials[
            self.__class__.SITEA_SAML_ISSUER_NAME
        ].credential

        print("SAML Assertion:\n%s" %
              prettyPrint(AssertionElementTree.toXML(assertion)))

    def test02VerifyCredential(self):
        """Only an assertion whose validity window includes now is valid"""
        wallet = SAMLAttributeWallet()
        self.assertTrue(wallet.isValidCredential(self.assertion))

        # Issued 24 hours ago with the default 8 hour lifetime => expired
        expiredAssertion = self._createAssertion(
                                timeNow=datetime.utcnow() - timedelta(hours=24))
        self.assertFalse(wallet.isValidCredential(expiredAssertion))

        # NotBefore is 24 hours in the future => not yet valid
        futureAssertion = self._createAssertion(
                                timeNow=datetime.utcnow() + timedelta(hours=24))
        self.assertFalse(wallet.isValidCredential(futureAssertion))

    def test03AuditCredential(self):
        """Add a short lived credential and ensure it's removed when an
        audit is carried out to prune expired credentials
        """
        shortExpiryAssertion = self._createAssertion(validityDuration=1)
        wallet = SAMLAttributeWallet()
        wallet.addCredential(shortExpiryAssertion)

        self.assertEqual(len(wallet.credentials), 1)

        # Wait past the one second lifetime before auditing
        sleep(2)
        wallet.audit()
        self.assertEqual(len(wallet.credentials), 0)

    def test04ClockSkewTolerance(self):
        """Add a short lived credential but with the wallet set to allow
        for a clock skew large enough that the audit keeps the credential
        """
        shortExpiryAssertion = self._createAssertion(validityDuration=1)
        wallet = SAMLAttributeWallet()

        # 5.*60*60 is five hours if the wallet takes the value as seconds.
        # NOTE(review): the previous comment here said "five seconds" -
        # confirm which value was intended; either exceeds the 2s sleep.
        wallet.clockSkewTolerance = 5.*60*60
        wallet.addCredential(shortExpiryAssertion)

        self.assertEqual(len(wallet.credentials), 1)
        sleep(2)
        wallet.audit()
        self.assertEqual(len(wallet.credentials), 1)

    def test05ReplaceCredential(self):
        """Replace an existing credential from a given institution with a
        more up to date one
        """
        wallet = self._addCredential()
        self.assertEqual(len(wallet.credentials), 1)

        newAssertion = self._createAssertion()

        wallet.addCredential(newAssertion)
        self.assertEqual(len(wallet.credentials), 1)

        storedAssertion = wallet.credentials[
            self.__class__.SITEA_SAML_ISSUER_NAME
        ].credential
        self.assertEqual(newAssertion.conditions.notOnOrAfter,
                         storedAssertion.conditions.notOnOrAfter)

    def test06CredentialsFromSeparateSites(self):
        """Assertions with different issuer names are held separately"""
        wallet = self._addCredential()
        wallet.addCredential(self._createAssertion(issuerName="MySite"))
        self.assertEqual(len(wallet.credentials), 2)

    def test07Pickle(self):
        """Wallet content survives a pickle round trip through a file"""
        wallet = self._addCredential()
        pickleFilePath = self.__class__.PICKLE_FILEPATH

        # Binary mode is required for pickle data to be portable and
        # try/finally guarantees the handles are released on failure
        outFile = open(pickleFilePath, 'wb')
        try:
            pickle.dump(wallet, outFile)
        finally:
            outFile.close()

        inFile = open(pickleFilePath, 'rb')
        try:
            unpickledWallet = pickle.load(inFile)
        finally:
            inFile.close()

        self.assertTrue(unpickledWallet.credentialsKeyedByURI.get(
            self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI))

        # list() so that indexing works whether items() returns a list or
        # an iterable view
        firstCredential = list(unpickledWallet.credentials.items())[0][1]
        self.assertEqual(firstCredential.issuerName,
                         BaseTestCase.SITEA_SAML_ISSUER_NAME)

    def test08CreateFromConfig(self):
        """Wallet settings are picked up from the test configuration file"""
        wallet = SAMLAttributeWallet.fromConfig(
                                self.__class__.CONFIG_FILEPATH)
        self.assertEqual(wallet.clockSkewTolerance, timedelta(seconds=0.01))
        self.assertEqual(wallet.userId,
                         'https://openid.localhost/philip.kershaw')
198       
199
class SAMLAuthzDecisionWalletTestCase(CredentialWalletBaseTestCase):
    """Test wallet for caching Authorisation Decision statements"""
    # File used by test06Pickle for the pickle round trip
    PICKLE_FILENAME = 'SAMLAuthzDecisionWalletPickle.dat'
    PICKLE_FILEPATH = os.path.join(CredentialWalletBaseTestCase.THIS_DIR,
                                   PICKLE_FILENAME)

    # Resource the authorisation decision applies to; the tests look up
    # wallet credentials by this value
    RESOURCE_ID = 'http://localhost/My%20Secured%20URI'

    # string.Template source for the test assertion.  Placeholders:
    # $timeNow, $timeExpires and $resourceId.  The Issuer is hard-coded.
    ASSERTION_STR = """
    <saml:Assertion xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion" Version="2.0" IssueInstant="$timeNow" ID="c32235a9-85df-4325-99a2-bad73668c01d">
        <saml:Issuer Format="urn:oasis:names:tc:SAML:1.1:nameid-format:x509SubjectName">/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk</saml:Issuer>
        <saml:Subject>
            <saml:NameID Format="urn:esg:openid">https://openid.localhost/philip.kershaw</saml:NameID>
        </saml:Subject>
        <saml:Conditions NotOnOrAfter="$timeExpires" NotBefore="$timeNow"></saml:Conditions>
        <saml:AuthzDecisionStatement Decision="Permit" Resource="$resourceId">
            <saml:Action Namespace="urn:oasis:names:tc:SAML:1.0:action:ghpp">GET</saml:Action>
        </saml:AuthzDecisionStatement>
    </saml:Assertion>
    """

    def setUp(self):
        """Create a fresh, currently valid assertion for each test"""
        self.assertion = self._createAssertion()

    def _createAssertion(self, timeNow=None, validityDuration=60*60*8,
                         issuerName=BaseTestCase.SITEA_SAML_ISSUER_NAME):
        """Build a SAML assertion object from the ASSERTION_STR template

        @param timeNow: time substituted for the IssueInstant and NotBefore
        values; defaults to the current UTC time
        @param validityDuration: offset in seconds from timeNow used for
        the NotOnOrAfter value
        @param issuerName: passed to the template substitution but has no
        effect on the output because ASSERTION_STR contains no $issuerName
        placeholder
        @return: result of AssertionElementTree.fromXML for the parsed
        template
        """
        if timeNow is None:
            timeNow = datetime.utcnow()

        timeExpires = timeNow + timedelta(seconds=validityDuration)
        assertionStr = Template(self.__class__.ASSERTION_STR).substitute(
            issuerName=issuerName,
            timeNow=SAMLDateTime.toString(timeNow),
            timeExpires=SAMLDateTime.toString(timeExpires),
            resourceId=self.__class__.RESOURCE_ID
        )

        # Parse via a file-like object and hand the root element to the
        # SAML ElementTree binding
        assertionStream = StringIO()
        assertionStream.write(assertionStr)
        assertionStream.seek(0)
        assertionElem = ElementTree.parse(assertionStream).getroot()
        return AssertionElementTree.fromXML(assertionElem)

    def _addCredential(self):
        """Make a new wallet holding the assertion created in setUp

        @return: authorisation decision wallet containing one credential
        """
        wallet = SAMLAuthzDecisionWallet()
        wallet.addCredential(self.assertion)
        return wallet

    def test01AddCredential(self):
        """Credential is retrievable from the wallet by resource ID"""
        wallet = self._addCredential()

        self.assertEqual(len(wallet.credentials), 1)
        self.assertTrue(self.__class__.RESOURCE_ID in wallet.credentials)

        assertion = wallet.credentials[self.__class__.RESOURCE_ID].credential

        print("SAML Assertion:\n%s" %
              prettyPrint(AssertionElementTree.toXML(assertion)))

    def test02VerifyCredential(self):
        """Only an assertion whose validity window includes now is valid"""
        # Previously instantiated SAMLAttributeWallet (copy and paste from
        # the attribute wallet tests); this test case is for the
        # authorisation decision wallet
        wallet = SAMLAuthzDecisionWallet()
        self.assertTrue(wallet.isValidCredential(self.assertion))

        # Issued 24 hours ago with the default 8 hour lifetime => expired
        expiredAssertion = self._createAssertion(
                                timeNow=datetime.utcnow() - timedelta(hours=24))
        self.assertFalse(wallet.isValidCredential(expiredAssertion))

        # NotBefore is 24 hours in the future => not yet valid
        futureAssertion = self._createAssertion(
                                timeNow=datetime.utcnow() + timedelta(hours=24))
        self.assertFalse(wallet.isValidCredential(futureAssertion))

    def test03AuditCredential(self):
        """Add a short lived credential and ensure it's removed when an
        audit is carried out to prune expired credentials
        """
        shortExpiryAssertion = self._createAssertion(validityDuration=1)

        # Previously SAMLAttributeWallet - see test02VerifyCredential
        wallet = SAMLAuthzDecisionWallet()
        wallet.addCredential(shortExpiryAssertion)

        self.assertEqual(len(wallet.credentials), 1)

        # Wait past the one second lifetime before auditing
        sleep(2)
        wallet.audit()
        self.assertEqual(len(wallet.credentials), 0)

    def test04ClockSkewTolerance(self):
        """Add a short lived credential but with the wallet set to allow
        for a clock skew large enough that the audit keeps the credential
        """
        shortExpiryAssertion = self._createAssertion(validityDuration=1)
        wallet = SAMLAuthzDecisionWallet()

        # 5.*60*60 is five hours if the wallet takes the value as seconds.
        # NOTE(review): the previous comment here said "five seconds" -
        # confirm which value was intended; either exceeds the 2s sleep.
        wallet.clockSkewTolerance = 5.*60*60
        wallet.addCredential(shortExpiryAssertion)

        self.assertEqual(len(wallet.credentials), 1)
        sleep(2)
        wallet.audit()
        self.assertEqual(len(wallet.credentials), 1)

    def test05ReplaceCredential(self):
        """Replace an existing credential for a given resource with a more
        up to date one
        """
        wallet = self._addCredential()
        self.assertEqual(len(wallet.credentials), 1)

        newAssertion = self._createAssertion()

        wallet.addCredential(newAssertion)
        self.assertEqual(len(wallet.credentials), 1)

        storedAssertion = wallet.credentials[
            self.__class__.RESOURCE_ID
        ].credential
        self.assertEqual(newAssertion.conditions.notOnOrAfter,
                         storedAssertion.conditions.notOnOrAfter)

    def test06Pickle(self):
        """Wallet content survives a pickle round trip through a file"""
        wallet = self._addCredential()
        pickleFilePath = self.__class__.PICKLE_FILEPATH

        # Binary mode is required for pickle data to be portable and
        # try/finally guarantees the handles are released on failure
        outFile = open(pickleFilePath, 'wb')
        try:
            pickle.dump(wallet, outFile)
        finally:
            outFile.close()

        inFile = open(pickleFilePath, 'rb')
        try:
            unpickledWallet = pickle.load(inFile)
        finally:
            inFile.close()

        self.assertTrue(unpickledWallet.credentials.get(
                                                    self.__class__.RESOURCE_ID))

    def test07CreateFromConfig(self):
        """Wallet settings are picked up from the test configuration file"""
        # Previously SAMLAttributeWallet.fromConfig, which only repeated the
        # attribute wallet test.  NOTE(review): assumes fromConfig and
        # userId are available on SAMLAuthzDecisionWallet - confirm.
        wallet = SAMLAuthzDecisionWallet.fromConfig(
                                self.__class__.CONFIG_FILEPATH)
        self.assertEqual(wallet.clockSkewTolerance, timedelta(seconds=0.01))
        self.assertEqual(wallet.userId,
                         'https://openid.localhost/philip.kershaw')
333
334
if __name__ == "__main__":
    # Executed as a script: hand over to the unittest runner, which
    # discovers and runs the test cases defined in this module
    unittest.main()
Note: See TracBrowser for help on using the repository browser.