source: TI12-security/trunk/esg_system_tests/esg/security/test/system/test_ssl.py @ 7651

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/esg_system_tests/esg/security/test/system/test_ssl.py@7651
Revision 7651, 3.3 KB checked in by pjkersha, 9 years ago (diff)

ESG System SSL unittests

  • added error output for OpenSSL error number - useful for troubleshoot verification errors.
Line 
1"""ESG Security SSL system tests
2
3"""
4__author__ = "P J Kershaw"
5__date__ = "20/09/10"
6__copyright__ = "(C) 2010 Science and Technology Facilities Council"
7__license__ = "BSD"
8__contact__ = "Philip.Kershaw@stfc.ac.uk"
9__revision__ = '$Id$'
10import logging
11logging.basicConfig(level=logging.DEBUG, 
12                    format='%(asctime)s %(levelname)-8s %(message)s',
13                    datefmt='%a, %d %b %Y %H:%M:%S')
14log = logging.getLogger(__name__)
15
16import unittest
17import traceback
18import socket
19from os import path
20from ConfigParser import SafeConfigParser
21
22from OpenSSL import SSL
23
24
25class SSLTestCaseConfigException(Exception):
26    """Invalid Config file settings"""
27   
28   
29class SSLTestCase(unittest.TestCase):
30    """Test SSL endpoints in ESG federation"""
31    THIS_DIR = path.dirname(path.abspath(__file__))
32    INI_FILENAME = 'test_ssl.cfg'
33    INI_FILEPATH = path.join(THIS_DIR, INI_FILENAME)
34   
35    def __init__(self, *arg, **kw):
36        cfg = SafeConfigParser(defaults=dict(here=self.__class__.THIS_DIR))
37        cfg.optionxform = str
38        cfg.read(self.__class__.INI_FILEPATH)
39       
40        self.caCertDir = cfg.get('DEFAULT', 'caCertDir')
41        self.endpoints = []
42        for i in cfg.get('DEFAULT', 'endpoints').split():
43            try:
44                fqdn, port = i.split(':') 
45            except ValueError:
46                raise SSLTestCaseConfigException("Reading configuration file "
47                                                 "%r - endpoints format is "
48                                                 "<fqdn>:<portnum>", 
49                                                 self.__class__.INI_FILEPATH)
50            self.endpoints.append((fqdn, int(port)))
51       
52        self.ctx = SSL.Context(SSL.SSLv3_METHOD)
53        self.ctx.load_verify_locations(None, self.caCertDir)
54        self.ctx.set_verify_depth(9)
55       
56        def _callback(conn, x509, errorNum, errorDepth, preverifyOK):
57            if errorNum != 0:
58                dn = x509.get_subject()
59                log.error("Error number for certificate %s is %d", dn, errorNum)
60               
61            return preverifyOK
62       
63        self.ctx.set_verify(SSL.VERIFY_PEER, _callback)
64       
65        super(SSLTestCase, self).__init__(*arg, **kw)
66   
67    def _test_connection(self, endpoint):
68        log.info('Probing %s:%d ...' % endpoint)
69
70        conn = SSL.Connection(self.ctx, socket.socket())
71        conn.connect(endpoint)
72        try:
73            conn.do_handshake()
74        except SSL.Error:
75            log.error("Handshake error for %r: %s" %
76                      (endpoint, traceback.format_exc()))
77            return False
78           
79        except socket.error:
80            log.error("Socket error for %r: %s" %
81                      (endpoint, traceback.format_exc()))
82            return False
83       
84    def test01ValidPeerCerts(self):
85        # Verify all peers have EECs issued by valid ESG CAs
86        nFails = 0
87        for i in self.endpoints:
88            if not self._test_connection(i):
89                nFails += 1
90
91        self.failIf(nFails > 0, "%d connection failure(s)" % nFails)
92
93    def test02HttpsEnforcedWhitelisting(self):
94        # Check HTTPS endpoints have correct whitelisting enforced - expect
95        # negative result as this client holds an invalid certificate
96        pass
97       
98if __name__ == "__main__":
99    unittest.main()
Note: See TracBrowser for help on using the repository browser.