source: TI05-delivery/trunk/test/test_embedded.py @ 1157

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI05-delivery/trunk/test/test_embedded.py@1157
Revision 1157, 10.1 KB checked in by spascoe, 14 years ago (diff)

Client-side python embedding is being implemented but doesn't pass the
test cases yet.

I've uncovered a bug (see #343) which needs fixing before the embedded client
can be fully debugged.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""
3Tests for the bbftp daemon when embedded in Python.
4
5@author: Stephen Pascoe
6@version: $Id$
7"""
8
9import unittest
10import sys, os, signal, time, syslog, stat
11from glob import glob
12import re, tempfile, getopt
13
14HOME = os.path.abspath(os.getenv('NDG_DELIVERY_HOME', os.curdir))
15BUILDDIR = glob('%s/build/lib.*' % HOME)[0]
16BBFTP = glob('%s/src/bbftp-client*/bbftpc/bbftp' % HOME)[0]
17DATADIR = '%s/test/data' % HOME
18VERSION = open('%s/VERSION' % HOME).read()
19NDG_MESSAGE_LEN = 256
20
21NDG_HANDSHAKE = "NDG-Delivery-server %s" % VERSION
22
23sys.path.append(BUILDDIR)
24
25import delivery.server as server
26
27
28class TestAuthHandler(server.BasicClientAuthHandler):
29   
30    def authorise(self):
31        msg = self.recvCStr()
32
33        syslog.syslog(syslog.LOG_DEBUG, 'AuthContext received Auth message: %s' % msg)
34
35        self.send(NDG_HANDSHAKE)
36
37        privatestr = self.recvCStr()
38        syslog.syslog(syslog.LOG_DEBUG, "AuthContext received privatestr: %s" % privatestr)
39
40        return self.makeAuthzHandler(msg, "TestCaseUser")
41
42    def makeAuthzHandler(self, msg, user):
43        return TestAuthzHandler(msg, "TestCaseUser")
44
45class TestAuthzHandler(server.LiberalAuthzHandler):
46    def __init__(self, version, username):
47        super(TestAuthzHandler, self).__init__(username)
48        self.version = version
49
50
51
52
53class TestFailAuthHandler(server.AuthHandler):
54    def authorise(self):
55        raise server.AuthenticationFailure, "TestFailAuthHandler"
56
57
58class TestPermAuthHandler(TestAuthHandler):
59    def makeAuthzHandler(self, msg, user):
60        return TestPermAuthzHandler(msg, "TestCaseUser")
61
62class TestPermAuthzHandler(TestAuthzHandler):
63    """Check's the path's other read permission and authorises accordingly.
64    """
65
66    def authzControl(self, msgcode, transferopt, path):
67        # Ignore what is being authorised for now, just authorise if read allowed
68        return self.authzPath(path)
69
70    def authzRetr(self, path):
71        return self.authzPath(path)
72
73    def authzSend(self, path):
74        return self.authzPath(path)
75
76    def authzPath(self, path):
77        mode = stat.S_IMODE(os.stat(path)[stat.ST_MODE])
78        if (mode & 0004):
79            syslog.syslog(syslog.LOG_DEBUG, 'TestPermAuthzHandler OK')
80            return True
81        else:
82            syslog.syslog(syslog.LOG_DEBUG, 'TestPermAuthzHandler FAIL')
83            raise server.AuthorisationFailure, "TestPermAuthzHandler: no read perms"
84
85
86class BaseFixture(unittest.TestCase):
87
88    authContext = TestAuthHandler()
89
90    def setUp(self):
91        # We want to mark the beginning of this test case in syslog
92        syslog.openlog('test_embedded.py', 0, syslog.LOG_LOCAL0)
93        syslog.syslog(syslog.LOG_DEBUG, 'Starting TestCase')
94
95        self._startServer()
96
97    def tearDown(self):
98        self._stopServer()
99        syslog.syslog(syslog.LOG_DEBUG, 'Ended TestCase')
100        syslog.closelog()
101
102    #----------------------------------------------------------------------------------
103
104
105    def _startServer(self):
106        # Start the server and store it's PID
107        self.pid = server.start(self.authContext, ['-l', 'DEBUG'])
108
109    def _stopServer(self):
110        # Stop the server process
111        os.kill(self.pid, signal.SIGTERM)
112        os.waitpid(self.pid, 0)
113        syslog.syslog(syslog.LOG_DEBUG, 'Stopping server')
114
115
116    def _runClient(self, cmd, debug=False, user="testcase", privatestr=None):
117        """Run the client.
118        """
119
120        if debug:
121            f = "-d"
122        else:
123            f = "-m"
124
125        if privatestr == None:
126            p = ""
127        else:
128            p = "-P %s" % repr(privatestr)
129
130        # This is ugly but I need to redirect stderr to stdout
131        fh = os.popen('sh -c "%s %s %s -u %s -r 1 -e \'%s\' localhost" 2>&1 ' % (BBFTP, f, p, user, cmd))
132        return fh
133
134    def _readSyslog(self, logfile="/var/log/bbftpd/bbftpd.log"):
135        """Get all bbftpd messages from syslog for this test case.
136
137        @note: This requires read access to the logfile
138        """
139
140        # Read the logfile into a buffer
141        log = open(logfile).readlines()
142
143        # Find the latest testcase marker
144        start_i = 0
145        for i in xrange(len(log)-1, -1, -1):
146            if re.search('test_embedded.py: Starting TestCase', log[i]):
147                start_i = i+1
148                break
149        if not start_i:
150            raise RuntimeError, "Can't find test case entry in syslog"
151
152        filtered_log = []
153        for line in log[start_i:]:
154            if re.search(r'test_bbftpd.py:|bbftpd .*:', line):
155                filtered_log.append(line)
156
157        return ''.join(filtered_log)
158
159
160    def _findLines(self, lines, string):
161        """Look for lines matching each regular expression in lines.
162       
163        @return: True if all lines are found, else False
164        """
165       
166        for line in lines:
167            if not re.search('^%s$' % line, string, re.M):
168                return False
169           
170        return True
171
172
173    def assertLines(self, lines, string):
174        """Assert that there is a line in string matching each regular expression in lines.
175        """
176
177        for line in lines:
178            self.assert_(re.search('^%s$' % line, string, re.M))
179
180
181
182class AuthOK(BaseFixture):
183    """Test the bbftpd module.
184    """
185   
186    def testStartup(self):
187        lines = self._readSyslog()
188        # Give syslog time to flush it's logs.
189        time.sleep(1)
190        self.assertLines(['.*Starting bbftpd'], lines)
191
192    def testDir(self):
193        """Try connecting the client and listing a directory.
194        """
195
196
197        fh = self._runClient("dir %s" % DATADIR)
198        output = fh.read()
199
200
201        self.assertLines([r'dir .*/data', r' d .*/\.', r' d .*/\.\.',
202                        r' f .*/foo', r' f .*/bar', r' f .*/baz'], output)
203
204        lines = self._readSyslog()
205        self.assertLines(['.*Getting new bbftp connexion.*',
206                          r'.*Authz: MSG_LIST_V2 .*/test/data/\*.*',
207                          r'.*User TestCaseUser disconnected.*'], lines)
208
209
210    def testHandshake(self):
211        """Verify handshake messages are exchanged.
212        """
213
214        fh = self._runClient("dir .", debug=True)
215        output = fh.read()
216
217        print output
218
219        self.assertLines(['Received Auth handshake: NDG-Delivery-server %s' % VERSION], output)
220
221        lines = self._readSyslog()
222        self.assertLines(['.*AuthContext received Auth message: NDG-Delivery-client %s' % VERSION], lines)
223
224    def testPrivateStr(self):
225        """Verify the private string is sent to server.
226        """
227
228        fh = self._runClient("dir .", privatestr="testPrivateStr")
229        output = fh.read()
230
231        lines = self._readSyslog()
232        self.assertLines(['.*AuthContext received privatestr: testPrivateStr'], lines)
233
234    def testRetr(self):
235        """Try retrieving a file.
236        """
237
238        tmp = tempfile.mktemp('test_bbftpd')
239        fh = self._runClient("get %s/foo %s" % (DATADIR, tmp))
240
241        # Check the client output
242        output = fh.read()
243
244        self.assertLines(['get.*nogzip'], output)
245
246        # Check retrieved file
247        self.assert_(os.system('diff --brief %s/foo %s' % (DATADIR, tmp)) == 0)
248        os.remove(tmp)
249
250        # Check syslog
251        lines = self._readSyslog()
252        self.assertLines(['.*Authz: RETR .*/foo', '.*GET TestCaseUser .*/foo.*'], lines)
253
254    def testStore(self):
255        """Try storing a file.
256        """
257
258
259        src = '%s/bar' % (DATADIR)
260        dest = '%s/new_bar' % (DATADIR)
261        os.system('cp %s %s' % (src, dest))
262        fh = self._runClient("put %s %s" % (src, dest))
263
264        # Check the client output
265        output = fh.read()
266        self.assertLines(['put .* nogzip'], output)
267
268
269        # Check sent file
270        self.assert_(os.system('diff --brief %s %s' % (dest, src)) == 0)
271        os.remove(dest)
272
273        # Check syslog
274        lines = self._readSyslog()
275        self.assertLines(['.*Authz: STORE .*/new_bar', '.*PUT TestCaseUser .*/new_bar.*'], lines)
276
277
278
279class AuthFail(BaseFixture):
280    authContext = TestFailAuthHandler()
281
282    def test(self):
283        """Fail authorisation
284        """
285
286        fh = self._runClient("dir .", debug=True)
287        output = fh.read()
288
289        lines = self._readSyslog()
290
291        self.assertLines(['.*Error while private authentication.*'], output)
292        self.assertLines(['.*bbftpd_private_auth failed.*'], lines)
293
294
295class Authz(BaseFixture):
296    authContext = TestPermAuthHandler()
297
298    def setUp(self):
299        super(Authz, self).setUp()
300
301        # Set modes on test files
302        os.chmod('%s/bar' % (DATADIR), 0640)
303        os.chmod('%s/foo' % (DATADIR), 0655)
304
305    def testOK(self):
306        """Test dir of a readable file.
307        """
308
309        fh = self._runClient("stat %s/foo" % (DATADIR), debug=True)
310        output = fh.read()
311
312        lines = self._readSyslog()
313
314
315        self.assertLines(['Connection and authentication correct', 'stat /.*/foo OK'], output)
316        self.assertLines(['.*Authz: MSG_STAT .*/foo', '.*TestPermAuthzHandler OK'], lines)
317
318    def testFail(self):
319        """Test dir of a non-readable file.
320        """
321
322        fh = self._runClient("stat %s/bar" % (DATADIR), debug=True)
323        output = fh.read()
324
325        lines = self._readSyslog()
326
327        self.assertLines(['BBFTP-ERROR.* ndg_authz_control: AuthorisationFailure.* no read perms',
328                          'stat /.*/bar FAILED'], output)
329        self.assertLines(['.*Authz: MSG_STAT .*/bar', '.* TestPermAuthzHandler FAIL',
330                          '.* ndg_authz_control: AuthorisationFailure.* no read perms'], lines)
331
332
333
334import delivery.bbftpc
335
336class PythonClientAuthOK(AuthOK):
337    """Repeat AuthOK tests with a python-embedded client.
338    """
339
340    def _runClient(self, cmd, debug=False, user="testcase", privatestr=None):
341        """Run the client.
342        """
343
344        if debug:
345            f = "-d"
346        else:
347            f = "-m"
348
349        # Tell bbftp to dump output to a file which is then opened for reading.
350        tmp = tempfile.mktemp('test_bbftpd')
351        args = ['-e', cmd, f, '-u', user, '-o', tmp, '-f', tmp, '-r', '1']
352
353        if privatestr != None:
354            args += ['-P', privatestr]
355
356        delivery.bbftpc.run(args + ['localhost'])
357
358        fh.open(tmp)
359        os.remove(tmp)
360
361        return fh
362
363#if __name__ == '__main__':
364#    unittest.main()
Note: See TracBrowser for help on using the repository browser.