Changeset 5373


Ignore:
Timestamp:
10/06/09 09:53:04 (10 years ago)
Author:
pjkersha
Message:

Example m2urllib2 code prepared for use with OpenID Relying Party SSL peer authN of Provider

  • Cleaned up code to leave definitive test version
File:
1 edited

Legend:

Unmodified
Added
Removed
  • TI12-security/trunk/python/Tests/m2Crypto/test_m2urllib2.py

    r5372 r5373  
    44from M2Crypto import SSL 
    55from M2Crypto.X509 import X509_Store_Context 
    6  
    7 class M2Urllib2Fetcher(Urllib2Fetcher): 
    8     """M2Crypto urllib2 based fetcher to enable SSL based authentication of  
    9     peer 
    10     """ 
    11  
    12     def fetch(self, url, body=None, headers=None): 
    13          
    14         # Use parent class behaviour for non-SSL connections 
    15         if url.startswith('http://'): 
    16             return super(M2Urllib2Fetcher, self).fetch(url,  
    17                                                        body=body,  
    18                                                        headers=headers) 
    19              
    20         if not _allowedURL(url): 
    21             raise ValueError('Bad URL scheme: %r' % (url,)) 
    22  
    23         if headers is None: 
    24             headers = {} 
    25  
    26         headers.setdefault( 
    27             'User-Agent', 
    28             "%s Python-urllib/%s" % (USER_AGENT, urllib2.__version__,)) 
    29  
    30         req = urllib2.Request(url, data=body, headers=headers) 
    31         try: 
    32             def verifyCallback(arg, x509StoreCtx): 
    33                 '''@type x509StoreCtx: M2Crypto.X509_Store_Context 
    34                 ''' 
    35                 x509Cert = x509StoreCtx.get_current_cert() 
    36                 x509Cert.get_subject() 
    37                 x509CertChain = x509StoreCtx.get1_chain() 
    38                 for cert in x509CertChain: 
    39                     subject = cert.get_subject() 
    40                     dn = subject.as_text() 
    41                     print dn 
    42                      
    43                 return True 
    44                  
    45             ctx = SSL.Context() 
    46             caFilePath = '/home/pjkersha/Documents/BADC/Certificates/Cybertrust/cybertrustCombo.crt' 
    47 #            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) 
    48             ctx.set_verify(SSL.verify_peer|SSL.verify_fail_if_no_peer_cert,  
    49                            9,  
    50                            callback=verifyCallback) 
    51             ctx.load_verify_locations(cafile=caFilePath) 
    52 #            ctx.load_cert(certfile,  
    53 #                          keyfile=None,  
    54 #                          callback=util.passphrase_callback) 
    55             httpsHandler = HTTPSHandler(ssl_context=ctx) 
    56             f = httpsHandler.https_open(req) 
    57             try: 
    58                 return self._makeResponse(f) 
    59             finally: 
    60                 f.close() 
    61         except urllib2.HTTPError, why: 
    62             try: 
    63                 return self._makeResponse(why) 
    64             finally: 
    65                 why.close() 
    66  
    67         return resp 
    68  
    69 class M2Urllib2Fetcher2(Urllib2Fetcher): 
    70     """M2Crypto urllib2 based fetcher to enable SSL based authentication of  
    71     peer 
    72     """ 
    73  
    74     def fetch(self, url, body=None, headers=None): 
    75          
    76         # Use parent class behaviour for non-SSL connections 
    77         if url.startswith('http://'): 
    78             return super(M2Urllib2Fetcher, self).fetch(url,  
    79                                                        body=body,  
    80                                                        headers=headers) 
    81              
    82         if not _allowedURL(url): 
    83             raise ValueError('Bad URL scheme: %r' % (url,)) 
    84  
    85         if headers is None: 
    86             headers = {} 
    87  
    88         headers.setdefault( 
    89             'User-Agent', 
    90             "%s Python-urllib/%s" % (USER_AGENT, urllib2.__version__,)) 
    91  
    92         req = urllib2.Request(url, data=body, headers=headers) 
    93         try: 
    94             def verifyCallback(arg, x509StoreCtx): 
    95                 '''@type x509StoreCtx: M2Crypto.X509_Store_Context 
    96                 ''' 
    97                 x509Cert = x509StoreCtx.get_current_cert() 
    98                 x509Cert.get_subject() 
    99                 x509CertChain = x509StoreCtx.get1_chain() 
    100                 for cert in x509CertChain: 
    101                     subject = cert.get_subject() 
    102                     dn = subject.as_text() 
    103                     print dn 
    104                      
    105                 return True 
    106                  
    107             ctx = SSL.Context() 
    108             caFilePath = '/home/pjkersha/Documents/BADC/Certificates/Cybertrust/cybertrustCombo.crt' 
    109 #            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) 
    110             ctx.set_verify(SSL.verify_peer|SSL.verify_fail_if_no_peer_cert,  
    111                            9,  
    112                            callback=verifyCallback) 
    113             ctx.load_verify_locations(cafile=caFilePath) 
    114 #            ctx.load_cert(certfile,  
    115 #                          keyfile=None,  
    116 #                          callback=util.passphrase_callback) 
    117             httpsHandler = HTTPSHandler(ssl_context=ctx) 
    118             f = httpsHandler.https_open(req) 
    119             try: 
    120                 return self._makeResponse(f) 
    121             finally: 
    122                 f.close() 
    123         except urllib2.HTTPError, why: 
    124             try: 
    125                 return self._makeResponse(why) 
    126             finally: 
    127                 why.close() 
    128  
    129         return resp 
    1306 
    1317def installOpener(): 
     
    17349         
    17450    ctx = SSL.Context() 
    175 #    caFilePath = '/home/pjkersha/Documents/BADC/Certificates/Cybertrust/cybertrustCombo.crt' 
    176     caDirPath = '/home/pjkersha/workspace/ndg.security.python/ndg.security.test/ndg/security/test/config/ca' 
     51 
     52    caDirPath = '../ndg.security.test/ndg/security/test/config/ca' 
    17753    ctx.set_verify(SSL.verify_peer|SSL.verify_fail_if_no_peer_cert,  
    17854                   9,  
     
    18157 
    18258    ctx.load_verify_locations(capath=caDirPath) 
    183 #    ctx.load_cert(certfile,  
    184 #                  keyfile=None,  
    185 #                  callback=util.passphrase_callback) 
     59#    ctx.load_cert(certFilePath,  
     60#                  keyfile=priKeyFilePath,  
     61#                  callback=lambda *arg, **kw: priKeyPwd) 
    18662 
    18763    from M2Crypto.m2urllib2 import build_opener 
     
    19167    installOpener() 
    19268    fetcher = Urllib2Fetcher() 
    193 #    fetcher = M2Urllib2Fetcher() 
    194 #    resp = fetcher.fetch('https://ndg3beta.badc.rl.ac.uk/openid') 
    19569    resp = fetcher.fetch('https://localhost/openid') 
    19670    print resp.body 
Note: See TracChangeset for help on using the changeset viewer.