source: TI05-delivery/trunk/test/test_embedded.py @ 1452

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI05-delivery/trunk/test/test_embedded.py@1452
Revision 1452, 10.2 KB checked in by spascoe, 13 years ago (diff)

Minor changes to the test cases.

Only *.testStartup now fail. For some reason the first 2 log messages are
getting swallowed.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""
3Tests for the bbftp daemon when embedded in Python.
4
5@author: Stephen Pascoe
6@version: $Id$
7"""
8
9import unittest
10import sys, os, signal, time, stat
11from glob import glob
12import re, tempfile, getopt, logging
13
14HOME = os.path.abspath(os.getenv('NDG_DELIVERY_HOME', os.curdir))
15BBFTP = glob('%s/src/bbftp-client*/bbftpc/bbftp' % HOME)[0]
16DATADIR = '%s/test/data' % HOME
17VERSION = open('%s/VERSION' % HOME).read()
18NDG_MESSAGE_LEN = 256
19
20NDG_HANDSHAKE = "NDG-Delivery-server %s" % VERSION
21
22
23import ndg.delivery.server.pybbftp as server
24import traceback
25
26def flushLogging():
27    logging.shutdown()
28def resetLogging():
29    logging.root.handlers = []
30    logging.basicConfig(level=logging.DEBUG, filename='./bbftpd.log')
31
32resetLogging()
33logging.debug('Reset the loggers')
34flushLogging()
35resetLogging()
36server.bbftpd.after_fork_hook = resetLogging
37server.bbftpd.before_fork_hook = flushLogging
38
39
40
41class TestAuthHandler(server.AuthHandler):
42   
43    def authenticate(self):
44
45        msg = self.recv()
46        self.log(server.LOG_DEBUG, 'AuthContext received Auth message: %s' % msg)
47               
48        self.send(NDG_HANDSHAKE)
49
50        privatestr = self.recv()
51        self.log(server.LOG_DEBUG, "AuthContext received privatestr: %s" % privatestr)
52       
53        return self.makeAuthzHandler(msg, "TestCaseUser")
54
55    def makeAuthzHandler(self, msg, user):
56        return TestAuthzHandler(msg, "TestCaseUser")
57
58
59class TestAuthzHandler(server.LiberalAuthzHandler):
60    def __init__(self, version, username):
61        super(TestAuthzHandler, self).__init__(username)
62        self.version = version
63
64
65
66
67class TestFailAuthHandler(server.AuthHandler):
68    def authorise(self):
69        raise server.AuthenticationFailure, "TestFailAuthHandler"
70
71
72class TestPermAuthHandler(TestAuthHandler):
73    def makeAuthzHandler(self, msg, user):
74        return TestPermAuthzHandler(msg, "TestCaseUser")
75
76class TestPermAuthzHandler(TestAuthzHandler):
77    """Check's the path's other read permission and authorises accordingly.
78    """
79
80    def authzControl(self, msgcode, transferopt, path):
81        self.log(server.LOG_DEBUG, 'TestPermAuthzHandler.authzControl: msgcode = %s' % hex(msgcode))
82        return self.authzPath(path)
83
84    def authzRetr(self, path):
85        return self.authzPath(path)
86
87    def authzStore(self, path):
88        return self.authzPath(path)
89
90    def authzPath(self, path):
91        mode = stat.S_IMODE(os.stat(path)[stat.ST_MODE])
92        if (mode & 0004):
93            self.log(server.LOG_DEBUG, 'TestPermAuthzHandler OK')
94            return True
95        else:
96            self.log(server.LOG_DEBUG, 'TestPermAuthzHandler FAIL')
97            raise server.AuthorisationFailure, "TestPermAuthzHandler: no read perms"
98
99
100class BaseFixture(unittest.TestCase):
101
102    authContext = TestAuthHandler()
103
104    def setUp(self):
105        self._clearLog()
106        self._startServer()
107
108    def tearDown(self):
109        self._stopServer()
110
111    #----------------------------------------------------------------------------------
112
113
114    def _startServer(self):
115        # Start the server and store it's PID
116        self.pid = server.start(self.authContext, ['-l', 'DEBUG'])
117
118    def _stopServer(self):
119        # Stop the server process
120        os.kill(self.pid, signal.SIGTERM)
121        os.waitpid(self.pid, 0)
122       
123
124
125    def _runClient(self, cmd, debug=False, user="testcase", privatestr=None):
126        """Run the client.
127        """
128
129        if debug:
130            f = "-d"
131        else:
132            f = "-m"
133
134        if privatestr == None:
135            p = ""
136        else:
137            p = "-P %s" % repr(privatestr)
138
139        # This is ugly but I need to redirect stderr to stdout
140        fh = os.popen('sh -c "%s %s %s -u %s -r 1 -e \'%s\' localhost" 2>&1 ' % (BBFTP, f, p, user, cmd))
141
142        output = fh.read()
143        print >>open('./bbftpc.log', 'w'), output
144
145        return output
146
147
148    def _clearLog(self, logfile="./bbftpd.log"):
149        if os.path.exists(logfile):
150            os.remove(logfile)
151
152    def _readLog(self, logfile="./bbftpd.log"):
153        return open(logfile).read()
154   
155
156
157    def _findLines(self, lines, string):
158        """Look for lines matching each regular expression in lines.
159       
160        @return: True if all lines are found, else False
161        """
162       
163        for line in lines:
164            if not re.search('^%s$' % line, string, re.M):
165                return False
166           
167        return True
168
169
170    def assertLines(self, lines, string):
171        """Assert that there is a line in string matching each regular expression in lines.
172        """
173
174        for line in lines:
175            self.assert_(re.search('^%s$' % line, string, re.M))
176
177
178
179class AuthOK(BaseFixture):
180    """Test the bbftpd module.
181    """
182   
183    def testStartup(self):
184        lines = self._readLog()
185        self.assertLines(['.*Starting bbftpd'], lines)
186
187    def testDir(self):
188        """Try connecting the client and listing a directory.
189        """
190
191        output = self._runClient("dir %s" % DATADIR)
192
193        self.assertLines([r'dir .*/data', r' d .*/\.', r' d .*/\.\.',
194                        r' f .*/foo', r' f .*/bar', r' f .*/baz'], output)
195
196        lines = self._readLog()
197        self.assertLines(['.*Getting new bbftp connexion.*',
198                          r'.*Authz: MSG_LIST_V2 .*/test/data/\*.*',
199                          r'.*User TestCaseUser disconnected.*'], lines)
200
201
202    def testHandshake(self):
203        """Verify handshake messages are exchanged.
204        """
205
206        output = self._runClient("dir .", debug=True)
207
208        print >>open('./bbftpc.log', 'w'), output
209
210        self.assertLines(['Received Auth handshake: NDG-Delivery-server %s' % VERSION], output)
211
212        lines = self._readLog()
213
214        self.assertLines(['.*AuthContext received Auth message: NDG-Delivery-client %s' % VERSION], lines)
215
216    def testPrivateStr(self):
217        """Verify the private string is sent to server.
218        """
219
220        output = self._runClient("dir .", privatestr="testPrivateStr")
221
222        lines = self._readLog()
223        self.assertLines(['.*AuthContext received privatestr: testPrivateStr'], lines)
224
225    def testRetr(self):
226        """Try retrieving a file.
227        """
228
229        tmp = tempfile.mktemp('test_bbftpd')
230        output = self._runClient("get %s/foo %s" % (DATADIR, tmp))
231
232        # Check the client output
233        self.assertLines(['get.*nogzip'], output)
234
235        # Check retrieved file
236        self.assert_(os.system('diff --brief %s/foo %s' % (DATADIR, tmp)) == 0)
237        os.remove(tmp)
238
239        # Check log
240        lines = self._readLog()
241        self.assertLines(['.*Authz: RETR .*/foo', '.*GET TestCaseUser .*/foo.*'], lines)
242
243    def testStore(self):
244        """Try storing a file.
245        """
246
247
248        src = '%s/bar' % (DATADIR)
249        dest = '%s/new_bar' % (DATADIR)
250        os.system('cp %s %s' % (src, dest))
251        output = self._runClient("put %s %s" % (src, dest))
252
253        # Check the client output
254        self.assertLines(['put .* nogzip'], output)
255
256
257        # Check sent file
258        self.assert_(os.system('diff --brief %s %s' % (dest, src)) == 0)
259        os.remove(dest)
260
261        # Check log
262        lines = self._readLog()
263        self.assertLines(['.*Authz: STORE .*/new_bar', '.*PUT TestCaseUser .*/new_bar.*'], lines)
264
265
266
267class AuthFail(BaseFixture):
268    authContext = TestFailAuthHandler()
269
270    def test(self):
271        """Fail authorisation
272        """
273
274        output = self._runClient("dir .", debug=True)
275
276        lines = self._readLog()
277
278        self.assertLines(['.*Error while private authentication.*'], output)
279        self.assertLines(['.*bbftpd_private_auth failed.*'], lines)
280
281
282class Authz(BaseFixture):
283    authContext = TestPermAuthHandler()
284
285    def setUp(self):
286        super(Authz, self).setUp()
287
288        # Set modes on test files
289        os.chmod('%s/bar' % (DATADIR), 0640)
290        os.chmod('%s/foo' % (DATADIR), 0655)
291
292    def testOK(self):
293        """Test dir of a readable file.
294        """
295
296        output = self._runClient("stat %s/foo" % (DATADIR), debug=True)
297
298        lines = self._readLog()
299
300
301        self.assertLines(['Connection and authentication correct', 'stat /.*/foo OK'], output)
302        self.assertLines(['.*Authz: MSG_STAT .*/foo', '.*TestPermAuthzHandler OK',
303                          '.*TestPermAuthzHandler.*msgcode = %s.*' % hex(server.MSG.STAT)], lines)
304
305    def testFail(self):
306        """Test dir of a non-readable file.
307        """
308
309        output = self._runClient("stat %s/bar" % (DATADIR), debug=True)
310
311        lines = self._readLog()
312
313        self.assertLines(['.*ndg_authz_control: AuthorisationFailure.* no read perms',
314                          'stat /.*/bar FAILED'], output)
315        self.assertLines(['.*Authz: MSG_STAT .*/bar', '.*TestPermAuthzHandler FAIL',
316                          '.*ndg_authz_control: AuthorisationFailure.* no read perms'], lines)
317
318
319class PythonClientAuthOK(AuthOK):
320    """Repeat AuthOK tests with a python-embedded client.
321    """
322
323    def testMultiCommand(self):
324        """Execute more than one command per connection.
325        """
326        output = self._runClient(["dir %s" % DATADIR, "stat %s/bar" % DATADIR])
327
328        self.assertLines([r'dir .*/data', r' d .*/\.', r' d .*/\.\.',
329                          r' f .*/foo', r' f .*/bar', r' f .*/baz',
330                          r'stat .*'], output)
331
332        lines = self._readLog()
333        self.assertLines(['.*Getting new bbftp connexion.*',
334                          r'.*Authz: MSG_LIST_V2 .*/test/data/\*.*',
335                          r'.*User TestCaseUser disconnected.*',
336                          r'.*:Receiving MSG_STAT.*'], lines)
337
338
339
340
341    def _runClient(self, cmds, debug=False, user="testcase", privatestr=None):
342        """Run the client.
343        """
344
345        if type(cmds) == type(''):
346            cmd_args = ['-e', repr(cmds)]
347        else:
348            cmd_args = []
349            for cmd in cmds:
350                cmd_args += ['-e', repr(cmd)]
351
352        if debug:
353            f = "-d"
354        else:
355            f = "-m"
356
357        args = cmd_args + [f, '-u', user,
358                '-r', '1']
359
360        if privatestr != None:
361            args += ['-P', privatestr]
362
363        args.append('localhost')
364
365        fh = os.popen('%s/test/runclient.py %s' % (HOME, ' '.join(args)))
366        output = fh.read()
367        print >>open('./bbftpc.log', 'w'), output
368
369        return output
370
371if __name__ == '__main__':
372    unittest.main()
Note: See TracBrowser for help on using the repository browser.