source: TI05-delivery/trunk/test/test_embedded.py @ 1441

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI05-delivery/trunk/test/test_embedded.py@1441
Revision 1441, 9.9 KB checked in by spascoe, 15 years ago (diff)

Embedded test cases and examples work when executed from a
virtual-python with delivery service installed.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""
3Tests for the bbftp daemon when embedded in Python.
4
5@author: Stephen Pascoe
6@version: $Id$
7"""
8
9import unittest
10import sys, os, signal, time, stat
11from glob import glob
12import re, tempfile, getopt
13
14HOME = os.path.abspath(os.getenv('NDG_DELIVERY_HOME', os.curdir))
15BUILDDIR = glob('%s/build/lib.*' % HOME)[0]
16BBFTP = glob('%s/src/bbftp-client*/bbftpc/bbftp' % HOME)[0]
17DATADIR = '%s/test/data' % HOME
18VERSION = open('%s/VERSION' % HOME).read()
19NDG_MESSAGE_LEN = 256
20
21NDG_HANDSHAKE = "NDG-Delivery-server %s" % VERSION
22
23sys.path.append(BUILDDIR)
24
25import ndg.delivery.server.pybbftp as server
26import traceback
27
28
29class TestAuthHandler(server.AuthHandler):
30   
31    def authenticate(self):
32
33        msg = self.recv()
34        self.log(server.LOG_DEBUG, 'AuthContext received Auth message: %s' % msg)
35               
36        self.send(NDG_HANDSHAKE)
37
38        privatestr = self.recv()
39        self.log(server.LOG_DEBUG, "AuthContext received privatestr: %s" % privatestr)
40       
41        return self.makeAuthzHandler(msg, "TestCaseUser")
42
43    def makeAuthzHandler(self, msg, user):
44        return TestAuthzHandler(msg, "TestCaseUser")
45
46
47class TestAuthzHandler(server.LiberalAuthzHandler):
48    def __init__(self, version, username):
49        super(TestAuthzHandler, self).__init__(username)
50        self.version = version
51
52
53
54
55class TestFailAuthHandler(server.AuthHandler):
56    def authorise(self):
57        raise server.AuthenticationFailure, "TestFailAuthHandler"
58
59
60class TestPermAuthHandler(TestAuthHandler):
61    def makeAuthzHandler(self, msg, user):
62        return TestPermAuthzHandler(msg, "TestCaseUser")
63
64class TestPermAuthzHandler(TestAuthzHandler):
65    """Check's the path's other read permission and authorises accordingly.
66    """
67
68    def authzControl(self, msgcode, transferopt, path):
69        self.log(server.LOG_DEBUG, 'TestPermAuthzHandler.authzControl: msgcode = %s' % hex(msgcode))
70        return self.authzPath(path)
71
72    def authzRetr(self, path):
73        return self.authzPath(path)
74
75    def authzStore(self, path):
76        return self.authzPath(path)
77
78    def authzPath(self, path):
79        mode = stat.S_IMODE(os.stat(path)[stat.ST_MODE])
80        if (mode & 0004):
81            self.log(server.LOG_DEBUG, 'TestPermAuthzHandler OK')
82            return True
83        else:
84            self.log(server.LOG_DEBUG, 'TestPermAuthzHandler FAIL')
85            raise server.AuthorisationFailure, "TestPermAuthzHandler: no read perms"
86
87
88class BaseFixture(unittest.TestCase):
89
90    authContext = TestAuthHandler()
91
92    def setUp(self):
93        self._clearLog()
94        self._startServer()
95
96    def tearDown(self):
97        self._stopServer()
98
99    #----------------------------------------------------------------------------------
100
101
102    def _startServer(self):
103        # Start the server and store it's PID
104        self.pid = server.start(self.authContext, ['-l', 'DEBUG'])
105
106    def _stopServer(self):
107        # Stop the server process
108        os.kill(self.pid, signal.SIGTERM)
109        os.waitpid(self.pid, 0)
110
111
112    def _runClient(self, cmd, debug=False, user="testcase", privatestr=None):
113        """Run the client.
114        """
115
116        if debug:
117            f = "-d"
118        else:
119            f = "-m"
120
121        if privatestr == None:
122            p = ""
123        else:
124            p = "-P %s" % repr(privatestr)
125
126        # This is ugly but I need to redirect stderr to stdout
127        fh = os.popen('sh -c "%s %s %s -u %s -r 1 -e \'%s\' localhost" 2>&1 ' % (BBFTP, f, p, user, cmd))
128
129        output = fh.read()
130        print >>open('./bbftpc.log', 'w'), output
131
132        return output
133
134
135    def _clearLog(self, logfile="./bbftpd.log"):
136        if os.path.exists(logfile):
137            os.remove(logfile)
138
139    def _readLog(self, logfile="./bbftpd.log"):
140        return open(logfile).read()
141   
142
143
144    def _findLines(self, lines, string):
145        """Look for lines matching each regular expression in lines.
146       
147        @return: True if all lines are found, else False
148        """
149       
150        for line in lines:
151            if not re.search('^%s$' % line, string, re.M):
152                return False
153           
154        return True
155
156
157    def assertLines(self, lines, string):
158        """Assert that there is a line in string matching each regular expression in lines.
159        """
160
161        for line in lines:
162            self.assert_(re.search('^%s$' % line, string, re.M))
163
164
165
166class AuthOK(BaseFixture):
167    """Test the bbftpd module.
168    """
169   
170    def testStartup(self):
171        lines = self._readLog()
172        self.assertLines(['.*Starting bbftpd'], lines)
173
174    def testDir(self):
175        """Try connecting the client and listing a directory.
176        """
177
178        output = self._runClient("dir %s" % DATADIR)
179
180        self.assertLines([r'dir .*/data', r' d .*/\.', r' d .*/\.\.',
181                        r' f .*/foo', r' f .*/bar', r' f .*/baz'], output)
182
183        lines = self._readLog()
184        self.assertLines(['.*Getting new bbftp connexion.*',
185                          r'.*Authz: MSG_LIST_V2 .*/test/data/\*.*',
186                          r'.*User TestCaseUser disconnected.*'], lines)
187
188
189    def testHandshake(self):
190        """Verify handshake messages are exchanged.
191        """
192
193        output = self._runClient("dir .", debug=True)
194
195        print >>open('./bbftpc.log', 'w'), output
196
197        self.assertLines(['Received Auth handshake: NDG-Delivery-server %s' % VERSION], output)
198
199        lines = self._readLog()
200
201        self.assertLines(['.*AuthContext received Auth message: NDG-Delivery-client %s' % VERSION], lines)
202
203    def testPrivateStr(self):
204        """Verify the private string is sent to server.
205        """
206
207        output = self._runClient("dir .", privatestr="testPrivateStr")
208
209        lines = self._readLog()
210        self.assertLines(['.*AuthContext received privatestr: testPrivateStr'], lines)
211
212    def testRetr(self):
213        """Try retrieving a file.
214        """
215
216        tmp = tempfile.mktemp('test_bbftpd')
217        output = self._runClient("get %s/foo %s" % (DATADIR, tmp))
218
219        # Check the client output
220        self.assertLines(['get.*nogzip'], output)
221
222        # Check retrieved file
223        self.assert_(os.system('diff --brief %s/foo %s' % (DATADIR, tmp)) == 0)
224        os.remove(tmp)
225
226        # Check log
227        lines = self._readLog()
228        self.assertLines(['.*Authz: RETR .*/foo', '.*GET TestCaseUser .*/foo.*'], lines)
229
230    def testStore(self):
231        """Try storing a file.
232        """
233
234
235        src = '%s/bar' % (DATADIR)
236        dest = '%s/new_bar' % (DATADIR)
237        os.system('cp %s %s' % (src, dest))
238        output = self._runClient("put %s %s" % (src, dest))
239
240        # Check the client output
241        self.assertLines(['put .* nogzip'], output)
242
243
244        # Check sent file
245        self.assert_(os.system('diff --brief %s %s' % (dest, src)) == 0)
246        os.remove(dest)
247
248        # Check log
249        lines = self._readLog()
250        self.assertLines(['.*Authz: STORE .*/new_bar', '.*PUT TestCaseUser .*/new_bar.*'], lines)
251
252
253
254class AuthFail(BaseFixture):
255    authContext = TestFailAuthHandler()
256
257    def test(self):
258        """Fail authorisation
259        """
260
261        output = self._runClient("dir .", debug=True)
262
263        lines = self._readLog()
264
265        self.assertLines(['.*Error while private authentication.*'], output)
266        self.assertLines(['.*bbftpd_private_auth failed.*'], lines)
267
268
269class Authz(BaseFixture):
270    authContext = TestPermAuthHandler()
271
272    def setUp(self):
273        super(Authz, self).setUp()
274
275        # Set modes on test files
276        os.chmod('%s/bar' % (DATADIR), 0640)
277        os.chmod('%s/foo' % (DATADIR), 0655)
278
279    def testOK(self):
280        """Test dir of a readable file.
281        """
282
283        output = self._runClient("stat %s/foo" % (DATADIR), debug=True)
284
285        lines = self._readLog()
286
287
288        self.assertLines(['Connection and authentication correct', 'stat /.*/foo OK'], output)
289        self.assertLines(['.*Authz: MSG_STAT .*/foo', '.*TestPermAuthzHandler OK',
290                          '.*TestPermAuthzHandler.*msgcode = %s.*' % hex(server.MSG.STAT)], lines)
291
292    def testFail(self):
293        """Test dir of a non-readable file.
294        """
295
296        output = self._runClient("stat %s/bar" % (DATADIR), debug=True)
297
298        lines = self._readLog()
299
300        self.assertLines(['.* ndg_authz_control: AuthorisationFailure.* no read perms',
301                          'stat /.*/bar FAILED'], output)
302        self.assertLines(['.*Authz: MSG_STAT .*/bar', '.* TestPermAuthzHandler FAIL',
303                          '.* ndg_authz_control: AuthorisationFailure.* no read perms'], lines)
304
305
306class PythonClientAuthOK(AuthOK):
307    """Repeat AuthOK tests with a python-embedded client.
308    """
309
310    def testMultiCommand(self):
311        """Execute more than one command per connection.
312        """
313        output = self._runClient(["dir %s" % DATADIR, "stat %s/bar" % DATADIR])
314
315        self.assertLines([r'dir .*/data', r' d .*/\.', r' d .*/\.\.',
316                          r' f .*/foo', r' f .*/bar', r' f .*/baz',
317                          r'stat .*'], output)
318
319        lines = self._readLog()
320        self.assertLines(['.*Getting new bbftp connexion.*',
321                          r'.*Authz: MSG_LIST_V2 .*/test/data/\*.*',
322                          r'.*User TestCaseUser disconnected.*',
323                          r'.*: Receiving MSG_STAT.*'], lines)
324
325
326
327
328    def _runClient(self, cmds, debug=False, user="testcase", privatestr=None):
329        """Run the client.
330        """
331
332        if type(cmds) == type(''):
333            cmd_args = ['-e', repr(cmds)]
334        else:
335            cmd_args = []
336            for cmd in cmds:
337                cmd_args += ['-e', repr(cmd)]
338
339        if debug:
340            f = "-d"
341        else:
342            f = "-m"
343
344        args = cmd_args + [f, '-u', user,
345                '-r', '1']
346
347        if privatestr != None:
348            args += ['-P', privatestr]
349
350        args.append('localhost')
351
352        fh = os.popen('%s/test/runclient.py %s' % (HOME, ' '.join(args)))
353        output = fh.read()
354        print >>open('./bbftpc.log', 'w'), output
355
356        return output
357
358if __name__ == '__main__':
359    unittest.main()
Note: See TracBrowser for help on using the repository browser.