source: TI12-security/trunk/MyProxyWebService/myproxy/server/test/test_myproxywsgi_with_paster.py @ 6945

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/MyProxyWebService/myproxy/server/test/test_myproxywsgi_with_paster.py@6945
Revision 6945, 6.3 KB checked in by pjkersha, 9 years ago (diff)

Incomplete - task 5: MyProxy Logon HTTPS Interface

  • working unit tests with shell script clients called from the unit tests
Line 
1#!/usr/bin/env python
2"""Unit tests for MyProxy WSGI Middleware classes and Application testing them
3with Paster web application server.  The server is started from __init__ method
4of the Test Case class and then called by the unit test methods.  The unit
5test methods themselves use a bash script myproxy-ws-logon.sh to query the
6MyProxy web application.
7"""
8__author__ = "P J Kershaw"
9__date__ = "25/05/10"
10__copyright__ = "(C) 2010 Science and Technology Facilities Council"
11__license__ = "BSD - see LICENSE file in top-level directory"
12__contact__ = "Philip.Kershaw@stfc.ac.uk"
13__revision__ = '$Id$'
14from os import path
15from getpass import getpass
16from ConfigParser import SafeConfigParser, NoOptionError
17import subprocess
18import unittest
19import socket
20import logging
21logging.basicConfig(level=logging.DEBUG)
22
23from OpenSSL import SSL, crypto
24
25from myproxy.server.test import PasteDeployAppServer
26       
27
28class MyProxyLogonAppWithPasterTestCase(unittest.TestCase):
29    """Test MyProxy Logon App WSGI in Paster web application server container
30    """
31    THIS_DIR = path.dirname(__file__)
32    CA_DIRNAME = 'ca'
33    CA_DIR = path.join(THIS_DIR, CA_DIRNAME)
34    CA_ENV_VARNAME = 'X509_CERT_DIR'
35    INI_FILENAME = 'myproxywsgi.ini'
36    INI_FILEPATH = path.join(THIS_DIR, INI_FILENAME) 
37    CONFIG_FILENAME = 'test_myproxywsgi.cfg'
38    CONFIG_FILEPATH = path.join(THIS_DIR, CONFIG_FILENAME) 
39    SSLCERT_FILEPATH = 'localhost.crt'
40    SSLKEY_FILEPATH = 'localhost.key'
41
42    SERVICE_PORTNUM = 10443
43    LOGON_SCRIPT_CMD = 'myproxy-ws-logon.sh'
44    LOGON_SCRIPT_USER_OPTNAME = '--username'
45    LOGON_SCRIPT_STDIN_PASS_OPTNAME = '--stdin_pass'
46   
47    SCRIPT_URI_OPTNAME = '--uri'
48   
49    GET_TRUSTROOTS_SCRIPT_CMD = 'myproxy-ws-get-trustroots.sh'
50    GET_TRUSTROOTS_SCRIPT_BOOTSTRAP_OPTNAME = '--bootstrap'
51   
52    def __init__(self, *arg, **kw):
53        super(MyProxyLogonAppWithPasterTestCase, self).__init__(*arg, **kw)
54        self.services = []
55        self.disableServiceStartup = False
56       
57        self.cfg = SafeConfigParser({'here': self.__class__.THIS_DIR})
58        self.cfg.optionxform = str
59        self.cfg.read(self.__class__.CONFIG_FILEPATH)
60       
61        # Start the MyProxy web service
62        self.addService(cfgFilePath=self.__class__.INI_FILEPATH, 
63                        port=self.__class__.SERVICE_PORTNUM,
64                        withSSL=True,
65                        withLoggingConfig=False)
66   
67    def test01GetTrustRootsScriptWithBootstrap(self):
68        # Test curl/base64 based client script
69        optName = 'MyProxyLogonAppWithPasterTestCase.test02GetTrustRootsScript'
70        uri = self.cfg.get(optName, 'uri')
71       
72        cmd = (
73            self.__class__.GET_TRUSTROOTS_SCRIPT_CMD, 
74            "%s=%s" % (self.__class__.SCRIPT_URI_OPTNAME, uri),
75            "%s" % self.__class__.GET_TRUSTROOTS_SCRIPT_BOOTSTRAP_OPTNAME
76        )
77               
78        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
79                                stderr=subprocess.PIPE,
80                                env={self.__class__.CA_ENV_VARNAME:
81                                     self.__class__.CA_DIR})
82        stdoutdata, stderrdata = proc.communicate()
83        self.failIf(len(stderrdata) > 0, "An error message was returned: %s" % 
84                    stderrdata)
85        print("stdout = %s" % stdoutdata)
86   
87    def test02LogonScript(self):
88        # Test curl/openssl based client script access
89        optName = 'MyProxyLogonAppWithPasterTestCase.test02LogonScript'
90        username = self.cfg.get(optName, 'username')
91        try: 
92            password = self.cfg.get(optName, 'password')
93        except NoOptionError:
94            password = getpass(optName + ' password: ')
95
96        uri = self.cfg.get(optName, 'uri')
97       
98        cmd = (
99            self.__class__.LOGON_SCRIPT_CMD, 
100            "%s=%s"%(self.__class__.SCRIPT_URI_OPTNAME, uri),
101            "%s=%s"%(self.__class__.LOGON_SCRIPT_USER_OPTNAME, username),
102            self.__class__.LOGON_SCRIPT_STDIN_PASS_OPTNAME
103        )
104               
105        p1 = subprocess.Popen(["echo", password], stdout=subprocess.PIPE)
106        p2 = subprocess.Popen(cmd, stdin=p1.stdout, stdout=subprocess.PIPE,
107                              stderr=subprocess.PIPE,
108                              env={self.__class__.CA_ENV_VARNAME:
109                                   self.__class__.CA_DIR})
110        stdoutdata, stderrdata = p2.communicate()
111        self.failIf(len(stderrdata) > 0, "An error message was returned: %s" % 
112                    stderrdata)
113        print("stdout = %s" % stdoutdata)
114       
115        cert = crypto.load_certificate(crypto.FILETYPE_PEM, stdoutdata)
116        subj = cert.get_subject()
117        self.assert_(subj)
118        self.assert_(subj.CN)
119        print("Returned certificate subject CN=%r" % subj.CN)
120       
121    def addService(self, *arg, **kw):
122        """Utility for setting up threads to run Paste HTTP based services with
123        unit tests
124       
125        @param cfgFilePath: ini file containing configuration for the service
126        @type cfgFilePath: basestring
127        @param port: port number to run the service from
128        @type port: int
129        """
130        if self.disableServiceStartup:
131            return
132       
133        withSSL = kw.pop('withSSL', False)
134        if withSSL:
135            certFilePath = path.join(self.__class__.THIS_DIR, 
136                                     self.__class__.SSLCERT_FILEPATH)
137            priKeyFilePath = path.join(self.__class__.THIS_DIR, 
138                                       self.__class__.SSLKEY_FILEPATH)
139           
140            kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD)
141            kw['ssl_context'].set_options(SSL.OP_NO_SSLv2)
142       
143            kw['ssl_context'].use_privatekey_file(priKeyFilePath)
144            kw['ssl_context'].use_certificate_file(certFilePath)
145           
146        try:
147            self.services.append(PasteDeployAppServer(*arg, **kw))
148            self.services[-1].startThread()
149           
150        except socket.error:
151            pass
152
153    def __del__(self):
154        """Stop any services started with the addService method"""
155        if hasattr(self, 'services'):
156            for service in self.services:
157                service.terminateThread()
158               
159        parentObj = super(MyProxyLogonAppWithPasterTestCase, self)
160        if hasattr(parentObj, '__del__'):
161            parentObj.__del__()
162       
Note: See TracBrowser for help on using the repository browser.