source: TI12-security/trunk/python/ndg.security.common/ndg/security/common/m2CryptoSSLUtility.py @ 2932

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.common/ndg/security/common/m2CryptoSSLUtility.py@2932
Revision 2932, 7.8 KB checked in by pjkersha, 13 years ago (diff)

ndg.security.common/ndg/security/common/AttAuthority/__init__.py:

  • fix to AttAuthorityClient.getAllHostInfo - the RoleList attribute is returned from the ZSI API call even if the SOAP XML element is nulled, so the 'role' dict key is now always included in the return value, set to [] when there are no roles.

ndg.security.common/ndg/security/common/m2CryptoSSLUtility.py:

  • new exception InvalidCertDN - use to raise an exception if peer cert DN doesn't match list of acceptedDNs
  • new keyword to HostCheck.__init__ - acceptedDNs. This enables validation by checking the peer cert DN against a limited list of accepted certificate DNs.
  • Property svn:keywords set to Id
Line 
1"""Extend M2Crypto SSL functionality for cert verification and custom
2timeout settings.
3
4NERC Data Grid Project"""
5__author__ = "P J Kershaw"
6__date__ = "02/07/07"
7__copyright__ = "(C) 2007 STFC & NERC"
8__license__ = \
9"""This software may be distributed under the terms of the Q Public
10License, version 1.0 or later."""
11__contact__ = "P.J.Kershaw@rl.ac.uk"
12__revision__ = '$Id$'
13
14import httplib
15import socket
16
17from M2Crypto import SSL, X509
18from M2Crypto.httpslib import HTTPSConnection as _HTTPSConnection
19
20from ndg.security.common.X509 import X509Cert, X509Stack
21
class InvalidCertSignature(SSL.Checker.SSLVerificationError):
    """Raised when verifying the peer certificate against a CA
    certificate's public key fails"""
24
class InvalidCertDN(SSL.Checker.SSLVerificationError):
    """Raised when the peer certificate's Distinguished Name is not found
    in the list of acceptable DNs"""
27   
28
class HostCheck(SSL.Checker.Checker, object):
    """Override SSL.Checker.Checker to enable an alternate Common Name
    setting to be matched against the peer cert, plus optional checks of
    the peer cert DN against a list of accepted DNs and of the peer cert
    signature against a set of CA certificates"""

    def __init__(self,
                 peerCertDN=None,
                 peerCertCN=None,
                 acceptedDNs=None,
                 caCertList=None,
                 caCertFilePathList=None,
                 **kw):
        """Override parent class __init__ to enable setting of the expected
        peer certificate DN / CN and of the CA certificates to verify with

        @type peerCertDN: string
        @param peerCertDN: Set the expected Distinguished Name of the
        server to avoid errors matching hostnames.  This is useful
        where the hostname is not fully qualified.  It is compared for
        equality with the peer cert DN and is only consulted when the parent
        class check raises SSL.Checker.WrongHost

        @type acceptedDNs: list
        @param acceptedDNs: a list of acceptable DNs.  If set, the peer cert
        DN must equal one of the entries.  This enables validation against a
        limited list of certs.  Defaults to no restriction

        @type peerCertCN: string
        @param peerCertCN: enable alternate Common Name to peer
        hostname

        @type caCertList: list type of M2Crypto.X509.X509 types
        @param caCertList: CA X.509 certificates - if set the peer cert's
        CA signature is verified against one of these.  At least one must
        verify

        @type caCertFilePathList: list string types
        @param caCertFilePathList: same as caCertList except input as list
        of CA cert file paths.  Ignored if caCertList is also set

        @param kw: remaining keywords are passed on to
        SSL.Checker.Checker.__init__"""

        SSL.Checker.Checker.__init__(self, **kw)

        self.peerCertDN = peerCertDN
        self.peerCertCN = peerCertCN

        # None rather than [] is used as the default so that a single list
        # object is not shared between every instance
        if acceptedDNs is None:
            acceptedDNs = []
        self.acceptedDNs = acceptedDNs

        # Always create the stack: __call__ tests its length, so it must
        # exist even when no CA certificates have been supplied
        self.__caCertStack = X509Stack()

        if caCertList:
            self.caCertList = caCertList
        elif caCertFilePathList:
            self.caCertFilePathList = caCertFilePathList

    def __call__(self, peerCert, host=None):
        """Carry out checks on server ID

        @type peerCert: M2Crypto.X509.X509
        @param peerCert: peer (server) host certificate
        @type host: string
        @param host: name of host to check.  NOTE(review): this value is not
        used - self.peerCertCN is passed to the parent class check instead;
        confirm this is intended
        @rtype: bool
        @return: True if all the checks pass
        @raise SSL.Checker.WrongHost: parent class host check failed and the
        peer cert DN doesn't equal self.peerCertDN
        @raise InvalidCertDN: acceptedDNs is set and the peer cert DN is not
        in it
        @raise InvalidCertSignature: CA certs are set and verification of the
        peer cert against them failed
        """
        # Convert the subject text from comma separated to slash delimited
        # form for comparison with the DN settings
        peerCertDN = '/' + peerCert.get_subject().as_text().replace(', ', '/')
        try:
            SSL.Checker.Checker.__call__(self, peerCert, host=self.peerCertCN)

        except SSL.Checker.WrongHost:
            # Try match against peerCertDN set - re-raise the original
            # error, traceback intact, if that doesn't match either
            if peerCertDN != self.peerCertDN:
                raise

        # At least one match should be found in the list
        if self.acceptedDNs and peerCertDN not in self.acceptedDNs:
            raise InvalidCertDN(
                "Peer cert DN %s doesn't match verification list" % peerCertDN)

        if len(self.__caCertStack) > 0:
            try:
                self.__caCertStack.verifyCertChain(
                    x509Cert2Verify=X509Cert(m2CryptoX509=peerCert))
            except Exception as e:
                raise InvalidCertSignature(
                    "Peer certificate verification against CA cert failed: " +
                    str(e))

        # All checks passed
        return True

    def __setCACertList(self, caCertList):
        """Set list of CA certs - peer cert must validate against at least one
        of these.  Any previously set CA certs are discarded

        @type caCertList: list type of M2Crypto.X509.X509 types
        @param caCertList: CA certificates to push onto a new X.509 stack"""
        self.__caCertStack = X509Stack()
        for caCert in caCertList:
            self.__caCertStack.push(caCert)

    # Write-only property: no getter is defined
    caCertList = property(fset=__setCACertList,
              doc="list of CA certs - peer cert must validate against one")

    #_________________________________________________________________________
    def __setCACertsFromFileList(self, caCertFilePathList):
        '''Read CA certificates from file and add them to the X.509
        stack.  Any previously set CA certs are discarded

        @type caCertFilePathList: list or tuple
        @param caCertFilePathList: list of file paths for CA certificates to
        be used to verify the peer certificate
        @raise AttributeError: input is not a list or tuple'''

        if not isinstance(caCertFilePathList, (list, tuple)):
            raise AttributeError(
                        'Expecting a list or tuple for "caCertFilePathList"')

        self.__caCertStack = X509Stack()

        for caCertFilePath in caCertFilePathList:
            self.__caCertStack.push(X509.load_cert(caCertFilePath))

    # Write-only property: no getter is defined
    caCertFilePathList = property(fset=__setCACertsFromFileList,
    doc="list of CA cert file paths - peer cert must validate against one")
