source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/credentialwallet/test_credentialwallet.py @ 7359

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/credentialwallet/test_credentialwallet.py@7359
Revision 7359, 12.0 KB checked in by pjkersha, 9 years ago (diff)

Incomplete - task 2: XACML-Security Integration

  • simplified credential wallet - now a single class SAMLAssertionWallet for caching assertions containing authorisation decision statements and/or attribute statements
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Unit tests for Credential Wallet class
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "03/10/08"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13logging.basicConfig(level=logging.DEBUG)
14
15import unittest
16import os
17
18from string import Template
19from cStringIO import StringIO
20import cPickle as pickle
21
22from elementtree import ElementTree
23
24from time import sleep
25from datetime import datetime, timedelta
26
27from ndg.saml.utils import SAMLDateTime
28from ndg.saml.xml.etree import AssertionElementTree
29
30from ndg.security.test.unit import BaseTestCase
31from ndg.security.common.utils.etree import prettyPrint
32from ndg.security.common.credentialwallet import SAMLAssertionWallet
33
34
35class CredentialWalletBaseTestCase(BaseTestCase):
36    THIS_DIR = os.path.dirname(__file__)
37    CONFIG_FILENAME = 'test_samlcredentialwallet.cfg'
38    CONFIG_FILEPATH = os.path.join(THIS_DIR, CONFIG_FILENAME)
39   
40   
41class SAMLAttributeWalletTestCase(CredentialWalletBaseTestCase):
42    PICKLE_FILENAME = 'SAMLAttributeWalletPickle.dat'
43    PICKLE_FILEPATH = os.path.join(CredentialWalletBaseTestCase.THIS_DIR, 
44                                   PICKLE_FILENAME)
45   
46    ASSERTION_STR = (
47"""<saml:Assertion ID="192c67d9-f9cd-457a-9242-999e7b943166" IssueInstant="$timeNow" Version="2.0" xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion">
48   <saml:Issuer Format="urn:esg:issuer">$issuerName</saml:Issuer>
49   <saml:Subject>
50      <saml:NameID Format="urn:esg:openid">https://esg.prototype.ucar.edu/myopenid/testUser</saml:NameID>
51   </saml:Subject>
52   <saml:Conditions NotBefore="$timeNow" NotOnOrAfter="$timeExpires" />
53   <saml:AttributeStatement>
54      <saml:Attribute FriendlyName="FirstName" Name="urn:esg:first:name" NameFormat="http://www.w3.org/2001/XMLSchema#string">
55         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">Test</saml:AttributeValue>
56      </saml:Attribute>
57      <saml:Attribute FriendlyName="LastName" Name="urn:esg:last:name" NameFormat="http://www.w3.org/2001/XMLSchema#string">
58         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">User</saml:AttributeValue>
59      </saml:Attribute>
60      <saml:Attribute FriendlyName="EmailAddress" Name="urn:esg:first:email:address" NameFormat="http://www.w3.org/2001/XMLSchema#string">
61         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">test@sitea.ac.uk</saml:AttributeValue>
62      </saml:Attribute>
63   </saml:AttributeStatement>
64</saml:Assertion>
65"""
66    )
67   
68    def __init__(self, *arg, **kw):
69        super(SAMLAttributeWalletTestCase, self).__init__(*arg, **kw)
70       
71    def setUp(self):
72        self.assertion = self._createAssertion()
73       
74    def _createAssertion(self, timeNow=None, validityDuration=60*60*8,
75                         issuerName=BaseTestCase.SITEA_SAML_ISSUER_NAME):
76        if timeNow is None:
77            timeNow = datetime.utcnow()
78           
79        timeExpires = timeNow + timedelta(seconds=validityDuration)
80        assertionStr = Template(
81            self.__class__.ASSERTION_STR).substitute(
82                dict(
83                 issuerName=issuerName,
84                 timeNow=SAMLDateTime.toString(timeNow), 
85                 timeExpires=SAMLDateTime.toString(timeExpires)
86                )
87            )
88
89        assertionStream = StringIO()
90        assertionStream.write(assertionStr)
91        assertionStream.seek(0)
92        assertionElem = ElementTree.parse(assertionStream).getroot()
93        return AssertionElementTree.fromXML(assertionElem)
94
95    def _addCredentials(self):
96        wallet = SAMLAssertionWallet()   
97        wallet.addCredentials(self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI,
98                              [self.assertion])
99        return wallet
100   
101    def test01AddCredentials(self):
102        wallet = self._addCredentials()
103        k = self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI
104        self.assert_(len(wallet.retrieveCredentials(k)) == 1)
105        assertions = wallet.retrieveCredentials(
106                            self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI)
107        self.assert_(assertions)
108       
109        print("SAML Assertion:\n%s" % 
110              prettyPrint(AssertionElementTree.toXML(assertions[0])))
111   
112    def test02VerifyCredential(self):
113        wallet = SAMLAssertionWallet()
114        self.assert_(wallet.isValidCredential(self.assertion))
115       
116        expiredAssertion = self._createAssertion(
117                                timeNow=datetime.utcnow() - timedelta(hours=24))
118                               
119        self.assert_(not wallet.isValidCredential(expiredAssertion))
120       
121        futureAssertion = self._createAssertion(
122                                timeNow=datetime.utcnow() + timedelta(hours=24))
123
124        self.assert_(not wallet.isValidCredential(futureAssertion))
125       
126    def test03AuditCredentials(self):
127        # Add a short lived credential and ensure it's removed when an audit
128        # is carried to prune expired credentials
129        shortExpiryAssertion = self._createAssertion(validityDuration=1)
130        wallet = SAMLAssertionWallet()
131        wallet.addCredentials('a', [shortExpiryAssertion])
132       
133        self.assert_(wallet.retrieveCredentials('a'))
134        sleep(2)
135        wallet.audit()
136        self.assert_(wallet.retrieveCredentials('a') is None)
137
138    def test04ClockSkewTolerance(self):
139        # Add a short lived credential but with the wallet set to allow for
140        # a clock skew of
141        shortExpiryAssertion = self._createAssertion(validityDuration=1)
142        wallet = SAMLAssertionWallet()
143       
144        # Set a tolerance of five seconds
145        wallet.clockSkewTolerance = 5.*60*60
146        wallet.addCredentials('a', [shortExpiryAssertion])
147       
148        self.assert_(wallet.retrieveCredentials('a'))
149        sleep(2)
150        wallet.audit()
151        self.assert_(wallet.retrieveCredentials('a'))
152       
153    def test05ReplaceCredential(self):
154        # Replace an existing credential from a given institution with a more
155        # up to date one
156        k = self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI
157        wallet = self._addCredentials()
158        self.assert_(len(wallet.retrieveCredentials(k)) == 1)
159       
160        newAssertion = self._createAssertion() 
161
162        wallet.addCredentials(k, [newAssertion])
163        self.assert_(len(wallet.retrieveCredentials(k)) == 1)
164        self.assert_(newAssertion.conditions.notOnOrAfter == \
165                     wallet.retrieveCredentials(k)[0].conditions.notOnOrAfter)
166       
167    def test06CredentialsFromSeparateKeys(self):
168        wallet = self._addCredentials()
169        wallet.addCredentials("MySite",
170                              [self._createAssertion(issuerName="MySite"),
171                               self._createAssertion()])
172        self.assert_(len(wallet.retrieveCredentials("MySite")) == 2)
173        k = self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI
174        self.assert_(len(wallet.retrieveCredentials(k)) == 1)
175
176    def test07Pickle(self):
177        wallet = self._addCredentials()
178        outFile = open(self.__class__.PICKLE_FILEPATH, 'w')
179        pickle.dump(wallet, outFile)
180        outFile.close()
181       
182        inFile = open(self.__class__.PICKLE_FILEPATH)
183        unpickledWallet = pickle.load(inFile)
184       
185        assertions = unpickledWallet.retrieveCredentials(
186            self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI)
187        self.assert_(assertions)
188       
189        self.assert_(assertions[0].issuer.value == \
190                     self.__class__.SITEA_SAML_ISSUER_NAME)
191
192    def test08CreateFromConfig(self):
193        wallet = SAMLAssertionWallet.fromConfig(
194                                self.__class__.CONFIG_FILEPATH)
195        self.assert_(wallet.clockSkewTolerance == timedelta(seconds=0.01))
196        self.assert_(wallet.userId == 'https://openid.localhost/philip.kershaw')
197       
198
199class SAMLAuthzDecisionWalletTestCase(CredentialWalletBaseTestCase):
200    """Test wallet for caching Authorisation Decision statements"""
201    PICKLE_FILENAME = 'SAMLAuthzDecisionWalletPickle.dat'
202    PICKLE_FILEPATH = os.path.join(CredentialWalletBaseTestCase.THIS_DIR, 
203                                   PICKLE_FILENAME)
204   
205    RESOURCE_ID = 'http://localhost/My%20Secured%20URI'
206    ASSERTION_STR = """
207    <saml:Assertion xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion" Version="2.0" IssueInstant="$timeNow" ID="c32235a9-85df-4325-99a2-bad73668c01d">
208        <saml:Issuer Format="urn:oasis:names:tc:SAML:1.1:nameid-format:x509SubjectName">/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk</saml:Issuer>
209        <saml:Subject>
210            <saml:NameID Format="urn:esg:openid">https://openid.localhost/philip.kershaw</saml:NameID>
211        </saml:Subject>
212        <saml:Conditions NotOnOrAfter="$timeExpires" NotBefore="$timeNow"></saml:Conditions>
213        <saml:AuthzDecisionStatement Decision="Permit" Resource="$resourceId">
214            <saml:Action Namespace="urn:oasis:names:tc:SAML:1.0:action:ghpp">GET</saml:Action>
215        </saml:AuthzDecisionStatement>
216    </saml:Assertion>
217    """
218   
219    def setUp(self):
220        self.assertion = self._createAssertion()
221       
222    def _createAssertion(self, timeNow=None, validityDuration=60*60*8,
223                         issuerName=BaseTestCase.SITEA_SAML_ISSUER_NAME):
224        if timeNow is None:
225            timeNow = datetime.utcnow()
226           
227        timeExpires = timeNow + timedelta(seconds=validityDuration)
228        assertionStr = Template(
229            self.__class__.ASSERTION_STR).substitute(
230                dict(
231                 issuerName=issuerName,
232                 timeNow=SAMLDateTime.toString(timeNow), 
233                 timeExpires=SAMLDateTime.toString(timeExpires),
234                 resourceId=self.__class__.RESOURCE_ID,
235                )
236            )
237
238        assertionStream = StringIO()
239        assertionStream.write(assertionStr)
240        assertionStream.seek(0)
241        assertionElem = ElementTree.parse(assertionStream).getroot()
242        return AssertionElementTree.fromXML(assertionElem)
243                   
244    def _addCredentials(self):
245        wallet = SAMLAssertionWallet()   
246        wallet.addCredentials(self.__class__.RESOURCE_ID, [self.assertion])
247        return wallet
248   
249    def test01AddCredentials(self):
250        wallet = self._addCredentials()
251       
252        self.assert_(
253            len(wallet.retrieveCredentials(self.__class__.RESOURCE_ID)) == 1)
254
255        assertion = wallet.retrieveCredentials(self.__class__.RESOURCE_ID)[-1]
256       
257        print("SAML Assertion:\n%s" % 
258              prettyPrint(AssertionElementTree.toXML(assertion)))
259   
260    def test02VerifyCredential(self):
261        wallet = SAMLAssertionWallet()
262        self.assert_(wallet.isValidCredential(self.assertion))
263       
264        expiredAssertion = self._createAssertion(
265                                timeNow=datetime.utcnow() - timedelta(hours=24))
266                               
267        self.assert_(not wallet.isValidCredential(expiredAssertion))
268       
269        futureAssertion = self._createAssertion(
270                                timeNow=datetime.utcnow() + timedelta(hours=24))
271
272        self.assert_(not wallet.isValidCredential(futureAssertion))
273
274    def test06Pickle(self):
275        wallet = self._addCredentials()
276        outFile = open(self.__class__.PICKLE_FILEPATH, 'w')
277        pickle.dump(wallet, outFile)
278        outFile.close()
279       
280        inFile = open(self.__class__.PICKLE_FILEPATH)
281        unpickledWallet = pickle.load(inFile)
282        self.assert_(unpickledWallet.retrieveCredentials(
283                                                    self.__class__.RESOURCE_ID))
284       
285    def test07CreateFromConfig(self):
286        wallet = SAMLAssertionWallet.fromConfig(
287                                self.__class__.CONFIG_FILEPATH)
288        self.assert_(wallet.clockSkewTolerance == timedelta(seconds=0.01))
289        self.assert_(wallet.userId == 'https://openid.localhost/philip.kershaw')
290
291
292if __name__ == "__main__":
293    unittest.main()       
Note: See TracBrowser for help on using the repository browser.