source: TI12-security/trunk/MyProxyServerUtils/myproxy/server/test/test_myproxywsgi_with_paster.py @ 6897

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/MyProxyServerUtils/myproxy/server/test/test_myproxywsgi_with_paster.py@6897
Revision 6897, 5.0 KB checked in by pjkersha, 9 years ago (diff)

Fixed setting of authentication realm for HTTP Basic Auth middleware and improved interface to callback function by providing an exception type for the callback function to use to pass back a message and HTTP status code.

Line 
1#!/usr/bin/env python
2"""Unit tests for MyProxy WSGI Middleware classes and Application testing them
3with Paster web application server.  The server is started from __init__ method
4of the Test Case class and then called by the unit test methods.  The unit
5test methods themselves use a bash script myproxy-ws-logon.sh to query the
6MyProxy web application.
7"""
8__author__ = "P J Kershaw"
9__date__ = "25/05/10"
10__copyright__ = "(C) 2010 Science and Technology Facilities Council"
11__license__ = "BSD - see LICENSE file in top-level directory"
12__contact__ = "Philip.Kershaw@stfc.ac.uk"
13__revision__ = '$Id$'
14from os import path
15from getpass import getpass
16from ConfigParser import SafeConfigParser, NoOptionError
17import subprocess
18import unittest
19import socket
20import logging
21logging.basicConfig(level=logging.DEBUG)
22
23from OpenSSL import SSL, crypto
24
25from myproxy.server.test import PasteDeployAppServer
26       
27
class MyProxyLogonAppWithPasterTestCase(unittest.TestCase):
    """Test MyProxy Logon App WSGI in Paster web application server container

    The web service is started from C{__init__} via L{addService} and is
    stopped again from C{__del__}.  The test method drives the service through
    the C{myproxy-ws-logon.sh} client script.
    """
    # Use an absolute path: path.dirname(__file__) alone is an empty string
    # when the module is run from its own directory, which would leave
    # X509_CERT_DIR and the 'here' config default empty.
    THIS_DIR = path.dirname(path.abspath(__file__))
    INI_FILENAME = 'myproxywsgi.ini'
    INI_FILEPATH = path.join(THIS_DIR, INI_FILENAME)
    CONFIG_FILENAME = 'test_myproxywsgi.cfg'
    CONFIG_FILEPATH = path.join(THIS_DIR, CONFIG_FILENAME)

    # Despite the names these are file names; they are joined onto THIS_DIR
    # in addService
    SSLCERT_FILEPATH = 'localhost.crt'
    SSLKEY_FILEPATH = 'localhost.key'

    SERVICE_PORTNUM = 10443
    SCRIPT_CMD = 'myproxy-ws-logon.sh'
    SCRIPT_URI_OPTNAME = '--uri'
    SCRIPT_USER_OPTNAME = '--username'
    SCRIPT_STDIN_PASS = '--stdin_pass'

    def __init__(self, *arg, **kw):
        """Read the test configuration file and start the MyProxy web service

        Positional and keyword arguments are passed on unchanged to
        C{unittest.TestCase.__init__}.
        """
        super(MyProxyLogonAppWithPasterTestCase, self).__init__(*arg, **kw)
        self.services = []
        self.disableServiceStartup = False

        self.cfg = SafeConfigParser({'here': self.__class__.THIS_DIR})

        # Preserve the case of option names
        self.cfg.optionxform = str
        self.cfg.read(self.__class__.CONFIG_FILEPATH)

        # Start the MyProxy web service
        self.addService(cfgFilePath=self.__class__.INI_FILEPATH,
                        port=self.__class__.SERVICE_PORTNUM,
                        withSSL=True,
                        withLoggingConfig=False)

    def test01Script(self):
        """Test wget/openssl based client script access

        Runs the client script with the URI and username from the
        C{test01Logon} config section, feeds the password on standard input
        and checks that a PEM certificate with a subject CN comes back on
        standard output.
        """
        username = self.cfg.get('test01Logon', 'username')
        try:
            password = self.cfg.get('test01Logon', 'password')
        except NoOptionError:
            # No password in the config file - prompt for it instead
            password = getpass('test01Logon password: ')

        uri = self.cfg.get('test01Logon', 'uri')

        cmd = (
            self.__class__.SCRIPT_CMD,
            "%s=%s" % (self.__class__.SCRIPT_URI_OPTNAME, uri),
            "%s=%s" % (self.__class__.SCRIPT_USER_OPTNAME, username),
            self.__class__.SCRIPT_STDIN_PASS
        )

        # Write the password directly to the script's standard input rather
        # than piping it from a separate 'echo' process: the password no
        # longer appears in any process argument list and there is no extra
        # child process or pipe end to clean up.  The trailing newline matches
        # what 'echo' would have written.
        stdinData = password + '\n'
        if not isinstance(stdinData, bytes):
            stdinData = stdinData.encode('utf-8')

        # The environment is deliberately restricted to X509_CERT_DIR as
        # before.  NOTE(review): this also means PATH is not inherited by the
        # script - confirm this is intended
        proc = subprocess.Popen(cmd,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                env={'X509_CERT_DIR': self.__class__.THIS_DIR})
        stdoutdata, stderrdata = proc.communicate(stdinData)

        # Any output at all on stderr is treated as a failure
        self.assertFalse(stderrdata,
                         "An error message was returned: %s" % stderrdata)
        print("stdout = %s" % stdoutdata)

        cert = crypto.load_certificate(crypto.FILETYPE_PEM, stdoutdata)
        subj = cert.get_subject()
        self.assertTrue(subj)
        self.assertTrue(subj.CN)
        print("Returned certificate subject CN=%r" % subj.CN)

    def addService(self, *arg, **kw):
        """Utility for setting up threads to run Paste HTTP based services with
        unit tests

        Does nothing if C{self.disableServiceStartup} is set.  All other
        arguments are passed on to C{PasteDeployAppServer}.

        @param cfgFilePath: ini file containing configuration for the service
        @type cfgFilePath: basestring
        @param port: port number to run the service from
        @type port: int
        @param withSSL: if True, an SSL context built from the certificate and
        private key files in this directory is passed to the server as the
        C{ssl_context} keyword.  Defaults to False
        @type withSSL: bool
        """
        if self.disableServiceStartup:
            return

        withSSL = kw.pop('withSSL', False)
        if withSSL:
            certFilePath = path.join(self.__class__.THIS_DIR,
                                     self.__class__.SSLCERT_FILEPATH)
            priKeyFilePath = path.join(self.__class__.THIS_DIR,
                                       self.__class__.SSLKEY_FILEPATH)

            # Negotiate the protocol version but refuse SSLv2
            kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD)
            kw['ssl_context'].set_options(SSL.OP_NO_SSLv2)

            kw['ssl_context'].use_privatekey_file(priKeyFilePath)
            kw['ssl_context'].use_certificate_file(certFilePath)

        try:
            self.services.append(PasteDeployAppServer(*arg, **kw))
            self.services[-1].startThread()

        except socket.error:
            # Still best-effort - presumably the port is already in use by a
            # service started from another test case instance - but record
            # the failure instead of discarding it silently
            logging.getLogger(__name__).warning(
                "Service on port %r could not be started",
                kw.get('port'), exc_info=True)

    def __del__(self):
        """Stop any services started with the addService method"""
        # 'services' is missing if __init__ failed before it was assigned
        if hasattr(self, 'services'):
            for service in self.services:
                service.terminateThread()

        parentObj = super(MyProxyLogonAppWithPasterTestCase, self)
        if hasattr(parentObj, '__del__'):
            parentObj.__del__()
133       
Note: See TracBrowser for help on using the repository browser.