141
142
class HTTPSConnection(_HTTPSConnection):
    """Modified version of M2Crypto equivalent to enable custom checks with
    the peer and timeout settings

    @type defReadTimeout: M2Crypto.SSL.timeout
    @cvar defReadTimeout: default timeout for read operations
    @type defWriteTimeout: M2Crypto.SSL.timeout
    @cvar defWriteTimeout: default timeout for write operations"""
    defReadTimeout = SSL.timeout(sec=20.)
    defWriteTimeout = SSL.timeout(sec=20.)

    def __init__(self, *args, **kw):
        '''Overload to enable setting of post connection check
        callback to SSL.Connection

        @type *args: tuple
        @param *args: args which apply to M2Crypto.httpslib.HTTPSConnection
        @type **kw: dict
        @param **kw: additional keywords.  Those listed below are removed
        before the remainder is passed to the parent class __init__
        @type postConnectionCheck: SSL.Checker.Checker derivative
        @keyword postConnectionCheck: callable object used to check the
        peer; defaults to an SSL.Checker.Checker instance
        @type readTimeout: M2Crypto.SSL.timeout
        @keyword readTimeout: readTimeout - set timeout for read
        @type writeTimeout: M2Crypto.SSL.timeout
        @keyword writeTimeout: similar to read timeout
        @raise AttributeError: readTimeout or writeTimeout is not an
        M2Crypto.SSL.timeout'''

        if 'postConnectionCheck' in kw:
            self._postConnectionCheck = kw.pop('postConnectionCheck')
        else:
            # Use an instance, not the class: the callback is invoked as
            # check(peerCert, host), so passing the class would only
            # construct a Checker object and never run the check itself
            self._postConnectionCheck = SSL.Checker.Checker()

        self.readTimeout = self._popTimeout(kw, 'readTimeout',
                                            HTTPSConnection.defReadTimeout)
        self.writeTimeout = self._popTimeout(kw, 'writeTimeout',
                                             HTTPSConnection.defWriteTimeout)

        _HTTPSConnection.__init__(self, *args, **kw)

    @staticmethod
    def _popTimeout(kw, name, default):
        '''Remove the timeout keyword "name" from kw and return its value,
        or return default if the keyword is absent

        @raise AttributeError: value is not an M2Crypto.SSL.timeout'''
        if name not in kw:
            return default

        timeout = kw.pop(name)
        if not isinstance(timeout, SSL.timeout):
            raise AttributeError("%s must be of type M2Crypto.SSL.timeout" %
                                 name)
        return timeout

    def connect(self):
        '''Overload M2Crypto.httpslib.HTTPSConnection to enable
        custom post connection check of peer certificate and socket timeout'''
        self.sock = SSL.Connection(self.ssl_ctx)
        self.sock.set_post_connection_check_callback(
                                                 self._postConnectionCheck)

        # Callback and timeouts are set before connecting so that they are
        # in place when the connection is made
        self.sock.set_socket_read_timeout(self.readTimeout)
        self.sock.set_socket_write_timeout(self.writeTimeout)

        self.sock.connect((self.host, self.port))
Note: See TracBrowser for help on using the repository browser.