source: TI05-delivery/trunk/test/test_embedded.py @ 1289

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI05-delivery/trunk/test/test_embedded.py@1289
Revision 1289, 11.1 KB checked in by spascoe, 13 years ago (diff)

Updated python code to reflect the renaming of the delivery package to pybbftp.
Some C tidying to reduce compiler warnings.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""
3Tests for the bbftp daemon when embedded in Python.
4
5@author: Stephen Pascoe
6@version: $Id$
7"""
8
9import unittest
10import sys, os, signal, time, syslog, stat
11from glob import glob
12import re, tempfile, getopt
13
# Root of the delivery source tree: $NDG_DELIVERY_HOME if set, otherwise the
# current working directory.
HOME = os.path.abspath(os.getenv('NDG_DELIVERY_HOME', os.curdir))
# First build/lib.* directory under HOME.  The [0] index raises IndexError at
# import time when the glob matches nothing.
BUILDDIR = glob('%s/build/lib.*' % HOME)[0]
# First bbftp client binary found under HOME/src (same IndexError caveat).
BBFTP = glob('%s/src/bbftp-client*/bbftpc/bbftp' % HOME)[0]
# Directory holding the fixture files used by the tests below.
DATADIR = '%s/test/data' % HOME
# Raw contents of the VERSION file (not stripped, so any trailing whitespace
# in the file is kept).
VERSION = open('%s/VERSION' % HOME).read()
NDG_MESSAGE_LEN = 256  # not referenced in the visible part of this file

# Handshake string that TestAuthHandler sends back to the client.
NDG_HANDSHAKE = "NDG-Delivery-server %s" % VERSION

# Extend the import path with the build directory (presumably so that the
# pybbftp import that follows resolves to the freshly built package).
sys.path.append(BUILDDIR)
24
25import pybbftp.server as server
26import traceback
27
28
class TestAuthHandler(server.AuthHandler):
    """Authentication handler used by the tests; accepts every client.

    The handler exchanges handshake messages with the client, logs what it
    received to syslog and authorises the connection as "TestCaseUser".
    """

    def authenticate(self):
        """Run the test handshake and return an authorisation handler.

        Steps: write this process's pid to ./DS_conn.pid, receive the
        client's first message, send NDG_HANDSHAKE, receive the private
        string, then build the handler via makeAuthzHandler().

        @return: the object produced by makeAuthzHandler()
        """
        # Dump the pid so that a debugger can pick it up (the original
        # comment said "gdm", presumably gdb).  try/finally makes sure the
        # handle is closed even if the write fails.
        fh = open('./DS_conn.pid', 'w')
        try:
            fh.write('%s\n' % os.getpid())
        finally:
            fh.close()
        #time.sleep(15)  # presumably here to leave time to attach a debugger

        msg = self.recv()
        syslog.syslog(syslog.LOG_DEBUG, 'AuthContext received Auth message: %s' % msg)

        self.send(NDG_HANDSHAKE)

        privatestr = self.recv()
        syslog.syslog(syslog.LOG_DEBUG, "AuthContext received privatestr: %s" % privatestr)

        return self.makeAuthzHandler(msg, "TestCaseUser")

    def makeAuthzHandler(self, msg, user):
        """Create the authorisation handler for an authenticated client.

        Subclasses override this to substitute a different handler class.

        @param msg: first message received from the client
        @param user: user name the connection is authorised as
        """
        # Honour the `user` argument rather than repeating the literal;
        # authenticate() passes "TestCaseUser", so the result is unchanged.
        return TestAuthzHandler(msg, user)
50
class TestAuthzHandler(server.LiberalAuthzHandler):
    """LiberalAuthzHandler that also keeps the client's handshake message."""
    def __init__(self, version, username):
        # version: stored on the instance as self.version; TestAuthHandler
        #   passes the first message it received from the client here.
        # username: forwarded unchanged to the base-class constructor.
        super(TestAuthzHandler, self).__init__(username)
        self.version = version
55
56
57
58
class TestFailAuthHandler(server.AuthHandler):
    """Authentication handler whose authorise() always fails.

    NOTE(review): TestAuthHandler overrides authenticate(), whereas this
    class defines authorise().  Confirm against server.AuthHandler which hook
    the daemon actually calls; the name is kept as-is here.
    """

    def authorise(self):
        """Always raise server.AuthenticationFailure."""
        # Call-style raise: the "raise Class, value" statement form is
        # Python 2 only, while this spelling works on every version.
        raise server.AuthenticationFailure("TestFailAuthHandler")
