source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/myproxy/test_myproxy.py @ 7077

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/myproxy/test_myproxy.py@7077
Revision 7077, 9.1 KB checked in by pjkersha, 9 years ago (diff)
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Unit tests for MyProxy Client WSGI and MyProxy logon web service interface
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "14/10/09"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13logging.basicConfig(level=logging.DEBUG)
14
15import os
16import urllib2
17import base64
18
19import paste.fixture
20from paste.deploy import loadapp
21
22from M2Crypto import X509, RSA, EVP, m2
23
24from ndg.security.test.unit import BaseTestCase
25from ndg.security.common.utils.configfileparsers import \
26    CaseSensitiveConfigParser
27from ndg.security.server.wsgi.myproxy import MyProxyClientMiddleware
28
29
30class TestAuthnApp(object):
31    '''Test Application for the Authentication handler to protect'''
32    response = "Test MyProxy Client WSGI"
33    URI_CLIENT_IS_IN_ENVIRON = '/test_clientIsInEnviron'
34    URI_LOGON_IS_IN_ENVIRON = '/test_logonIsInEnviron'
35    URI_200_OK = '/test_200'
36   
37    def __init__(self, app_conf, **local_conf):
38        pass
39       
40    def __call__(self, environ, start_response):
41        response = TestAuthnApp.response
42       
43        if environ['PATH_INFO'] == TestAuthnApp.URI_CLIENT_IS_IN_ENVIRON:
44            assert(environ[MyProxyClientMiddlewareTestCase.CLIENT_ENV_KEYNAME])
45           
46            response = ('Found MyProxyClient instance %r=%r' % 
47                (MyProxyClientMiddlewareTestCase.CLIENT_ENV_KEYNAME,
48                 environ[MyProxyClientMiddlewareTestCase.CLIENT_ENV_KEYNAME]))
49           
50            status = "200 OK"
51           
52        elif environ['PATH_INFO'] == TestAuthnApp.URI_LOGON_IS_IN_ENVIRON:
53            assert(environ[
54                    MyProxyClientMiddlewareTestCase.LOGON_FUNC_ENV_KEYNAME])
55           
56            response = ('Found MyProxyClient logon function %r=%r' % 
57                (MyProxyClientMiddlewareTestCase.LOGON_FUNC_ENV_KEYNAME,
58                 environ[
59                    MyProxyClientMiddlewareTestCase.LOGON_FUNC_ENV_KEYNAME]))
60           
61            status = "200 OK"
62           
63        elif environ['PATH_INFO'] == TestAuthnApp.URI_200_OK:
64            status = "200 OK"
65        else:
66            status = "404 Not found"
67               
68        start_response(status,
69                       [('Content-length', 
70                         str(len(response))),
71                        ('Content-type', 'text/plain')])
72        return [response]
73   
74   
75class MyProxyClientMiddlewareTestCase(BaseTestCase):
76    """Test MyProxy Client WSGI"""
77    CLIENT_ENV_KEYNAME = 'myProxyClient'
78    LOGON_FUNC_ENV_KEYNAME = 'logon'
79    SERVICE_PORTNUM = 20443
80    WGET_CMD = 'wget'
81    WGET_USER_OPTNAME = '--http-user'
82    WGET_PASSWD_OPTNAME = '--http-password'
83    WGET_OUTPUT_OPTNAME = '--output-document'
84    WGET_STDOUT = '-'
85   
86    def __init__(self, *args, **kwargs):
87        app = TestAuthnApp({})
88        self.wsgiapp = MyProxyClientMiddleware(app, {}, 
89    prefix='',
90    myProxyClientPrefix='client_',
91    clientEnvKeyName=MyProxyClientMiddlewareTestCase.CLIENT_ENV_KEYNAME,
92    logonFuncEnvKeyName=MyProxyClientMiddlewareTestCase.LOGON_FUNC_ENV_KEYNAME,
93    client_hostname='localhost')
94       
95        self.app = paste.fixture.TestApp(self.wsgiapp)
96         
97        BaseTestCase.__init__(self, *args, **kwargs)
98
99    def test01CheckForClientEnvKey(self):
100        response = self.app.get(TestAuthnApp.URI_CLIENT_IS_IN_ENVIRON)
101        print response
102
103    def test02CheckForLogonFuncEnvKey(self):
104        response = self.app.get(TestAuthnApp.URI_LOGON_IS_IN_ENVIRON)
105        print response
106   
107   
108class MyProxyLogonMiddlewareTestCase(BaseTestCase):
109    """Test MyProxy logon web service interface"""
110    SERVICE_PORTNUM = 20443
111    WGET_CMD = 'wget'
112    WGET_USER_OPTNAME = '--http-user'
113    WGET_PASSWD_OPTNAME = '--http-password'
114    WGET_OUTPUT_OPTNAME = '--output-document'
115    WGET_STDOUT = '-'
116    INI_FILENAME = 'test.ini'
117   
118    def __init__(self, *args, **kwargs):
119        here_dir = os.path.dirname(os.path.abspath(__file__))
120        iniFilePath = os.path.join(here_dir, 
121                                   MyProxyLogonMiddlewareTestCase.INI_FILENAME)
122        self.wsgiapp = loadapp('config:%s' % iniFilePath)
123        self.app = paste.fixture.TestApp(self.wsgiapp)
124       
125        cfg = CaseSensitiveConfigParser()
126        cfg.read(iniFilePath)
127        self.username = cfg.get('DEFAULT', 'username')
128        if cfg.has_option('DEFAULT', 'password'):
129            self.password = cfg.get('DEFAULT', 'password')
130        else:
131            self.password = None
132       
133        BaseTestCase.__init__(self, *args, **kwargs)
134
135        # Thread separate Paster based service
136        self.addService(app=self.wsgiapp, 
137                        port=MyProxyLogonMiddlewareTestCase.SERVICE_PORTNUM)
138
139    def test01PasteFixtureInvalidCredentials(self):
140        username = '_test'
141        password = 'test'
142       
143        base64String = base64.encodestring('%s:%s' % (username, password))[:-1]
144        authHeader =  "Basic %s" % base64String
145        headers = {'Authorization': authHeader}
146
147        url = TestAuthnApp.URI_200_OK
148       
149        response = self.app.get(url, headers=headers, status=401)
150        print response
151
152    def test02PasteFixture(self):
153        username = self.username
154        if self.password is None:
155            from getpass import getpass
156            password = getpass('test02PasteFixture: MyProxy pass-phrase for '
157                               '%r: ' % username)
158        else:
159            password = self.password
160       
161        base64String = base64.encodestring('%s:%s' % (username, password))[:-1]
162        authHeader =  "Basic %s" % base64String
163        headers = {'Authorization': authHeader}
164
165        url = TestAuthnApp.URI_200_OK
166       
167        response = self.app.get(url, headers=headers)
168        print response
169       
170    def test03Urllib2ClientGET(self):
171       
172        username = self.username
173        if self.password is None:
174            from getpass import getpass
175            password = getpass('test03Urllib2ClientGET: MyProxy pass-phrase '
176                               'for %r: ' % username)
177        else:
178            password = self.password
179
180        url = 'http://localhost:%d/test_200' % \
181            MyProxyLogonMiddlewareTestCase.SERVICE_PORTNUM
182           
183        req = urllib2.Request(url)
184        base64String = base64.encodestring('%s:%s' % (username, password))[:-1]
185        authHeader =  "Basic %s" % base64String
186        req.add_header("Authorization", authHeader)
187       
188        handle = urllib2.urlopen(req)
189       
190        response = handle.read()
191        print (response)
192       
193    def test04Urllib2ClientPOST(self):
194       
195        username = self.username
196        if self.password is None:
197            from getpass import getpass
198            password = getpass('test04Urllib2ClientPOST: MyProxy pass-phrase '
199                               'for %r: ' % username)
200        else:
201            password = self.password
202
203        url = 'http://localhost:%d/test_200' % \
204            MyProxyLogonMiddlewareTestCase.SERVICE_PORTNUM
205           
206        req = urllib2.Request(url)
207        base64String = base64.encodestring('%s:%s' % (username, password))[:-1]
208        authHeader =  "Basic %s" % base64String
209        req.add_header("Authorization", authHeader)
210       
211        # Create key pair
212        nBitsForKey = 2048
213        keys = RSA.gen_key(nBitsForKey, m2.RSA_F4)
214        certReq = X509.Request()
215       
216        # Create public key object
217        pubKey = EVP.PKey()
218        pubKey.assign_rsa(keys)
219       
220        # Add the public key to the request
221        certReq.set_version(0)
222        certReq.set_pubkey(pubKey)
223       
224        x509Name = X509.X509_Name()                       
225        certReq.set_subject_name(x509Name)
226       
227        certReq.sign(pubKey, "md5")
228
229        pemCertReq = certReq.as_pem()
230       
231        handle = urllib2.urlopen(req, data=pemCertReq)
232       
233        response = handle.read()
234        print (response)
235       
236    def test05WGetClient(self):
237        uri = ('http://localhost:%d/test_200' % 
238                  MyProxyLogonMiddlewareTestCase.SERVICE_PORTNUM)
239                 
240        username = self.username
241        if self.password is None:
242            from getpass import getpass
243            password = getpass('test04WGetClient: MyProxy pass-phrase for '
244                               '%r: ' % username)
245        else:
246            password = self.password
247
248        import os
249        import subprocess
250        cmd = "%s %s %s=%s %s=%s %s=%s" % (
251            MyProxyLogonMiddlewareTestCase.WGET_CMD, 
252            uri,
253            MyProxyLogonMiddlewareTestCase.WGET_USER_OPTNAME,
254            username,
255            MyProxyLogonMiddlewareTestCase.WGET_PASSWD_OPTNAME,
256            password,
257            MyProxyLogonMiddlewareTestCase.WGET_OUTPUT_OPTNAME,
258            MyProxyLogonMiddlewareTestCase.WGET_STDOUT)
259       
260        p = subprocess.Popen(cmd, shell=True)
261        status = os.waitpid(p.pid, 0)
262        self.failIf(status[-1] != 0, "Expecting 0 exit status for %r" % cmd)
263
264
265if __name__ == "__main__":
266    #import sys;sys.argv = ['', 'Test.testName']
267    import unittest
268    unittest.main()
Note: See TracBrowser for help on using the repository browser.