source: TI05-delivery/trunk/test/test_embedded.py @ 1332

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI05-delivery/trunk/test/test_embedded.py@1332
Revision 1332, 11.3 KB checked in by spascoe, 14 years ago (diff)

Added an API hook to use bbftpd_log to log messages from python code.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""
3Tests for the bbftp daemon when embedded in Python.
4
5@author: Stephen Pascoe
6@version: $Id$
7"""
8
9import unittest
10import sys, os, signal, time, syslog, stat
11from glob import glob
12import re, tempfile, getopt
13
# Root of the delivery checkout; falls back to the current directory when
# NDG_DELIVERY_HOME is not set.
HOME = os.path.abspath(os.getenv('NDG_DELIVERY_HOME', os.curdir))
# First directory matching build/lib.* under HOME (IndexError if none exists).
BUILDDIR = glob('%s/build/lib.*' % HOME)[0]
# First bbftp client binary found under HOME (IndexError if none exists).
BBFTP = glob('%s/src/bbftp-client*/bbftpc/bbftp' % HOME)[0]
DATADIR = '%s/test/data' % HOME

# Read the version string verbatim, closing the handle explicitly rather than
# leaving it open until the garbage collector gets to it.
_version_fh = open('%s/VERSION' % HOME)
try:
    VERSION = _version_fh.read()
finally:
    _version_fh.close()
del _version_fh
NDG_MESSAGE_LEN = 256

# Handshake string the test auth handler sends to the client.
NDG_HANDSHAKE = "NDG-Delivery-server %s" % VERSION

# Put the build directory on the module search path.
sys.path.append(BUILDDIR)
24
25import pybbftp.server as server
26import traceback
27
28
class TestAuthHandler(server.AuthHandler):
    """Auth handler that runs the NDG handshake and logs what it receives.

    Every connection that completes the handshake is mapped to the user
    "TestCaseUser".
    """

    def authenticate(self):
        """Exchange handshake messages with the client.

        Receives the client's auth message, replies with NDG_HANDSHAKE, then
        receives the client's private string.  Both received messages are
        logged.

        @return: the authorisation handler built by makeAuthzHandler()
        """
        # Dump the pid so that a debugger (presumably gdb) can pick it up.
        # try/finally guarantees the file is closed even if the write fails.
        fh = open('./DS_conn.pid', 'w')
        try:
            fh.write('%s\n' % os.getpid())
        finally:
            fh.close()
        #time.sleep(15)

        msg = self.recv()
        self.logMessage('AuthContext received Auth message: %s' % msg)

        self.send(NDG_HANDSHAKE)

        privatestr = self.recv()
        self.logMessage("AuthContext received privatestr: %s" % privatestr)

        return self.makeAuthzHandler(msg, "TestCaseUser")

    def makeAuthzHandler(self, msg, user):
        """Build the authorisation handler for an authenticated connection.

        @param msg: the auth message received from the client
        @param user: user name the handler is created for
        @return: a TestAuthzHandler
        """
        # Honour the caller's user name instead of hard-coding it a second time.
        return TestAuthzHandler(msg, user)

    def logMessage(self, msg):
        """Log msg through the bbftpd logger at LOG_DEBUG priority."""
        server.bbftpd.log(syslog.LOG_DEBUG, msg)
53
class TestAuthzHandler(server.LiberalAuthzHandler):
    """LiberalAuthzHandler that additionally records a version value."""

    def __init__(self, version, username):
        """
        @param version: stored unchanged as self.version
        @param username: forwarded to the base class initialiser
        """
        # Initialise the base class first, then attach the extra attribute.
        super(TestAuthzHandler, self).__init__(username)
        self.version = version
58
59
60
61
class TestFailAuthHandler(server.AuthHandler):
    """Auth handler whose authorise() hook always fails."""

    # NOTE(review): the other handlers in this file override authenticate(),
    # not authorise() -- confirm which hook the server actually invokes.
    def authorise(self):
        """Always raise server.AuthenticationFailure."""
        # Call form of raise: equivalent to the old "raise Class, arg" syntax
        # and also valid on Python 3.
        raise server.AuthenticationFailure("TestFailAuthHandler")
65
66
class TestPermAuthHandler(TestAuthHandler):
    """TestAuthHandler variant that hands out permission-checking authz handlers."""

    def makeAuthzHandler(self, msg, user):
        """Build a TestPermAuthzHandler for the authenticated connection.

        @param msg: the auth message received from the client
        @param user: user name the handler is created for
        @return: a TestPermAuthzHandler
        """
        # Use the user name supplied by the caller rather than a hard-coded one.
        return TestPermAuthzHandler(msg, user)
70
class TestPermAuthzHandler(TestAuthzHandler):
    """Checks the path's "other" read permission and authorises accordingly.
    """

    def authzControl(self, msgcode, transferopt, path):
        """Log the message code, then authorise path via authzPath().

        @param msgcode: integer message code (logged in hexadecimal)
        @param transferopt: not used by this handler
        @param path: filesystem path being accessed
        """
        server.bbftpd.log(syslog.LOG_DEBUG, 'TestPermAuthzHandler.authzControl: msgcode = %s' % hex(msgcode))
        return self.authzPath(path)

    def authzRetr(self, path):
        """Authorise a retrieve of path via authzPath()."""
        return self.authzPath(path)

    def authzStore(self, path):
        """Authorise a store to path via authzPath()."""
        return self.authzPath(path)

    def authzPath(self, path):
        """Authorise path if and only if its "other" read bit is set.

        @return: True when the bit is set
        @raise server.AuthorisationFailure: when the bit is not set
        """
        mode = stat.S_IMODE(os.stat(path).st_mode)
        # stat.S_IROTH is the 0004 permission bit, spelled portably.
        if mode & stat.S_IROTH:
            server.bbftpd.log(syslog.LOG_DEBUG, 'TestPermAuthzHandler OK')
            return True

        server.bbftpd.log(syslog.LOG_DEBUG, 'TestPermAuthzHandler FAIL')
        raise server.AuthorisationFailure("TestPermAuthzHandler: no read perms")
93
94
class BaseFixture(unittest.TestCase):
    """Shared fixture: one embedded bbftpd server per test, plus helpers.

    Subclasses select the authentication behaviour by overriding the
    authContext class attribute.
    """

    authContext = TestAuthHandler()

    def setUp(self):
        """Remove the previous log file and start a fresh server."""
        # We want to mark the beginning of this test case in syslog
        #syslog.openlog('test_embedded.py', 0, syslog.LOG_LOCAL0)
        #syslog.syslog(syslog.LOG_DEBUG, 'Starting TestCase')

        self._clearLog()
        self._startServer()

    def tearDown(self):
        """Stop the server started by setUp()."""
        self._stopServer()
        #syslog.syslog(syslog.LOG_DEBUG, 'Ended TestCase')
        #syslog.closelog()

    #----------------------------------------------------------------------------------

    def _startServer(self):
        """Start the server and store its PID in self.pid."""
        self.pid = server.start(self.authContext, ['-l', 'DEBUG'])

    def _stopServer(self):
        """Send SIGTERM to the server process and wait for it to exit."""
        os.kill(self.pid, signal.SIGTERM)
        os.waitpid(self.pid, 0)
        #syslog.syslog(syslog.LOG_DEBUG, 'Stopping server')

    def _runClient(self, cmd, debug=False, user="testcase", privatestr=None):
        """Run the bbftp client against localhost.

        @param cmd: command string passed to the client's -e option
        @param debug: pass -d to the client when true, -m otherwise
        @param user: value for the client's -u option
        @param privatestr: optional value for the client's -P option
        @return: file object from os.popen() carrying the client's stdout
            and stderr
        """
        if debug:
            f = "-d"
        else:
            f = "-m"

        if privatestr is None:
            p = ""
        else:
            p = "-P %s" % repr(privatestr)

        # This is ugly but I need to redirect stderr to stdout
        fh = os.popen('sh -c "%s %s %s -u %s -r 1 -e \'%s\' localhost" 2>&1 ' % (BBFTP, f, p, user, cmd))
        return fh

    def _clearLog(self, logfile="./bbftpd.log"):
        """Delete logfile if it exists."""
        # On a first run there is no log yet; a missing file must not make
        # setUp() fail.
        if os.path.exists(logfile):
            os.remove(logfile)

    def _readLog(self, logfile="./bbftpd.log"):
        """Return the whole content of logfile as one string."""
        fh = open(logfile)
        try:
            return fh.read()
        finally:
            fh.close()

    def _readSyslog(self, logfile="/var/log/bbftpd/bbftpd.log"):
        """Get all bbftpd messages from syslog for this test case.

        @note: This requires read access to the logfile
        @raise RuntimeError: if no test case start marker is found
        """
        # Read the logfile into a buffer
        fh = open(logfile)
        try:
            log = fh.readlines()
        finally:
            fh.close()

        # Find the latest testcase marker, scanning from the end of the log
        start_i = 0
        for i in range(len(log) - 1, -1, -1):
            if re.search('test_embedded.py: Starting TestCase', log[i]):
                start_i = i + 1
                break
        if not start_i:
            raise RuntimeError("Can't find test case entry in syslog")

        # NOTE(review): the filter below matches 'test_bbftpd.py:' while the
        # marker above uses 'test_embedded.py:' -- possibly a stale file name;
        # confirm before changing.
        filtered_log = [line for line in log[start_i:]
                        if re.search(r'test_bbftpd.py:|bbftpd .*:', line)]

        return ''.join(filtered_log)

    def _matchesLine(self, pattern, string):
        """Return a true value if some whole line of string matches pattern."""
        return re.search('^%s$' % pattern, string, re.M)

    def _findLines(self, lines, string):
        """Look for lines matching each regular expression in lines.

        @return: True if all lines are found, else False
        """
        for line in lines:
            if not self._matchesLine(line, string):
                return False

        return True

    def assertLines(self, lines, string):
        """Assert that there is a line in string matching each regular expression in lines.
        """
        for line in lines:
            # Name the missing pattern so a failure is diagnosable.
            self.assertTrue(self._matchesLine(line, string),
                            'no line matching %r in:\n%s' % (line, string))
196
197
198
class AuthOK(BaseFixture):
    """Test the bbftpd module with the always-succeeding auth handler.
    """

    def testStartup(self):
        """Verify the server logs its startup message.
        """
        # Give the server time to flush its logs *before* reading them.
        time.sleep(1)
        lines = self._readLog()
        self.assertLines(['.*Starting bbftpd'], lines)

    def testDir(self):
        """Try connecting the client and listing a directory.
        """
        fh = self._runClient("dir %s" % DATADIR)
        output = fh.read()

        self.assertLines([r'dir .*/data', r' d .*/\.', r' d .*/\.\.',
                          r' f .*/foo', r' f .*/bar', r' f .*/baz'], output)

        lines = self._readLog()
        self.assertLines(['.*Getting new bbftp connexion.*',
                          r'.*Authz: MSG_LIST_V2 .*/test/data/\*.*',
                          r'.*User TestCaseUser disconnected.*'], lines)

    def testHandshake(self):
        """Verify handshake messages are exchanged.
        """
        fh = self._runClient("dir .", debug=True)
        output = fh.read()

        self.assertLines(['Received Auth handshake: NDG-Delivery-server %s' % VERSION], output)

        lines = self._readLog()

        self.assertLines(['.*AuthContext received Auth message: NDG-Delivery-client %s' % VERSION], lines)

    def testPrivateStr(self):
        """Verify the private string is sent to server.
        """
        fh = self._runClient("dir .", privatestr="testPrivateStr")
        # Drain the client's output before inspecting the log; the output
        # itself is not checked.
        fh.read()

        lines = self._readLog()
        self.assertLines(['.*AuthContext received privatestr: testPrivateStr'], lines)

    def testRetr(self):
        """Try retrieving a file.
        """
        tmp = tempfile.mktemp('test_bbftpd')
        try:
            fh = self._runClient("get %s/foo %s" % (DATADIR, tmp))

            # Check the client output
            output = fh.read()
            self.assertLines(['get.*nogzip'], output)

            # Check retrieved file
            self.assertEqual(os.system('diff --brief %s/foo %s' % (DATADIR, tmp)), 0)
        finally:
            # Clean up even when an assertion above fails; the file may not
            # exist if the transfer never happened.
            if os.path.exists(tmp):
                os.remove(tmp)

        # Check syslog
        lines = self._readLog()
        self.assertLines(['.*Authz: RETR .*/foo', '.*GET TestCaseUser .*/foo.*'], lines)

    def testStore(self):
        """Try storing a file.
        """
        src = '%s/bar' % (DATADIR)
        dest = '%s/new_bar' % (DATADIR)
        # NOTE(review): dest is pre-created as a copy of src, so the diff
        # below would pass even if the put transferred nothing -- confirm
        # whether this copy is intentional.
        os.system('cp %s %s' % (src, dest))
        try:
            fh = self._runClient("put %s %s" % (src, dest))

            # Check the client output
            output = fh.read()
            self.assertLines(['put .* nogzip'], output)

            # Check sent file
            self.assertEqual(os.system('diff --brief %s %s' % (dest, src)), 0)
        finally:
            # Do not leave the copy in the data directory when a check fails.
            if os.path.exists(dest):
                os.remove(dest)

        # Check syslog
        lines = self._readLog()
        self.assertLines(['.*Authz: STORE .*/new_bar', '.*PUT TestCaseUser .*/new_bar.*'], lines)
292
293
294
class AuthFail(BaseFixture):
    """Run the client against a server whose auth handler is TestFailAuthHandler."""

    authContext = TestFailAuthHandler()

    def test(self):
        """Fail authorisation
        """
        # Collect both sides of the conversation before asserting anything.
        client_output = self._runClient("dir .", debug=True).read()
        server_log = self._readLog()

        expected_in_client = ['.*Error while private authentication.*']
        expected_in_log = ['.*bbftpd_private_auth failed.*']

        self.assertLines(expected_in_client, client_output)
        self.assertLines(expected_in_log, server_log)
309
310
class Authz(BaseFixture):
    """Test path authorisation using the permission-checking handler."""

    authContext = TestPermAuthHandler()

    # rw-r----- (octal 640): the "other" read bit is clear.
    _MODE_BAR = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP
    # rw-r-xr-x (octal 655): the "other" read bit is set.
    _MODE_FOO = (stat.S_IRUSR | stat.S_IWUSR |
                 stat.S_IRGRP | stat.S_IXGRP |
                 stat.S_IROTH | stat.S_IXOTH)

    def setUp(self):
        """Start the server, then set the permission bits the tests rely on."""
        super(Authz, self).setUp()

        # Set modes on test files
        os.chmod('%s/bar' % (DATADIR), self._MODE_BAR)
        os.chmod('%s/foo' % (DATADIR), self._MODE_FOO)

    def testOK(self):
        """Test stat of a readable file.
        """
        fh = self._runClient("stat %s/foo" % (DATADIR), debug=True)
        output = fh.read()

        lines = self._readLog()

        self.assertLines(['Connection and authentication correct', 'stat /.*/foo OK'], output)
        self.assertLines(['.*Authz: MSG_STAT .*/foo', '.*TestPermAuthzHandler OK',
                          '.*TestPermAuthzHandler.*msgcode = %s.*' % hex(server.MSG.STAT)], lines)

    def testFail(self):
        """Test stat of a non-readable file.
        """
        fh = self._runClient("stat %s/bar" % (DATADIR), debug=True)
        output = fh.read()

        lines = self._readLog()

        self.assertLines(['.* ndg_authz_control: AuthorisationFailure.* no read perms',
                          'stat /.*/bar FAILED'], output)
        self.assertLines(['.*Authz: MSG_STAT .*/bar', '.* TestPermAuthzHandler FAIL',
                          '.* ndg_authz_control: AuthorisationFailure.* no read perms'], lines)
348
349
class PythonClientAuthOK(AuthOK):
    """Repeat AuthOK tests with a python-embedded client.
    """

    def testMultiCommand(self):
        """Execute more than one command per connection.
        """
        fh = self._runClient(["dir %s" % DATADIR, "stat %s/bar" % DATADIR])
        output = fh.read()

        self.assertLines([r'dir .*/data', r' d .*/\.', r' d .*/\.\.',
                          r' f .*/foo', r' f .*/bar', r' f .*/baz',
                          r'stat .*'], output)

        lines = self._readLog()
        self.assertLines(['.*Getting new bbftp connexion.*',
                          r'.*Authz: MSG_LIST_V2 .*/test/data/\*.*',
                          r'.*User TestCaseUser disconnected.*',
                          r'.*: Receiving MSG_STAT.*'], lines)

    def _runClient(self, cmds, debug=False, user="testcase", privatestr=None):
        """Run the client through test/runclient.py.

        @param cmds: a single command string, or a sequence of command
            strings; each becomes one -e option
        @param debug: pass -d to the client when true, -m otherwise
        @param user: value for the client's -u option
        @param privatestr: optional value for the client's -P option
        @return: file object from os.popen() carrying the client's stdout
        """
        # Treat a lone command string as a one-element command list.
        if isinstance(cmds, str):
            cmds = [cmds]

        cmd_args = []
        for cmd in cmds:
            # repr() quotes the command so the shell passes it as one argument.
            cmd_args += ['-e', repr(cmd)]

        if debug:
            f = "-d"
        else:
            f = "-m"

        args = cmd_args + [f, '-u', user,
                           '-r', '1']

        if privatestr is not None:
            # Quote the private string the same way as the commands (and as
            # BaseFixture._runClient does) so whitespace cannot split it.
            args += ['-P', repr(privatestr)]

        args.append('localhost')

        fh = os.popen('%s/test/runclient.py %s' % (HOME, ' '.join(args)))

        return fh
400
# Run every TestCase in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.