62
63
class TestPermAuthHandler(TestAuthHandler):
    """TestAuthHandler variant that hands out a TestPermAuthzHandler."""

    def makeAuthzHandler(self, msg, user):
        """Create the permission-checking authorisation handler.

        @param msg: first message received from the client
        @param user: user name the connection is authorised as
        """
        # Pass `user` through instead of repeating the literal; the inherited
        # authenticate() supplies "TestCaseUser", so behaviour is unchanged.
        return TestPermAuthzHandler(msg, user)
67
class TestPermAuthzHandler(TestAuthzHandler):
    """Checks the path's "other" read permission and authorises accordingly.

    Every authz* hook delegates to authzPath(), so control, retrieve and
    store requests are all decided by the same permission bit.
    """

    def authzControl(self, msgcode, transferopt, path):
        """Log the message code, then authorise on the path's permissions."""
        syslog.syslog(syslog.LOG_DEBUG, 'TestPermAuthzHandler.authzControl: msgcode = %s' % hex(msgcode))
        return self.authzPath(path)

    def authzRetr(self, path):
        """Authorise a retrieve request on the path's permissions."""
        return self.authzPath(path)

    def authzStore(self, path):
        """Authorise a store request on the path's permissions."""
        return self.authzPath(path)

    def authzPath(self, path):
        """Allow access only when `path` is readable by "other".

        @return: True when the other-read bit is set
        @raise server.AuthorisationFailure: when the bit is not set
        @note: os.stat() errors (e.g. a missing path) propagate unchanged.
        """
        mode = stat.S_IMODE(os.stat(path)[stat.ST_MODE])
        # stat.S_IROTH is the named form of the octal 004 bit; the bare
        # "0004" literal is a syntax error on Python 3.
        if not mode & stat.S_IROTH:
            syslog.syslog(syslog.LOG_DEBUG, 'TestPermAuthzHandler FAIL')
            raise server.AuthorisationFailure("TestPermAuthzHandler: no read perms")

        syslog.syslog(syslog.LOG_DEBUG, 'TestPermAuthzHandler OK')
        return True
90
91
class BaseFixture(unittest.TestCase):
    """Common fixture: run an embedded server per test and inspect its log.

    setUp() writes a marker to syslog and starts the server; tearDown() stops
    it.  Subclasses select the server-side behaviour by overriding the
    authContext class attribute.
    """

    # Handler passed to server.start().  It is created once, when the class
    # body is executed, and therefore shared by all tests of the class.
    authContext = TestAuthHandler()

    def setUp(self):
        # We want to mark the beginning of this test case in syslog
        syslog.openlog('test_embedded.py', 0, syslog.LOG_LOCAL0)
        syslog.syslog(syslog.LOG_DEBUG, 'Starting TestCase')

        self._startServer()

    def tearDown(self):
        self._stopServer()
        syslog.syslog(syslog.LOG_DEBUG, 'Ended TestCase')
        syslog.closelog()

    #----------------------------------------------------------------------------------

    def _startServer(self):
        # Start the server and store its PID
        self.pid = server.start(self.authContext, ['-l', 'DEBUG'])

    def _stopServer(self):
        # Stop the server process and wait for it to exit
        os.kill(self.pid, signal.SIGTERM)
        os.waitpid(self.pid, 0)
        syslog.syslog(syslog.LOG_DEBUG, 'Stopping server')

    def _runClient(self, cmd, debug=False, user="testcase", privatestr=None):
        """Run the bbftp client binary against localhost.

        @param cmd: bbftp command string passed with -e
        @param debug: pass -d when true, otherwise -m
        @param user: value for the client's -u option
        @param privatestr: if not None, passed (repr-quoted) with -P
        @return: the os.popen() file object carrying the client's stdout and
            stderr; reading it to EOF waits for the client to finish
        """

        if debug:
            f = "-d"
        else:
            f = "-m"

        if privatestr is None:
            p = ""
        else:
            p = "-P %s" % repr(privatestr)

        # This is ugly but I need to redirect stderr to stdout
        fh = os.popen('sh -c "%s %s %s -u %s -r 1 -e \'%s\' localhost" 2>&1 ' % (BBFTP, f, p, user, cmd))
        return fh

    def _readSyslog(self, logfile="/var/log/bbftpd/bbftpd.log"):
        """Get all bbftpd messages from syslog for this test case.

        Only lines after the most recent 'Starting TestCase' marker are
        considered.

        @param logfile: path of the log file to read
        @return: the matching log lines joined into one string
        @raise RuntimeError: if no marker line is found
        @note: This requires read access to the logfile
        """

        # Read the logfile into a buffer, closing the handle even on error
        fh = open(logfile)
        try:
            log = fh.readlines()
        finally:
            fh.close()

        # Find the latest testcase marker by scanning backwards
        start_i = 0
        for i in range(len(log) - 1, -1, -1):
            if re.search('test_embedded.py: Starting TestCase', log[i]):
                start_i = i + 1
                break
        if not start_i:
            raise RuntimeError("Can't find test case entry in syslog")

        # NOTE(review): the filter still names 'test_bbftpd.py' although the
        # syslog ident opened in setUp() is 'test_embedded.py' -- confirm
        # whether this is intentional before changing it.
        filtered_log = [line for line in log[start_i:]
                        if re.search(r'test_bbftpd.py:|bbftpd .*:', line)]

        return ''.join(filtered_log)

    def _findLines(self, lines, string):
        """Look for lines matching each regular expression in lines.

        Each expression is anchored with ^ and $ and searched in multi-line
        mode, so it must match one complete line of string.

        @return: True if all lines are found, else False
        """

        for line in lines:
            if not re.search('^%s$' % line, string, re.M):
                return False

        return True

    def assertLines(self, lines, string):
        """Assert that there is a line in string matching each regular expression in lines.

        Fails on the first expression without a match and names it in the
        failure message.
        """

        for line in lines:
            if not re.search('^%s$' % line, string, re.M):
                # Report which pattern is missing instead of a bare failure
                self.fail('no line matching %r found in:\n%s' % (line, string))
