source: TI05-delivery/trunk/test/test_embedded.py @ 1104

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI05-delivery/trunk/test/test_embedded.py@1104
Revision 1104, 7.8 KB checked in by spascoe, 14 years ago (diff)

Enhanced test cases.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""
3Tests for the bbftp daemon when embedded in Python.
4
5@author: Stephen Pascoe
6@version: $Id$
7"""
8
9import unittest
10import sys, os, signal, time, syslog
11from glob import glob
12import re, tempfile
13
# Root of the delivery tree: $NDG_DELIVERY_HOME if set, otherwise the
# current directory, made absolute.
HOME = os.path.abspath(os.getenv('NDG_DELIVERY_HOME', os.curdir))
# First directory matching build/lib.* under HOME.  An IndexError here means
# the glob matched nothing.
BUILDDIR = glob('%s/build/lib.*' % HOME)[0]
# First bbftp client binary found under src/; IndexError if none matched.
BBFTP = glob('%s/src/bbftp-client*/bbftpc/bbftp' % HOME)[0]
DATADIR = '%s/test/data' % HOME
# Read the version string and close the file handle explicitly instead of
# leaving it for the garbage collector.
# NOTE(review): the content is used verbatim (including any trailing newline)
# in the handshake and in test regular expressions -- confirm that is intended.
_version_file = open('%s/VERSION' % HOME)
try:
    VERSION = _version_file.read()
finally:
    _version_file.close()
del _version_file
# Length that the server handshake message is padded to with NUL characters.
NDG_MESSAGE_LEN = 256

NDG_HANDSHAKE = "NDG-Delivery-server %s" % VERSION

# Make the built extension importable before importing it.
sys.path.append(BUILDDIR)
import bbftpd
25       
class AuthzContext:
    """Plain record holding a version message and a user name.

    Instances are returned by AuthContext.authorise().
    """

    def __init__(self, version_msg, user):
        # Both values are stored exactly as given.
        self.version, self.username = version_msg, user
30
class AuthContext:
    """Authentication callback object handed to bbftpd.run().

    authorise() performs a three-step exchange through the bbftpd module:
    receive a message, send the padded server handshake, receive the
    private string.
    """

    @staticmethod
    def _trim_at_nul(text):
        """Return text up to, but not including, its first NUL character.

        Text without a NUL is returned unchanged; text starting with a NUL
        yields the empty string.
        """
        end = text.find('\0')
        # str.find returns -1 when nothing is found, so it must be compared
        # against -1 explicitly: a bare truth test would drop the last
        # character of NUL-free text and skip trimming for a leading NUL.
        if end == -1:
            return text
        return text[:end]

    def authorise(self):
        """Run the handshake and return an AuthzContext for "TestCaseUser".

        @return: an AuthzContext whose version is the padded handshake
            message that was sent to the client.
        """
        # Read the auth version message
        client_msg = self._trim_at_nul(bbftpd.recv())

        syslog.syslog(syslog.LOG_DEBUG, 'AuthContext received Auth message: %s' % client_msg)

        # Send the response, NUL-padded to NDG_MESSAGE_LEN characters
        response = NDG_HANDSHAKE + '\0' * (NDG_MESSAGE_LEN - len(NDG_HANDSHAKE))
        bbftpd.send(response)

        # Get privatestr
        privatestr = self._trim_at_nul(bbftpd.recv())
        syslog.syslog(syslog.LOG_DEBUG, "AuthContext received privatestr: %s" % privatestr)

        # NOTE(review): the padded server response (not the client's message)
        # is passed as the version, as the code has always done -- confirm
        # which one is intended.
        return AuthzContext(response, "TestCaseUser")
55
56
class FailingAuthContext(AuthContext):
    """AuthContext variant whose authorise() always returns None."""

    def authorise(self):
        """Run the normal handshake, then return None instead of a context."""
        # Call the base class explicitly: one-argument super() yields an
        # unbound super object on which the method lookup raises, so the
        # handshake never ran.
        AuthContext.authorise(self)
        return None
61
62
63
64
class BaseFixture(unittest.TestCase):
    """Shared fixture: starts an embedded bbftpd server for each test.

    Also provides helpers to run the bbftp client, read the server log
    written since the test started, and match regular expressions against
    multi-line text.
    """

    def setUp(self):
        """Write a start marker to syslog and start the server."""
        # We want to mark the beginning of this test case in syslog
        syslog.openlog('test_embedded.py', 0, syslog.LOG_LOCAL0)
        syslog.syslog(syslog.LOG_DEBUG, 'Starting EmbeddedServerTestCase')

        self._startServer()

    def tearDown(self):
        """Stop the server, write an end marker and close syslog."""
        try:
            self._stopServer()
        finally:
            # Always close the log, even when stopping the server fails.
            syslog.syslog(syslog.LOG_DEBUG, 'Ended EmbeddedServerTestCase')
            syslog.closelog()

    #----------------------------------------------------------------------------------

    def _startServer(self, authContext=None):
        """Start the server with authContext (default: a fresh AuthContext).

        The value returned by bbftpd.run() is stored in self.pid.
        """
        if authContext is None:
            authContext = AuthContext()
        # Start the server and store its PID
        self.pid = bbftpd.run(authContext, ['-l', 'DEBUG'])

    def _stopServer(self):
        """Send SIGTERM to the server process and wait for it to exit."""
        os.kill(self.pid, signal.SIGTERM)
        os.waitpid(self.pid, 0)
        syslog.syslog(syslog.LOG_DEBUG, 'Stopping server')

    def _runClient(self, cmd, debug=False, user="testcase", privatestr=None):
        """Run the bbftp client against localhost.

        @param cmd: bbftp command passed to the client's -e option
        @param debug: pass -d to the client when true, otherwise -m
        @param user: value for the client's -u option
        @param privatestr: if not None, passed (via repr) with -P
        @return: the file object returned by os.popen; reading it yields
            the client's stdout and stderr combined
        """
        if debug:
            mode_flag = "-d"
        else:
            mode_flag = "-m"

        if privatestr is None:
            private_opt = ""
        else:
            private_opt = "-P %s" % repr(privatestr)

        # This is ugly but I need to redirect stderr to stdout
        return os.popen('sh -c "%s %s %s -u %s -r 1 -e \'%s\' localhost" 2>&1 '
                        % (BBFTP, mode_flag, private_opt, user, cmd))

    def _readSyslog(self, logfile="/var/log/bbftpd/bbftpd.log"):
        """Get all bbftpd messages from syslog for this test case.

        Only lines after the most recent start marker are considered.

        @param logfile: path of the log file to read
        @return: the matching lines joined into one string
        @raise RuntimeError: if no start marker is present in the file
        @note: This requires read access to the logfile
        """
        # Read the logfile into a buffer, closing the handle promptly
        log_fh = open(logfile)
        try:
            log = log_fh.readlines()
        finally:
            log_fh.close()

        # Find the latest testcase marker by scanning backwards
        start_i = None
        for i in range(len(log) - 1, -1, -1):
            if re.search('test_embedded.py: Starting EmbeddedServerTestCase', log[i]):
                start_i = i + 1
                break
        if start_i is None:
            raise RuntimeError("Can't find test case entry in syslog")

        # NOTE(review): the filter names test_bbftpd.py while setUp logs as
        # test_embedded.py -- confirm whether this script's own messages
        # were meant to be included.
        return ''.join([line for line in log[start_i:]
                        if re.search(r'test_bbftpd.py:|bbftpd .*:', line)])

    def _findLines(self, lines, string):
        """Look for lines matching each regular expression in lines.

        Each expression is anchored with ^ and $ and searched for in string
        in multi-line mode.

        @return: True if all lines are found, else False
        """
        for line in lines:
            if not re.search('^%s$' % line, string, re.M):
                return False

        return True

    def assertLines(self, lines, string):
        """Assert that there is a line in string matching each regular expression in lines.
        """
        for line in lines:
            # Name the missing pattern so a failure is self-explanatory.
            self.assertTrue(re.search('^%s$' % line, string, re.M),
                            'No line matching %r found' % (line,))
157
158
159
class EmbeddedServerTestCase(BaseFixture):
    """Test the bbftpd module.
    """

    def testStartup(self):
        """Verify the server logs its startup message.
        """
        # Give syslog time to flush its logs *before* reading them;
        # sleeping after the read would have no effect on what was read.
        time.sleep(1)
        lines = self._readSyslog()
        self.assertLines(['.*Starting bbftpd'], lines)

    def testDir(self):
        """Try connecting the client and listing a directory.
        """
        fh = self._runClient("dir %s" % DATADIR)
        output = fh.read()

        self.assertLines([r'dir .*/data', r' d .*/\.', r' d .*/\.\.',
                          r' f .*/foo', r' f .*/bar', r' f .*/baz'], output)

        lines = self._readSyslog()
        self.assertLines(['.*Getting new bbftp connexion.*',
                          r'.*Authz: MSG_LIST_V2 .*/test/data/\*.*',
                          r'.*User TestCaseUser disconnected.*'], lines)

    def testHandshake(self):
        """Verify handshake messages are exchanged.
        """
        fh = self._runClient("dir .", debug=True)
        output = fh.read()

        self.assertLines(['Received Auth handshake: NDG-Delivery-server %s' % VERSION], output)

        lines = self._readSyslog()
        self.assertLines(['.*AuthContext received Auth message: NDG-Delivery-client %s' % VERSION], lines)

    def testPrivateStr(self):
        """Verify the private string is sent to server.
        """
        fh = self._runClient("dir .", privatestr="testPrivateStr")
        # Drain the client's output; the content itself is not checked here.
        fh.read()

        lines = self._readSyslog()
        self.assertLines(['.*AuthContext received privatestr: testPrivateStr'], lines)

    def testRetr(self):
        """Try retrieving a file.
        """
        # NOTE(review): mktemp only picks a name (its first argument is a
        # suffix); the file is expected to be created by the client.
        tmp = tempfile.mktemp('test_bbftpd')
        try:
            fh = self._runClient("get %s/foo %s" % (DATADIR, tmp))

            # Check the client output
            output = fh.read()
            self.assertLines(['get.*nogzip'], output)

            # Check retrieved file
            self.assertEqual(os.system('diff --brief %s/foo %s' % (DATADIR, tmp)), 0)
        finally:
            # Clean up even when a check above fails, and tolerate the file
            # never having been created.
            if os.path.exists(tmp):
                os.remove(tmp)

        # Check syslog
        lines = self._readSyslog()
        self.assertLines(['.*Authz: RETR .*/foo', '.*GET TestCaseUser .*/foo.*'], lines)

    def testStore(self):
        """Try storing a file.
        """
        src = '%s/bar' % (DATADIR)
        dest = '%s/new_bar' % (DATADIR)
        # NOTE(review): dest is pre-populated with a copy of src, so the
        # diff below passes even if the put transfers nothing -- confirm
        # whether this copy is required.
        os.system('cp %s %s' % (src, dest))
        try:
            fh = self._runClient("put %s %s" % (src, dest))

            # Check the client output
            output = fh.read()
            self.assertLines(['put .* nogzip'], output)

            # Check sent file
            self.assertEqual(os.system('diff --brief %s %s' % (dest, src)), 0)
        finally:
            # Do not leave new_bar in the data directory when a check fails.
            if os.path.exists(dest):
                os.remove(dest)

        # Check syslog
        lines = self._readSyslog()
        self.assertLines(['.*Authz: STORE .*/new_bar', '.*PUT TestCaseUser .*/new_bar.*'], lines)
251
252
253
class AuthFailure(BaseFixture):
    """Fixture variant whose server is started with a FailingAuthContext."""

    def setUp(self):
        """Write the start marker and start the server with a failing context."""
        # Same syslog marker as the base fixture so _readSyslog can find it
        syslog.openlog('test_embedded.py', 0, syslog.LOG_LOCAL0)
        syslog.syslog(syslog.LOG_DEBUG, 'Starting EmbeddedServerTestCase')

        failing_context = FailingAuthContext()
        self._startServer(authContext=failing_context)

    def test(self):
        """Fail authorisation
        """
        client_output = self._runClient("dir .", debug=True).read()
        server_log = self._readSyslog()

        # Client output is checked first, then the server log.
        expectations = (
            (['.*Error while private authentication.*'], client_output),
            (['.*bbftpd_private_auth failed.*'], server_log),
        )
        for patterns, text in expectations:
            self.assertLines(patterns, text)
273
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
Note: See TracBrowser for help on using the repository browser.