source: TI12-security/trunk/MyProxyServerUtils/myproxy/server/test/test_myproxywsgi.py @ 6897

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/MyProxyServerUtils/myproxy/server/test/test_myproxywsgi.py@6897
Revision 6897, 5.7 KB checked in by pjkersha, 10 years ago (diff)

Fixed setting of authentication realm for HTTP Basic Auth middleware and improved interface to callback function by providing a exception type for the callback function to use to pass back message and HTTP status code.

Line 
1#!/usr/bin/env python
2"""Unit tests for MyProxy WSGI Middleware classes and Application.  These are
3run using paste.fixture i.e. tests stubs to a web application server
4"""
5__author__ = "P J Kershaw"
6__date__ = "21/05/10"
7__copyright__ = "(C) 2010 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
11import logging
12logging.basicConfig(level=logging.DEBUG)
13
14import unittest
15import os
16import base64
17from getpass import getpass
18from ConfigParser import SafeConfigParser, NoOptionError
19
20from OpenSSL import crypto
21import paste.fixture
22from paste.deploy import loadapp
23
24from myproxy.server.wsgi.middleware import MyProxyClientMiddleware
25
26
27class TestMyProxyClientMiddlewareApp(object):
28    '''Test Application for MyClientProxyMiddleware'''
29    RESPONSE = "Test MyProxyClientMiddleware"
30   
31    def __call__(self, environ, start_response):
32       
33        assert(environ[MyProxyClientMiddleware.CLIENT_ENV_KEYNAME])
34        assert(environ[MyProxyClientMiddleware.LOGON_FUNC_ENV_KEYNAME])
35        status = "200 OK"
36               
37        start_response(status,
38                       [('Content-length', 
39                         str(len(self.__class__.RESPONSE))),
40                        ('Content-type', 'text/plain')])
41        return [self.__class__.RESPONSE]
42
43
44class MyProxyClientMiddlewareTestCase(unittest.TestCase):
45    def __init__(self, *args, **kwargs):
46        app = TestMyProxyClientMiddlewareApp()
47        app = MyProxyClientMiddleware.filter_app_factory(app, {}, prefix='')
48        self.app = paste.fixture.TestApp(app)
49         
50        unittest.TestCase.__init__(self, *args, **kwargs)
51
52    def test01AssertMyProxyClientInEnviron(self):
53        # Check the middleware has set the MyProxy client object in environ
54        response = self.app.get('/', status=200)
55        self.assert_(response)
56       
57       
58class MyProxyLogonAppTestCase(unittest.TestCase):
59    INI_FILENAME = 'myproxywsgi.ini'
60    THIS_DIR = os.path.dirname(__file__)
61    CONFIG_FILENAME = 'test_myproxywsgi.cfg'
62    CONFIG_FILEPATH = os.path.join(THIS_DIR, CONFIG_FILENAME)
63   
64    def __init__(self, *args, **kwargs):
65        here_dir = os.path.dirname(os.path.abspath(__file__))
66        wsgiapp = loadapp('config:' + MyProxyLogonAppTestCase.INI_FILENAME, 
67                          relative_to=here_dir)
68        self.app = paste.fixture.TestApp(wsgiapp)
69       
70        self.cfg = SafeConfigParser({'here': MyProxyLogonAppTestCase.THIS_DIR})
71        self.cfg.optionxform = str
72        self.cfg.read(MyProxyLogonAppTestCase.CONFIG_FILEPATH)
73       
74        unittest.TestCase.__init__(self, *args, **kwargs)
75       
76    def _createRequestCreds(self):
77        keyPair = crypto.PKey()
78        keyPair.generate_key(crypto.TYPE_RSA, 1024)
79       
80        certReq = crypto.X509Req()
81       
82        # Create public key object
83        certReq.set_pubkey(keyPair)
84       
85        # Add the public key to the request
86        certReq.sign(keyPair, "md5")
87       
88        pemCertReq = crypto.dump_certificate_request(crypto.FILETYPE_PEM, 
89                                                     certReq)
90        return keyPair, pemCertReq
91       
92    def test01Logon(self):
93        # Test successful logon
94        username = self.cfg.get('test01Logon', 'username')
95        try: 
96            password = self.cfg.get('test01Logon', 'password')
97        except NoOptionError:
98            password = getpass('test01Logon password: ')
99           
100        base64String = base64.encodestring('%s:%s' % (username, password))[:-1]
101        authHeader =  "Basic %s" % base64String
102        headers = {'Authorization': authHeader}
103       
104        # Create key pair and certificate request
105        keyPair, certReq = self._createRequestCreds()
106        response = self.app.post('/logon', certReq, headers=headers, status=200)
107        print response
108        self.assert_(response)
109       
110    def test02NoAuthorisationHeaderSet(self):   
111        # Test failure with omission of HTTP Basic Auth header - a 401 result is
112        # expected.
113             
114        # Create key pair and certificate request
115        keyPair, certReq = self._createRequestCreds()
116        response = self.app.post('/logon', certReq, status=401)
117        print response
118        self.assert_(response) 
119       
120    def test03NoCertificateRequestSent(self):
121        # Test with missing certificate request
122       
123        username = self.cfg.get('test01Logon', 'username')
124        try: 
125            password = self.cfg.get('test01Logon', 'password')
126        except NoOptionError:
127            password = getpass('test01Logon password: ')
128           
129        base64String = base64.encodestring('%s:%s' % (username, password))[:-1]
130        authHeader =  "Basic %s" % base64String
131        headers = {'Authorization': authHeader}
132       
133        # Bad POST'ed content
134        response = self.app.post('/logon', 'x', headers=headers, status=400)
135        print response
136        self.assert_(response)
137       
138    def test04GET(self):
139        # Test HTTP GET request - should be rejected - POST is expected
140       
141        username = self.cfg.get('test01Logon', 'username')
142        try: 
143            password = self.cfg.get('test01Logon', 'password')
144        except NoOptionError:
145            password = getpass('test01Logon password: ')
146           
147        base64String = base64.encodestring('%s:%s' % (username, password))[:-1]
148        authHeader =  "Basic %s" % base64String
149        headers = {'Authorization': authHeader}
150       
151        response = self.app.get('/logon', headers=headers, status=405)
152        print response
153        self.assert_(response)               
154
155if __name__ == "__main__":
156    unittest.main()       
Note: See TracBrowser for help on using the repository browser.