185
186
187
class AuthOK(BaseFixture):
    """Test the bbftpd module.

    All tests run against a server using the default TestAuthHandler, so
    every connection is expected to authenticate successfully.
    """

    def testStartup(self):
        """Verify the server logs its startup message."""
        # Give syslog time to flush its logs *before* reading them; sleeping
        # after the read cannot affect what was read.
        time.sleep(1)
        lines = self._readSyslog()
        self.assertLines(['.*Starting bbftpd'], lines)

    def testDir(self):
        """Try connecting the client and listing a directory.
        """

        fh = self._runClient("dir %s" % DATADIR)
        output = fh.read()

        self.assertLines([r'dir .*/data', r' d .*/\.', r' d .*/\.\.',
                          r' f .*/foo', r' f .*/bar', r' f .*/baz'], output)

        lines = self._readSyslog()
        self.assertLines(['.*Getting new bbftp connexion.*',
                          r'.*Authz: MSG_LIST_V2 .*/test/data/\*.*',
                          r'.*User TestCaseUser disconnected.*'], lines)

    def testHandshake(self):
        """Verify handshake messages are exchanged.
        """

        fh = self._runClient("dir .", debug=True)
        output = fh.read()

        self.assertLines(['Received Auth handshake: NDG-Delivery-server %s' % VERSION], output)

        lines = self._readSyslog()

        self.assertLines(['.*AuthContext received Auth message: NDG-Delivery-client %s' % VERSION], lines)

    def testPrivateStr(self):
        """Verify the private string is sent to server.
        """

        fh = self._runClient("dir .", privatestr="testPrivateStr")
        # The output itself is not checked, but reading to EOF waits for the
        # client to finish before the log is inspected.
        fh.read()

        lines = self._readSyslog()
        self.assertLines(['.*AuthContext received privatestr: testPrivateStr'], lines)

    def testRetr(self):
        """Try retrieving a file.
        """

        tmp = tempfile.mktemp('test_bbftpd')
        try:
            fh = self._runClient("get %s/foo %s" % (DATADIR, tmp))

            # Check the client output
            output = fh.read()
            self.assertLines(['get.*nogzip'], output)

            # Check retrieved file
            self.assertEqual(os.system('diff --brief %s/foo %s' % (DATADIR, tmp)), 0)
        finally:
            # Clean up even when an assertion above fails
            if os.path.exists(tmp):
                os.remove(tmp)

        # Check syslog
        lines = self._readSyslog()
        self.assertLines(['.*Authz: RETR .*/foo', '.*GET TestCaseUser .*/foo.*'], lines)

    def testStore(self):
        """Try storing a file.
        """

        src = '%s/bar' % (DATADIR)
        dest = '%s/new_bar' % (DATADIR)
        # NOTE(review): dest is pre-populated with a copy of src, so the diff
        # below succeeds even if the upload transfers nothing -- confirm
        # whether the copy is required.
        os.system('cp %s %s' % (src, dest))
        try:
            fh = self._runClient("put %s %s" % (src, dest))

            # Check the client output
            output = fh.read()
            self.assertLines(['put .* nogzip'], output)

            # Check sent file
            self.assertEqual(os.system('diff --brief %s %s' % (dest, src)), 0)
        finally:
            # Do not leave new_bar in the data directory if a check fails
            if os.path.exists(dest):
                os.remove(dest)

        # Check syslog
        lines = self._readSyslog()
        self.assertLines(['.*Authz: STORE .*/new_bar', '.*PUT TestCaseUser .*/new_bar.*'], lines)
