source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthorityclient/test_attributeauthorityclient.py @ 4667

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthorityclient/test_attributeauthorityclient.py@4667
Revision 4667, 12.0 KB checked in by pjkersha, 12 years ago (diff)
  • Completed Attribute Authority unit test
  • re-issued out of date test certs.
  • Property svn:executable set to *
  • Property svn:keywords set to Id
1#!/usr/bin/env python
2"""NDG Attribute Authority SOAP client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "05/05/05, major update 16/01/07"
8__copyright__ = "(C) 2008 STFC & NERC"
9__license__ = \
10"""This software may be distributed under the terms of the Q Public
11License, version 1.0 or later."""
12__contact__ = "Philip.Kershaw@stfc.ac.uk"
13__revision__ = '$Id:test_attributeauthorityclient.py 4372 2008-10-29 09:45:39Z pjkersha $'
14
15import unittest
16import os, sys, getpass, re
17import logging
18logging.basicConfig()
19
20from ndg.security.common.attributeauthority import AttributeAuthorityClient, \
21    NoMatchingRoleInTrustedHosts
22from ndg.security.common.AttCert import AttCertRead
23from ndg.security.common.X509 import X509CertParse, X509CertRead
24from ndg.security.common.utils.configfileparsers import \
25    CaseSensitiveConfigParser
26   
27from os.path import expandvars as xpdVars
28from os.path import join as jnPath
def mkPath(file):
    """Return *file* joined onto the unit test directory.

    The directory is read from the NDGSEC_AACLNT_UNITTEST_DIR environment
    variable each time the function is called, so the variable must be set
    before the first call (a KeyError is raised otherwise).
    """
    # os.path.join discards the directory part if *file* is absolute
    unitTestDir = os.environ['NDGSEC_AACLNT_UNITTEST_DIR']
    return jnPath(unitTestDir, file)
30
31
32class AttributeAuthorityClientTestCase(unittest.TestCase):
33    clntPriKeyPwd = None
34    pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"
35
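    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
        '''Read proxy cert and user cert from a single PEM file and put in
        a list ready for input into SignatureHandler.

        Only blocks matching the class attribute ``pemPat`` (PEM
        CERTIFICATE blocks) are extracted; anything else in the file is
        not matched.

        @param proxyCertFilePath: path of the PEM file to read
        @return: list of X509CertParse results, in the reverse of file order
        '''
        # Close the handle explicitly instead of leaving it to the garbage
        # collector
        proxyCertFile = open(proxyCertFilePath)
        try:
            proxyCertFileTxt = proxyCertFile.read()
        finally:
            proxyCertFile.close()

        # re.S so that the character class also spans the line breaks inside
        # each base64 encoded block
        pemPatRE = re.compile(self.__class__.pemPat, re.S)
        x509CertList = pemPatRE.findall(proxyCertFileTxt)

        # NOTE(review): a file with no CERTIFICATE block gives an empty list
        # here rather than an error - callers should check for that before
        # using the chain to verify a signature.
        signingCertChain = [X509CertParse(x509Cert) for x509Cert in \
                            x509CertList]

        # Expecting proxy cert first - move this to the end.  This will
        # be the cert used to verify the message signature
        signingCertChain.reverse()

        return signingCertChain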
52
53
    def setUp(self):
        """Read the client test configuration and create the Attribute
        Authority client used by every test.

        Settings come from attAuthorityClientTest.cfg in the directory named
        by NDGSEC_AACLNT_UNITTEST_DIR; that variable defaults to the
        directory containing this module.
        """
        # Developer hook: setting NDGSEC_INT_DEBUG stops in the debugger
        # before each test
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        dirEnvVarName = 'NDGSEC_AACLNT_UNITTEST_DIR'
        if dirEnvVarName not in os.environ:
            thisDir = os.path.dirname(__file__)
            os.environ[dirEnvVarName] = os.path.abspath(thisDir)

        self.cfgParser = CaseSensitiveConfigParser()
        self.cfgParser.read(jnPath(os.environ[dirEnvVarName],
                                   'attAuthorityClientTest.cfg'))

        # Keep a plain dict-of-dicts copy of every section for the tests
        self.cfg = dict([(sectionName,
                          dict(self.cfgParser.items(sectionName)))
                         for sectionName in self.cfgParser.sections()])

        # NOTE(review): a missing 'sslcaCertFilePathList' option (or a missing
        # 'setUp' section) silently gives an empty CA list.  Whether the
        # client then skips verification of the peer certificate is not
        # visible here - confirm, so that a broken config cannot make the
        # tests pass against an unverified endpoint.
        try:
            caCertFilePathList = \
                self.cfg['setUp']['sslcaCertFilePathList'].split()
            sslCACertList = [X509CertRead(xpdVars(caCertFilePath))
                             for caCertFilePath in caCertFilePathList]
        except KeyError:
            sslCACertList = []

        # Instantiate WS proxy
        setUpCfg = self.cfg['setUp']
        self.siteAClnt = AttributeAuthorityClient(
                                uri=setUpCfg['uri'],
                                sslPeerCertCN=setUpCfg.get('sslPeerCertCN'),
                                sslCACertList=sslCACertList,
                                cfgFileSection='wsse',
                                cfg=self.cfgParser)
85
    def test01GetHostInfo(self):
        """test01GetHostInfo: retrieve info for AA host

        Passes as long as the call does not raise; the returned value is
        only printed, not checked.
        """
        aaHostInfo = self.siteAClnt.getHostInfo()
        print "Host Info:\n %s" % aaHostInfo
90
    def test02GetTrustedHostInfo(self):
        """test02GetTrustedHostInfo: retrieve trusted host info matching a
        given role

        The role is taken from the 'role' option of this test's config
        section.  Every returned hostname and every key of its info mapping
        must be set; the values themselves are not checked.
        """
        role = self.cfg['test02GetTrustedHostInfo']['role']
        trustedHostInfo = self.siteAClnt.getTrustedHostInfo(role)

        for trustedHostname, trustedHost in trustedHostInfo.items():
            self.assertTrue(trustedHostname, "Hostname not set")
            for infoKey, _ in trustedHost.items():
                self.assertTrue(infoKey, "hostInfo value key unset")

        print "Trusted Host Info:\n %s" % trustedHostInfo
102
    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self):
        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case
        where the input role doesn't match any roles in the target AA's map
        config file

        The call must raise NoMatchingRoleInTrustedHosts; returning normally
        is a test failure.
        """
        role = \
            self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound']['role']
        try:
            self.siteAClnt.getTrustedHostInfo(role)

        except NoMatchingRoleInTrustedHosts, e:
            print 'As expected - no match for role "%s": %s' % (role, e)

        else:
            # No exception means the AA accepted a role it should not know
            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")
115
116
    def test04GetTrustedHostInfoWithNoRole(self):
        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info
        irrespective of role

        Every returned hostname, and every key and value of its info
        mapping, must be set.
        """
        trustedHostInfo = self.siteAClnt.getTrustedHostInfo()

        for trustedHostname, trustedHost in trustedHostInfo.items():
            self.assertTrue(trustedHostname, "Hostname not set")
            for infoKey, infoVal in trustedHost.items():
                self.assertTrue(infoKey, "hostInfo value key unset")
                self.assertTrue(infoVal, "%s value not set" % infoKey)

        print "Trusted Host Info:\n %s" % trustedHostInfo
128       
129
    def test05GetAllHostsInfo(self):
        """test05GetAllHostsInfo: retrieve info for all hosts

        Every returned hostname and every key of its info mapping must be
        set; the values themselves are not checked.
        """
        allHostInfo = self.siteAClnt.getAllHostsInfo()

        for knownHostname, knownHost in allHostInfo.items():
            self.assertTrue(knownHostname, "Hostname not set")
            for infoKey, _ in knownHost.items():
                self.assertTrue(infoKey, "hostInfo value key unset")

        print "All Hosts Info:\n %s" % allHostInfo
139
140
141    def test06GetAttCert(self):       
142        """test06GetAttCert: Request attribute certificate from NDG Attribute
143        Authority Web Service."""
144        _cfg = self.cfg['test06GetAttCert']
145       
146        # Read user Certificate into a string ready for passing via WS
147        try:
148            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath'))
149            userX509CertTxt = open(userX509CertFilePath, 'r').read()