source: TI12-security/trunk/python/Tests/m2Crypto/test_m2urllib2.py @ 5372

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/Tests/m2Crypto/test_m2urllib2.py@5372
Revision 5372, 7.7 KB checked in by pjkersha, 11 years ago (diff)

Added code for validation of OpenID Provider by Relying Party using M2Crypto.m2urllib2 for SSL peer authN

Line 
1from openid.fetchers import USER_AGENT, _allowedURL, Urllib2Fetcher
2import urllib2
3from M2Crypto.m2urllib2 import HTTPSHandler
4from M2Crypto import SSL
5from M2Crypto.X509 import X509_Store_Context
6
7class M2Urllib2Fetcher(Urllib2Fetcher):
8    """M2Crypto urllib2 based fetcher to enable SSL based authentication of
9    peer
10    """
11
12    def fetch(self, url, body=None, headers=None):
13       
14        # Use parent class behaviour for non-SSL connections
15        if url.startswith('http://'):
16            return super(M2Urllib2Fetcher, self).fetch(url, 
17                                                       body=body, 
18                                                       headers=headers)
19           
20        if not _allowedURL(url):
21            raise ValueError('Bad URL scheme: %r' % (url,))
22
23        if headers is None:
24            headers = {}
25
26        headers.setdefault(
27            'User-Agent',
28            "%s Python-urllib/%s" % (USER_AGENT, urllib2.__version__,))
29
30        req = urllib2.Request(url, data=body, headers=headers)
31        try:
32            def verifyCallback(arg, x509StoreCtx):
33                '''@type x509StoreCtx: M2Crypto.X509_Store_Context
34                '''
35                x509Cert = x509StoreCtx.get_current_cert()
36                x509Cert.get_subject()
37                x509CertChain = x509StoreCtx.get1_chain()
38                for cert in x509CertChain:
39                    subject = cert.get_subject()
40                    dn = subject.as_text()
41                    print dn
42                   
43                return True
44               
45            ctx = SSL.Context()
46            caFilePath = '/home/pjkersha/Documents/BADC/Certificates/Cybertrust/cybertrustCombo.crt'
47#            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
48            ctx.set_verify(SSL.verify_peer|SSL.verify_fail_if_no_peer_cert, 
49                           9, 
50                           callback=verifyCallback)
51            ctx.load_verify_locations(cafile=caFilePath)
52#            ctx.load_cert(certfile,
53#                          keyfile=None,
54#                          callback=util.passphrase_callback)
55            httpsHandler = HTTPSHandler(ssl_context=ctx)
56            f = httpsHandler.https_open(req)
57            try:
58                return self._makeResponse(f)
59            finally:
60                f.close()
61        except urllib2.HTTPError, why:
62            try:
63                return self._makeResponse(why)
64            finally:
65                why.close()
66
67        return resp
68
69class M2Urllib2Fetcher2(Urllib2Fetcher):
70    """M2Crypto urllib2 based fetcher to enable SSL based authentication of
71    peer
72    """
73
74    def fetch(self, url, body=None, headers=None):
75       
76        # Use parent class behaviour for non-SSL connections
77        if url.startswith('http://'):
78            return super(M2Urllib2Fetcher, self).fetch(url, 
79                                                       body=body, 
80                                                       headers=headers)
81           
82        if not _allowedURL(url):
83            raise ValueError('Bad URL scheme: %r' % (url,))
84
85        if headers is None:
86            headers = {}
87
88        headers.setdefault(
89            'User-Agent',
90            "%s Python-urllib/%s" % (USER_AGENT, urllib2.__version__,))
91
92        req = urllib2.Request(url, data=body, headers=headers)
93        try:
94            def verifyCallback(arg, x509StoreCtx):
95                '''@type x509StoreCtx: M2Crypto.X509_Store_Context
96                '''
97                x509Cert = x509StoreCtx.get_current_cert()
98                x509Cert.get_subject()
99                x509CertChain = x509StoreCtx.get1_chain()
100                for cert in x509CertChain:
101                    subject = cert.get_subject()
102                    dn = subject.as_text()
103                    print dn
104                   
105                return True
106               
107            ctx = SSL.Context()
108            caFilePath = '/home/pjkersha/Documents/BADC/Certificates/Cybertrust/cybertrustCombo.crt'
109#            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
110            ctx.set_verify(SSL.verify_peer|SSL.verify_fail_if_no_peer_cert, 
111                           9, 
112                           callback=verifyCallback)
113            ctx.load_verify_locations(cafile=caFilePath)
114#            ctx.load_cert(certfile,
115#                          keyfile=None,
116#                          callback=util.passphrase_callback)
117            httpsHandler = HTTPSHandler(ssl_context=ctx)
118            f = httpsHandler.https_open(req)
119            try:
120                return self._makeResponse(f)
121            finally:
122                f.close()
123        except urllib2.HTTPError, why:
124            try:
125                return self._makeResponse(why)
126            finally:
127                why.close()
128
129        return resp
130
131def installOpener():
132    def verifyCallback(preVerifyOK, x509StoreCtx):
133        '''callback function used to control the behaviour when the
134        SSL_VERIFY_PEER flag is set
135       
136        http://www.openssl.org/docs/ssl/SSL_CTX_set_verify.html
137       
138        @type preVerifyOK: int
139        @param preVerifyOK: If a verification error is found, this parameter
140        will be set to 0
141        @type x509StoreCtx: M2Crypto.X509_Store_Context
142        @param x509StoreCtx: locate the certificate to be verified and perform
143        additional verification steps as needed
144        @rtype: int
145        @return: controls the strategy of the further verification process.
146        - If verify_callback returns 0, the verification process is immediately
147        stopped with "verification failed" state. If SSL_VERIFY_PEER is set,
148        a verification failure alert is sent to the peer and the TLS/SSL
149        handshake is terminated.
150        - If verify_callback returns 1, the verification process is continued.
151        If verify_callback always returns 1, the TLS/SSL handshake will not be
152        terminated with respect to verification failures and the connection
153        will be established. The calling process can however retrieve the error
154        code of the last verification error using SSL_get_verify_result(3) or
155        by maintaining its own error storage managed by verify_callback.
156        '''
157        if preVerifyOK == 0:
158            # Something is wrong with the certificate don't bother proceeding
159            # any further
160            return preVerifyOK
161       
162        x509Cert = x509StoreCtx.get_current_cert()
163        x509Cert.get_subject()
164        x509CertChain = x509StoreCtx.get1_chain()
165        for cert in x509CertChain:
166            subject = cert.get_subject()
167            dn = subject.as_text()
168            print dn
169           
170        # If all is OK preVerifyOK will be 1.  Return this to the caller to
171        # that it's OK to proceed
172        return preVerifyOK
173       
174    ctx = SSL.Context()
175#    caFilePath = '/home/pjkersha/Documents/BADC/Certificates/Cybertrust/cybertrustCombo.crt'
176    caDirPath = '/home/pjkersha/workspace/ndg.security.python/ndg.security.test/ndg/security/test/config/ca'
177    ctx.set_verify(SSL.verify_peer|SSL.verify_fail_if_no_peer_cert, 
178                   9, 
179                   callback=verifyCallback)
180#    ctx.set_verify(SSL.verify_peer|SSL.verify_fail_if_no_peer_cert, 1)
181
182    ctx.load_verify_locations(capath=caDirPath)
183#    ctx.load_cert(certfile,
184#                  keyfile=None,
185#                  callback=util.passphrase_callback)
186
187    from M2Crypto.m2urllib2 import build_opener
188    urllib2.install_opener(build_opener(ssl_context=ctx))
189   
190if __name__ == "__main__":
191    installOpener()
192    fetcher = Urllib2Fetcher()
193#    fetcher = M2Urllib2Fetcher()
194#    resp = fetcher.fetch('https://ndg3beta.badc.rl.ac.uk/openid')
195    resp = fetcher.fetch('https://localhost/openid')
196    print resp.body
Note: See TracBrowser for help on using the repository browser.