source: TI12-security/trunk/esg_system_tests/esg/security/test/system/test_ssl.py @ 7676

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/esg_system_tests/esg/security/test/system/test_ssl.py@7676
Revision 7676, 4.0 KB checked in by pjkersha, 9 years ago (diff)

Added ability to flag / ignore cert expiry errors.

Line 
1"""ESG Security SSL system tests
2
3"""
4__author__ = "P J Kershaw"
5__date__ = "20/09/10"
6__copyright__ = "(C) 2010 Science and Technology Facilities Council"
7__license__ = "BSD"
8__contact__ = "Philip.Kershaw@stfc.ac.uk"
9__revision__ = '$Id$'
10import logging
11logging.basicConfig(level=logging.DEBUG, 
12                    format='%(asctime)s %(levelname)-8s %(message)s',
13                    datefmt='%a, %d %b %Y %H:%M:%S')
14log = logging.getLogger(__name__)
15
16import unittest
17import traceback
18import socket
19from os import path
20from ConfigParser import SafeConfigParser, NoOptionError
21
22from OpenSSL import SSL
23
24
25class SSLTestCaseConfigException(Exception):
26    """Invalid Config file settings"""
27   
28   
29class SSLTestCase(unittest.TestCase):
30    """Test SSL endpoints in ESG federation"""
31    THIS_DIR = path.dirname(path.abspath(__file__))
32    INI_FILENAME = 'test_ssl.cfg'
33    INI_FILEPATH = path.join(THIS_DIR, INI_FILENAME)
34   
35    def __init__(self, *arg, **kw):
36        cfg = SafeConfigParser(defaults=dict(here=self.__class__.THIS_DIR))
37        cfg.optionxform = str
38        cfg.read(self.__class__.INI_FILEPATH)
39       
40        self.caCertDir = cfg.get('DEFAULT', 'caCertDir')
41        self.endpoints = []
42        for i in cfg.get('DEFAULT', 'endpoints').split():
43            try:
44                fqdn, port = i.split(':') 
45            except ValueError:
46                raise SSLTestCaseConfigException("Reading configuration file "
47                                                 "%r - endpoints format is "
48                                                 "<fqdn>:<portnum>", 
49                                                 self.__class__.INI_FILEPATH)
50            self.endpoints.append((fqdn, int(port)))
51       
52        try:
53            self.ignoreCertExpiryErrors = cfg.getboolean('DEFAULT', 
54                                                     'ignoreCertExpiryErrors')
55        except NoOptionError:
56            self.ignoreCertExpiryErrors = False
57           
58        self.ctx = SSL.Context(SSL.SSLv3_METHOD)
59        self.ctx.load_verify_locations(None, self.caCertDir)
60        self.ctx.set_verify_depth(9)
61       
62        def _callback(conn, x509, errorNum, errorDepth, preverifyOK):
63            """OpenSSL Verification callback"""
64            if errorNum == 10 and self.ignoreCertExpiryErrors:
65                dn = x509.get_subject()
66                log.warning("\"ignoreCertExpiryErrors\" flag set: ignoring "
67                            "certificate expiry error number %d for "
68                            "certificate %s", errorNum, dn)
69                return True
70               
71            elif errorNum != 0:
72                dn = x509.get_subject()
73                log.error("Error number for certificate %s is %d", dn, errorNum)
74               
75            return preverifyOK
76       
77        self.ctx.set_verify(SSL.VERIFY_PEER, _callback)
78       
79        super(SSLTestCase, self).__init__(*arg, **kw)
80   
81    def _test_connection(self, endpoint):
82        log.info('Probing %s:%d ...' % endpoint)
83
84        conn = SSL.Connection(self.ctx, socket.socket())
85        conn.connect(endpoint)
86        try:
87            conn.do_handshake()
88        except SSL.Error:
89            log.error("Handshake error for %r: %s" %
90                      (endpoint, traceback.format_exc()))
91            return False
92           
93        except socket.error:
94            log.error("Socket error for %r: %s" %
95                      (endpoint, traceback.format_exc()))
96            return False
97       
98        return True
99       
100    def test01ValidPeerCerts(self):
101        # Verify all peers have EECs issued by valid ESG CAs
102        nFails = 0
103        for i in self.endpoints:
104            if not self._test_connection(i):
105                nFails += 1
106
107        self.failIf(nFails > 0, "%d connection failure(s)" % nFails)
108
109    def test02HttpsEnforcedWhitelisting(self):
110        # Check HTTPS endpoints have correct whitelisting enforced - expect
111        # negative result as this client holds an invalid certificate
112        pass
113       
114if __name__ == "__main__":
115    unittest.main()
Note: See TracBrowser for help on using the repository browser.