281
282
283
class AuthFail(BaseFixture):
    """Run the server with TestFailAuthHandler and expect authentication errors."""

    authContext = TestFailAuthHandler()

    def test(self):
        """Fail authorisation

        The client must report a private-authentication error and the server
        log must record the failed bbftpd_private_auth call.
        """
        # Reading the client's output to EOF lets it finish before the log
        # is inspected.
        client_output = self._runClient("dir .", debug=True).read()
        server_log = self._readSyslog()

        self.assertLines(['.*Error while private authentication.*'], client_output)
        self.assertLines(['.*bbftpd_private_auth failed.*'], server_log)
298
299
class Authz(BaseFixture):
    """Exercise path-based authorisation through TestPermAuthHandler."""

    authContext = TestPermAuthHandler()

    def setUp(self):
        """Start the server, then give the fixture files known modes."""
        super(Authz, self).setUp()

        # Set modes on test files.  The stat constants spell out the same
        # bits as the former octal literals, which are Python 2-only syntax.
        # bar: rw-r----- (octal 640) -- not readable by "other"
        os.chmod('%s/bar' % (DATADIR),
                 stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
        # foo: rw-r-xr-x (octal 655) -- readable by "other"
        os.chmod('%s/foo' % (DATADIR),
                 stat.S_IRUSR | stat.S_IWUSR
                 | stat.S_IRGRP | stat.S_IXGRP
                 | stat.S_IROTH | stat.S_IXOTH)

    def testOK(self):
        """Test stat of a readable file.
        """

        fh = self._runClient("stat %s/foo" % (DATADIR), debug=True)
        output = fh.read()

        lines = self._readSyslog()

        self.assertLines(['Connection and authentication correct', 'stat /.*/foo OK'], output)
        self.assertLines(['.*Authz: MSG_STAT .*/foo', '.*TestPermAuthzHandler OK',
                          '.*TestPermAuthzHandler.*msgcode = %s.*' % hex(server.MSG.STAT)], lines)

    def testFail(self):
        """Test stat of a non-readable file.
        """

        fh = self._runClient("stat %s/bar" % (DATADIR), debug=True)
        output = fh.read()

        lines = self._readSyslog()

        self.assertLines(['BBFTP-ERROR.* ndg_authz_control: AuthorisationFailure.* no read perms',
                          'stat /.*/bar FAILED'], output)
        self.assertLines(['.*Authz: MSG_STAT .*/bar', '.* TestPermAuthzHandler FAIL',
                          '.* ndg_authz_control: AuthorisationFailure.* no read perms'], lines)
337
338
class PythonClientAuthOK(AuthOK):
    """Repeat AuthOK tests with a python-embedded client.

    _runClient() is overridden to launch test/runclient.py instead of the
    bbftp binary, so every inherited test runs through that script.
    """

    def testMultiCommand(self):
        """Execute more than one command per connection.
        """
        fh = self._runClient(["dir %s" % DATADIR, "stat %s/bar" % DATADIR])
        output = fh.read()

        self.assertLines([r'dir .*/data', r' d .*/\.', r' d .*/\.\.',
                          r' f .*/foo', r' f .*/bar', r' f .*/baz',
                          r'stat .*'], output)

        lines = self._readSyslog()
        self.assertLines(['.*Getting new bbftp connexion.*',
                          r'.*Authz: MSG_LIST_V2 .*/test/data/\*.*',
                          r'.*User TestCaseUser disconnected.*',
                          r'.*: Receiving MSG_STAT.*'], lines)

    def _runClient(self, cmds, debug=False, user="testcase", privatestr=None):
        """Run the client.

        @param cmds: one command string, or a sequence of command strings;
            each is passed as its own repr-quoted -e option
        @param debug: pass -d when true, otherwise -m
        @param user: value for the -u option
        @param privatestr: if not None, passed (repr-quoted) with -P
        @return: the os.popen() file object for the client's output
        """

        # Accept a single command as shorthand for a one-element list
        if isinstance(cmds, str):
            cmds = [cmds]

        cmd_args = []
        for cmd in cmds:
            cmd_args += ['-e', repr(cmd)]

        if debug:
            f = "-d"
        else:
            f = "-m"

        args = cmd_args + [f, '-u', user,
                           '-r', '1']

        if privatestr is not None:
            # Quote like the -e arguments (and like BaseFixture._runClient)
            # so the shell passes the value through as a single word.
            args += ['-P', repr(privatestr)]

        args.append('localhost')

        fh = os.popen('%s/test/runclient.py %s' % (HOME, ' '.join(args)))

        return fh
389
# When executed as a script, let unittest collect and run every TestCase
# defined in this module.
if __name__ == '__main__':
    unittest.main()
Note: See TracBrowser for help on using the repository browser.