source: TI12-security/trunk/MyProxyWebService/myproxy/server/test/test_myproxywsgi_with_paster.py @ 6953

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/MyProxyWebService/myproxy/server/test/test_myproxywsgi_with_paster.py@6953
Revision 6953, 6.6 KB checked in by pjkersha, 9 years ago (diff)

Incomplete - task 5: MyProxy? Logon HTTPS Interface

  • fix to test_myproxy_with_paster unit test module - added main test


Line 
1#!/usr/bin/env python
2"""Unit tests for MyProxy WSGI Middleware classes and Application testing them
3with Paster web application server.  The server is started from __init__ method
4of the Test Case class and then called by the unit test methods.  The unit
5test methods themselves using a bash script myproxy-ws-logon.sh to query the
6MyProxy web application.
7"""
8__author__ = "P J Kershaw"
9__date__ = "25/05/10"
10__copyright__ = "(C) 2010 Science and Technology Facilities Council"
11__license__ = "BSD - see LICENSE file in top-level directory"
12__contact__ = "Philip.Kershaw@stfc.ac.uk"
13__revision__ = '$Id$'
14from os import path
15from getpass import getpass
16from ConfigParser import SafeConfigParser, NoOptionError
17import subprocess
18import unittest
19import socket
20import logging
21logging.basicConfig(level=logging.DEBUG)
22
23from OpenSSL import SSL, crypto
24
25from myproxy.server.test import PasteDeployAppServer
26       
27
28class MyProxyLogonAppWithPasterTestCase(unittest.TestCase):
29    """Test MyProxy Logon App WSGI in Paster web application server container
30    with bash shell script clients.  For POSIX-like systems ONLY
31    """
32    THIS_DIR = path.dirname(__file__)
33    CA_DIRNAME = 'ca'
34    CA_DIR = path.join(THIS_DIR, CA_DIRNAME)
35    CA_ENV_VARNAME = 'X509_CERT_DIR'
36    INI_FILENAME = 'myproxywsgi.ini'
37    INI_FILEPATH = path.join(THIS_DIR, INI_FILENAME) 
38    CONFIG_FILENAME = 'test_myproxywsgi.cfg'
39    CONFIG_FILEPATH = path.join(THIS_DIR, CONFIG_FILENAME) 
40    SSLCERT_FILEPATH = 'localhost.crt'
41    SSLKEY_FILEPATH = 'localhost.key'
42
43    SERVICE_PORTNUM = 10443
44    LOGON_SCRIPT_CMD = 'myproxy-ws-logon.sh'
45    LOGON_SCRIPT_USER_OPTNAME = '--username'
46    LOGON_SCRIPT_STDIN_PASS_OPTNAME = '--stdin_pass'
47   
48    SCRIPT_URI_OPTNAME = '--uri'
49   
50    GET_TRUSTROOTS_SCRIPT_CMD = 'myproxy-ws-get-trustroots.sh'
51    GET_TRUSTROOTS_SCRIPT_BOOTSTRAP_OPTNAME = '--bootstrap'
52   
53    def __init__(self, *arg, **kw):
54        """Read settings from a config file and create thread for paster
55        based MyProxy Web Service app running over HTTPS
56        """
57        super(MyProxyLogonAppWithPasterTestCase, self).__init__(*arg, **kw)
58        self.services = []
59        self.disableServiceStartup = False
60       
61        self.cfg = SafeConfigParser({'here': self.__class__.THIS_DIR})
62        self.cfg.optionxform = str
63        self.cfg.read(self.__class__.CONFIG_FILEPATH)
64       
65        # Start the MyProxy web service
66        self.addService(cfgFilePath=self.__class__.INI_FILEPATH, 
67                        port=self.__class__.SERVICE_PORTNUM,
68                        withSSL=True,
69                        withLoggingConfig=False)
70   
71    def test01GetTrustRootsScriptWithBootstrap(self):
72        # Test curl/base64 based client script
73        optName = 'MyProxyLogonAppWithPasterTestCase.test02GetTrustRootsScript'
74        uri = self.cfg.get(optName, 'uri')
75       
76        cmd = (
77            self.__class__.GET_TRUSTROOTS_SCRIPT_CMD, 
78            "%s=%s" % (self.__class__.SCRIPT_URI_OPTNAME, uri),
79            "%s" % self.__class__.GET_TRUSTROOTS_SCRIPT_BOOTSTRAP_OPTNAME
80        )
81               
82        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
83                                stderr=subprocess.PIPE,
84                                env={self.__class__.CA_ENV_VARNAME:
85                                     self.__class__.CA_DIR})
86        stdoutdata, stderrdata = proc.communicate()
87        self.failIf(len(stderrdata) > 0, "An error message was returned: %s" % 
88                    stderrdata)
89        print("stdout = %s" % stdoutdata)
90   
91    def test02LogonScript(self):
92        # Test curl/openssl based client script access
93        optName = 'MyProxyLogonAppWithPasterTestCase.test02LogonScript'
94        username = self.cfg.get(optName, 'username')
95        try: 
96            password = self.cfg.get(optName, 'password')
97        except NoOptionError:
98            password = getpass(optName + ' password: ')
99
100        uri = self.cfg.get(optName, 'uri')
101       
102        cmd = (
103            self.__class__.LOGON_SCRIPT_CMD, 
104            "%s=%s"%(self.__class__.SCRIPT_URI_OPTNAME, uri),
105            "%s=%s"%(self.__class__.LOGON_SCRIPT_USER_OPTNAME, username),
106            self.__class__.LOGON_SCRIPT_STDIN_PASS_OPTNAME
107        )
108               
109        p1 = subprocess.Popen(["echo", password], stdout=subprocess.PIPE)
110        p2 = subprocess.Popen(cmd, stdin=p1.stdout, stdout=subprocess.PIPE,
111                              stderr=subprocess.PIPE,
112                              env={self.__class__.CA_ENV_VARNAME:
113                                   self.__class__.CA_DIR})
114        stdoutdata, stderrdata = p2.communicate()
115        self.failIf(len(stderrdata) > 0, "An error message was returned: %s" % 
116                    stderrdata)
117        print("stdout = %s" % stdoutdata)
118       
119        cert = crypto.load_certificate(crypto.FILETYPE_PEM, stdoutdata)
120        subj = cert.get_subject()
121        self.assert_(subj)
122        self.assert_(subj.CN)
123        print("Returned certificate subject CN=%r" % subj.CN)
124       
125    def addService(self, *arg, **kw):
126        """Utility for setting up threads to run Paste HTTP based services with
127        unit tests
128       
129        @param arg: tuple contains ini file path setting for the service
130        @type arg: tuple
131        @param kw: keywords including "port" - port number to run the service
132        from
133        @type kw: dict
134        """
135        if self.disableServiceStartup:
136            return
137       
138        withSSL = kw.pop('withSSL', False)
139        if withSSL:
140            certFilePath = path.join(self.__class__.THIS_DIR, 
141                                     self.__class__.SSLCERT_FILEPATH)
142            priKeyFilePath = path.join(self.__class__.THIS_DIR, 
143                                       self.__class__.SSLKEY_FILEPATH)
144           
145            kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD)
146            kw['ssl_context'].set_options(SSL.OP_NO_SSLv2)
147       
148            kw['ssl_context'].use_privatekey_file(priKeyFilePath)
149            kw['ssl_context'].use_certificate_file(certFilePath)
150           
151        try:
152            self.services.append(PasteDeployAppServer(*arg, **kw))
153            self.services[-1].startThread()
154           
155        except socket.error:
156            pass
157
158    def __del__(self):
159        """Stop any services started with the addService method"""
160        if hasattr(self, 'services'):
161            for service in self.services:
162                service.terminateThread()
163               
164        parentObj = super(MyProxyLogonAppWithPasterTestCase, self)
165        if hasattr(parentObj, '__del__'):
166            parentObj.__del__()
167
168if __name__ == "__main__":
169    unittest.main()       
170       
Note: See TracBrowser for help on using the repository browser.