source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/attCert/AttCertTest.py @ 4718

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/attCert/AttCertTest.py@4734
Revision 4718, 11.6 KB checked in by pjkersha, 11 years ago (diff)

Refactored Attribute Authority client unit tests separating out AA services into the config dir.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG AttCert class unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "03/01/07"
8__copyright__ = "(C) 2007 STFC"
9__license__ = \
10"""This software may be distributed under the terms of the Q Public
11License, version 1.0 or later."""
12__contact__ = "Philip.Kershaw@stfc.ac.uk"
13__revision__ = '$Id$'
14
15import unittest
16import os
17import sys
18import getpass
19import traceback
20
21from ConfigParser import SafeConfigParser
22from ndg.security.common.AttCert import AttCert
23from ndg.security.test import BaseTestCase
24
25xpdVars = os.path.expandvars
26jnPath = os.path.join
27mkPath = lambda file: jnPath(os.environ['NDGSEC_ATTCERT_UNITTEST_DIR'], file)
28
29class AttCertTestCase(BaseTestCase):
30   
31    def setUp(self):
32        super(AttCertTestCase, self).setUp()
33       
34        if 'NDGSEC_INT_DEBUG' in os.environ:
35            import pdb
36            pdb.set_trace()
37       
38        if 'NDGSEC_ATTCERT_UNITTEST_DIR' not in os.environ:
39            os.environ['NDGSEC_ATTCERT_UNITTEST_DIR'] = \
40                os.path.abspath(os.path.dirname(__file__))
41
42        configParser = SafeConfigParser()
43        configFilePath = jnPath(os.environ['NDGSEC_ATTCERT_UNITTEST_DIR'],
44                                'attCertTest.cfg')
45        configParser.read(configFilePath)
46       
47        self.cfg = {}
48        for section in configParser.sections():
49            self.cfg[section] = dict(configParser.items(section))
50
51        self.attCert = AttCert()
52           
53           
54    def test1AttCert4NonZero(self):
55        'test1AttCert4NonZero: check if test yields True'
56        if not self.attCert:
57            self.fail("AttCert instance yields 0")
58
59       
60    def test2SetProvenance(self):
61        'test2SetProvenance'
62        self.attCert['provenance'] = AttCert.origProvenance
63        print "test2SetProvenance - set to: %s" % self.attCert['provenance']
64       
65       
66    def test3TryToAlterProvenance(self):
67        'test3TryToAlterProvenance'
68        try:
69            AttCert.origProvenance = 'Another provenance setting'
70        except AttributeError, e:
71            print \
72        "test3TryToAlterProvenance - PASSED - expected exception: \"%s\"" % e
73        except:
74            self.fail('Original provenance should be read-only')
75           
76           
77    def test4SetValidityTime(self):
78        'test4SetValidityTime'
79        self.attCert.setValidityTime(lifetime=60*60*8.)
80       
81        print 'test4SetValidityTime: %s' % self.attCert['validity']
82
83       
84    def test5SetDefaultValidityTime(self):
85        'test5SetDefaultValidityTime: use default settings'
86        self.attCert.setValidityTime()
87       
88        print 'test5SetDefaultValidityTime: %s' % self.attCert['validity']
89
90 
91    def test6AddRoles(self):
92        'test6AddRoles: add extra roles'
93        self.attCert.addRoles(['government', 'acsoe'])
94        self.attCert.addRoles('atsr')
95       
96        print "test6AddRoles: " + ', '.join(self.attCert.roles)
97
98 
99    def test6aSet(self):
100        'test6aSet: test __setitem__ and property methods'
101        self.attCert.version = "1.0"
102        self.attCert['issuer'] = '/O=NDG/OU=BADC/CN=Attribute Authority'
103        self.attCert['issuerName'] = 'BADC'
104        self.attCert.issuerSerialNumber = 1234
105        self.attCert['holder'] = '/O=NDG/OU=BADC/CN=server.cert.ac.uk'
106        self.attCert.userId = '/O=NDG/OU=BADC/CN=pjkershaw'
107       
108        try:
109            self.attCert['validity'] = 'invalid'
110        except KeyError, e:
111            print "test6aSet: PASSED - %s" % e
112           
113        try:
114            self.attCert['attributes'] = 'roleSet'
115        except KeyError, e:
116            print "test6aSet: PASSED - %s" % e
117           
118        try:
119            self.attCert['attributes']['roleSet'] = ['role1', 'role2']
120        except KeyError, e:
121            print "test6aSet: PASSED - %s" % e
122
123    def test6bGet(self):
124        'test6bGet: test __getitem__ and property methods'
125        print "test6bGet ..."
126        self.test2SetProvenance()
127        self.test4SetValidityTime()
128        self.test6AddRoles()
129        self.test6aSet()
130
131        print "self.attCert['version'] = %s" % self.attCert['version']
132        print "self.attCert.version = %s" % self.attCert.version
133       
134        print "self.attCert['issuer'] = %s" % self.attCert['issuer']
135        print "self.attCert.issuer = %s" % self.attCert.issuer
136        print "self.attCert.issuerDN = %s" % self.attCert.issuerDN
137
138        print "self.attCert['issuerName'] = %s" % self.attCert['issuerName']
139        print "self.attCert.issuerName = %s" % self.attCert.issuerName
140       
141        print "self.attCert['issuerSerialNumber'] = %s" % \
142                                            self.attCert['issuerSerialNumber']
143        print "self.attCert.issuerSerialNumber = %s" % \
144                                            self.attCert.issuerSerialNumber
145       
146        print "self.attCert['holder'] = %s" % self.attCert['holder']
147        print "self.attCert.holder = %s" % self.attCert.holder
148        print "self.attCert.holderDN = %s" % self.attCert.holderDN
149
150        print "self.attCert['userId'] = %s" % self.attCert['userId']
151        print "self.attCert.userId = %s" % self.attCert.userId
152       
153        print "self.attCert['validity'] = %s" % self.attCert['validity']
154        print "self.attCert.validityNotBefore = %s" % \
155                                                self.attCert.validityNotBefore
156        print "self.attCert.validityNotAfter = %s" % \
157                                                self.attCert.validityNotAfter
158                                               
159        print "self.attCert.getValidityNotBefore(asDatetime=True) = %s" % \
160                            self.attCert.getValidityNotBefore(asDatetime=True)
161        print "self.attCert.getValidityNotAfter(asDatetime=True) = %s" % \
162                            self.attCert.getValidityNotAfter(asDatetime=True)
163       
164        print "self.attCert['attributes'] = %s" % self.attCert['attributes']
165        print "self.attCert['attributes']['roleSet'] %s: " % \
166                                        self.attCert['attributes']['roleSet'] 
167        print "self.attCert.roleSet = %s" % self.attCert.roleSet
168        print "self.attCert.roles = %s" % self.attCert.roles
169
170    def test7CreateXML(self):
171        'test7CreateXML: check for correct formatted string'
172        self.test2SetProvenance()
173        self.test5SetDefaultValidityTime()
174        self.test6AddRoles()
175        print 'test7CreateXML:\n\n' + self.attCert.createXML()
176
177   
178    def test8Parse(self):
179        '''test8Parse: parse an XML document''' 
180        self.attCert.parse(self.attCert.createXML())
181        print 'test8Parse:\n\n' + repr(self.attCert)
182
183    def test9Sign(self): 
184        '''test9Sign: sign document'''
185        self.test2SetProvenance()
186        self.test5SetDefaultValidityTime()
187        self.test6AddRoles()
188        self.test6aSet()   
189       
190        self.attCert.filePath = xpdVars(self.cfg['test9Sign']['filepath'])
191        self.attCert.certFilePathList = \
192            xpdVars(self.cfg['test9Sign']['signingcertfilepath'])
193        self.attCert.signingKeyFilePath = \
194            xpdVars(self.cfg['test9Sign']['signingprikeyfilepath'])
195       
196        signingKeyPwd = self.cfg['test9Sign'].get('signingprikeypwd')
197        if signingKeyPwd is None:
198            try:
199                self.attCert.signingKeyPwd = \
200                getpass.getpass(prompt="\ntest9Sign private key password: ")
201            except KeyboardInterrupt:
202                self.fail("test9Sign: Aborting test")
203                return
204        else:
205            self.attCert.signingKeyPwd = signingKeyPwd
206           
207        self.attCert.applyEnvelopedSignature()
208        print 'test9Sign: \n\n%s' % self.attCert
209   
210   
211    def test10Write(self):
212        '''test10Write: write document'''
213           
214        self.test9Sign()
215        self.attCert.filePath = xpdVars(self.cfg['test10Write']['filepath'])
216        self.attCert.write()
217     
218       
219    def test11Read(self):
220        '''test11Read: read document'''
221           
222        self.attCert.filePath = xpdVars(self.cfg['test11Read']['filepath'])
223        self.attCert.read()
224        print 'test11Read: \n\n%s' % self.attCert
225       
226
227    def test12IsValid(self):
228        '''test12IsValid: check signature of XML document'''           
229        self.test11Read()
230        self.attCert.certFilePathList = [xpdVars(file) for file in \
231                    self.cfg['test12IsValid']['certfilepathlist'].split()]
232        self.attCert.isValid(raiseExcep=True)
233        print 'test12IsValid: passed'
234       
235
236    def test13IsValidStressTest(self):
237        '''test13IsValidStressTest: check signature of XML document'''           
238        self.test2SetProvenance()
239        self.test5SetDefaultValidityTime()
240        self.test6aSet()   
241       
242        self.attCert.certFilePathList = [xpdVars(file) for file in \
243            self.cfg['test13IsValidStressTest']['certfilepathlist'].split()]
244        self.attCert.signingKeyFilePath = \
245                        xpdVars(self.cfg['test13IsValidStressTest']['signingprikeyfilepath'])
246       
247        signingKeyPwd = self.cfg['test13IsValidStressTest'].get('signingprikeypwd')
248        if signingKeyPwd is None:
249            try:
250                self.attCert.signingKeyPwd = getpass.getpass(\
251                    prompt="\ntest13IsValidStressTest private key password: ")
252            except KeyboardInterrupt:
253                self.fail("test13IsValidStressTest: Aborting test")
254                return
255        else:
256            self.attCert.signingKeyPwd = signingKeyPwd
257           
258        import base64
259        for i in range(0, int(self.cfg['test13IsValidStressTest']['nruns'])):
260            # Generate a range of random role names to try to trip up the
261            # signature validation
262            roles = [base64.encodestring(os.urandom(i)).strip() \
263                     for role in range(0, i)]
264            self.attCert.addRoles(roles)
265           
266            # Write AC file names by index
267            self.attCert.filePath = mkPath("stress-test-ac-%03d.xml" % i)
268           
269            self.attCert.applyEnvelopedSignature()
270            self.attCert.write()
271
272            try:
273                self.attCert.isValid(raiseExcep=True)
274            except Exception, e:
275                msg = "Verification failed for %s: %s" % \
276                    (self.attCert.filePath, str(e))
277                print msg
278                open('%03d.msg' % i, 'w').write(msg)   
279
280    def test14IsValidSignature(self):
281        '''test14IsValidSignature: check signature of XML document'''           
282        self.attCert.filePath = \
283            xpdVars(self.cfg['test14IsValidSignature']['filepath'])
284        self.attCert.read()
285       
286        certFilePathList = [xpdVars(file) for file in \
287                self.cfg['test14IsValidSignature']['certfilepathlist'].split()]
288       
289        self.attCert.certFilePathList = certFilePathList
290        self.attCert.verifyEnvelopedSignature()
291       
292        print 'test14IsValidSignature: \n\n%s' % self.attCert
293       
294class AttCertTestSuite(unittest.TestSuite):
295    def __init__(self):
296        map = map(AttCertTestCase,
297                  (
298                    "test1AttCert4NonZero",
299                    "test2SetProvenance",
300                    "test3TryToAlterProvenance",
301                    "test4SetValidityTime",
302                    "test5SetDefaultValidityTime",
303                    "test6AddRoles",
304                    "test7CreateXML",
305                    "test8Parse",
306                    "test9Sign",
307                    "test10Write",
308                    "test11Read",
309                    "test12IsValid",
310                  ))
311        unittest.TestSuite.__init__(self, map)
312 
313                                       
314if __name__ == "__main__":
315    unittest.main()
Note: See TracBrowser for help on using the repository browser.