source: TI05-delivery/trunk/test/test_embedded.py @ 1442

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI05-delivery/trunk/test/test_embedded.py@1442
Revision 1442, 9.9 KB checked in by spascoe, 15 years ago (diff)

Small change.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""
3Tests for the bbftp daemon when embedded in Python.
4
5@author: Stephen Pascoe
6@version: $Id$
7"""
8
9import unittest
10import sys, os, signal, time, stat
11from glob import glob
12import re, tempfile, getopt
13
# Root of the delivery checkout: $NDG_DELIVERY_HOME, or the current directory
# when that variable is unset.
HOME = os.path.abspath(os.getenv('NDG_DELIVERY_HOME', os.curdir))
# First bbftp client binary matching the pattern below; raises IndexError at
# import time if nothing matches.
BBFTP = glob('%s/src/bbftp-client*/bbftpc/bbftp' % HOME)[0]
DATADIR = '%s/test/data' % HOME
# Read the VERSION file verbatim (no stripping) and close the handle
# explicitly instead of relying on garbage collection.
_version_file = open('%s/VERSION' % HOME)
try:
    VERSION = _version_file.read()
finally:
    _version_file.close()
del _version_file
# NOTE(review): not referenced anywhere else in the visible code; presumably
# the fixed message length of the NDG handshake -- confirm before relying on it.
NDG_MESSAGE_LEN = 256

# Message the test authentication handler sends back to the client.
NDG_HANDSHAKE = "NDG-Delivery-server %s" % VERSION
21
22
23import ndg.delivery.server.pybbftp as server
24import traceback
25
26
class TestAuthHandler(server.AuthHandler):
    """Authentication handler that accepts every client.

    It receives one message, replies with NDG_HANDSHAKE, receives a second
    message (the private string) and logs both received messages at debug
    level so the test cases can search for them in the server log.
    """

    def authenticate(self):
        """Run the message exchange and build the authorisation handler.

        @return: whatever makeAuthzHandler() returns for the first received
            message and the fixed user name "TestCaseUser".
        """

        msg = self.recv()
        self.log(server.LOG_DEBUG, 'AuthContext received Auth message: %s' % msg)

        self.send(NDG_HANDSHAKE)

        privatestr = self.recv()
        self.log(server.LOG_DEBUG, "AuthContext received privatestr: %s" % privatestr)

        return self.makeAuthzHandler(msg, "TestCaseUser")

    def makeAuthzHandler(self, msg, user):
        """Factory hook that subclasses override to change the authz handler.

        @param msg: first message received from the client; stored by the
            handler as its version.
        @param user: user name given to the handler.
        @return: a TestAuthzHandler instance.
        """
        # Honour the caller's user name rather than repeating the literal;
        # authenticate() passes "TestCaseUser", so the result is unchanged.
        return TestAuthzHandler(msg, user)
43
44
class TestAuthzHandler(server.LiberalAuthzHandler):
    """LiberalAuthzHandler that also remembers a version value.

    In this file the version is the first message received from the client
    during authentication.
    """

    def __init__(self, version, username):
        """Initialise the base handler with username, then record version.

        @param version: value kept on the instance as self.version.
        @param username: passed unchanged to the base class initialiser.
        """
        base = super(TestAuthzHandler, self)
        base.__init__(username)
        self.version = version
49
50
51
52
class TestFailAuthHandler(server.AuthHandler):
    """Authentication handler used by the AuthFail test case.

    NOTE(review): this class overrides authorise(), whereas TestAuthHandler
    overrides authenticate().  Confirm which hook the server really calls; if
    it is authenticate(), the failure seen by AuthFail comes from the base
    class rather than from this method.
    """

    def authorise(self):
        """Always refuse by raising server.AuthenticationFailure."""
        # Call form of raise: valid on every Python version, unlike the
        # "raise Class, value" statement form.
        raise server.AuthenticationFailure("TestFailAuthHandler")
56
57
class TestPermAuthHandler(TestAuthHandler):
    """TestAuthHandler variant whose authz handler checks file permissions."""

    def makeAuthzHandler(self, msg, user):
        """Build a TestPermAuthzHandler instead of the default handler.

        @param msg: first message received from the client.
        @param user: user name given to the handler.
        @return: a TestPermAuthzHandler instance.
        """
        # Use the supplied user name instead of repeating the literal; the
        # inherited authenticate() passes "TestCaseUser", so nothing changes
        # for the existing tests.
        return TestPermAuthzHandler(msg, user)
61
class TestPermAuthzHandler(TestAuthzHandler):
    """Checks the path's "other" read permission and authorises accordingly.

    Control, retrieve and store requests are all decided by authzPath().
    """

    def authzControl(self, msgcode, transferopt, path):
        """Log the message code in hex, then decide on the path alone.

        @param msgcode: numeric message code; only logged.
        @param transferopt: not used by this handler.
        @param path: file system path the request refers to.
        @return: True when the path is authorised.
        @raise server.AuthorisationFailure: when it is not.
        """
        self.log(server.LOG_DEBUG, 'TestPermAuthzHandler.authzControl: msgcode = %s' % hex(msgcode))
        return self.authzPath(path)

    def authzRetr(self, path):
        """Authorise a retrieve request; see authzPath()."""
        return self.authzPath(path)

    def authzStore(self, path):
        """Authorise a store request; see authzPath()."""
        return self.authzPath(path)

    def authzPath(self, path):
        """Authorise path if and only if it is readable by "other".

        @param path: file system path to inspect with os.stat().
        @return: True when the other-read bit is set.
        @raise server.AuthorisationFailure: when the bit is not set.
        """
        mode = stat.S_IMODE(os.stat(path).st_mode)
        # stat.S_IROTH is the other-read bit (octal 004); the named constant
        # avoids the Python-2-only "0004" literal.
        if mode & stat.S_IROTH:
            self.log(server.LOG_DEBUG, 'TestPermAuthzHandler OK')
            return True

        self.log(server.LOG_DEBUG, 'TestPermAuthzHandler FAIL')
        raise server.AuthorisationFailure("TestPermAuthzHandler: no read perms")
84
85
class BaseFixture(unittest.TestCase):
    """Common fixture: an embedded bbftp server runs around every test.

    Subclasses choose the server behaviour by overriding authContext.
    """

    # Authentication handler passed to server.start().  The instance is
    # created once, at class-definition time, and shared by all tests.
    authContext = TestAuthHandler()

    def setUp(self):
        """Remove any old server log, then start the server."""
        self._clearLog()
        self._startServer()

    def tearDown(self):
        """Stop the server started by setUp()."""
        self._stopServer()

    #----------------------------------------------------------------------------------

    def _startServer(self):
        # Start the server and store its PID
        self.pid = server.start(self.authContext, ['-l', 'DEBUG'])

    def _stopServer(self):
        # Stop the server process and wait for it to exit
        os.kill(self.pid, signal.SIGTERM)
        os.waitpid(self.pid, 0)

    def _runClient(self, cmd, debug=False, user="testcase", privatestr=None):
        """Run the bbftp client against localhost and capture its output.

        @param cmd: command string handed to the client's -e option.
        @param debug: pass -d to the client when true, otherwise -m.
        @param user: value for the client's -u option.
        @param privatestr: if not None, sent with the client's -P option.
        @return: combined stdout and stderr of the client; the same text is
            also saved to ./bbftpc.log.
        """

        if debug:
            flag = "-d"
        else:
            flag = "-m"

        if privatestr is None:
            private_opt = ""
        else:
            private_opt = "-P %s" % repr(privatestr)

        # This is ugly but I need to redirect stderr to stdout
        pipe = os.popen('sh -c "%s %s %s -u %s -r 1 -e \'%s\' localhost" 2>&1 '
                        % (BBFTP, flag, private_opt, user, cmd))
        try:
            output = pipe.read()
        finally:
            # Closing also waits for the client process to finish.
            pipe.close()

        self._writeClientLog(output)

        return output

    def _writeClientLog(self, output, logfile='./bbftpc.log'):
        """Overwrite logfile with output followed by a newline."""
        log = open(logfile, 'w')
        try:
            log.write(output)
            log.write('\n')
        finally:
            log.close()

    def _clearLog(self, logfile="./bbftpd.log"):
        """Delete logfile if it exists."""
        if os.path.exists(logfile):
            os.remove(logfile)

    def _readLog(self, logfile="./bbftpd.log"):
        """Return the whole content of logfile as one string."""
        log = open(logfile)
        try:
            return log.read()
        finally:
            log.close()

    def _findLines(self, lines, string):
        """Look for lines matching each regular expression in lines.

        Each expression is anchored with ^ and $ and searched in multi-line
        mode, so it has to match one complete line of string.

        @return: True if all lines are found, else False
        """

        for line in lines:
            if not re.search('^%s$' % line, string, re.M):
                return False

        return True

    def assertLines(self, lines, string):
        """Assert that there is a line in string matching each regular expression in lines.

        Fails on the first expression without a match, naming that expression
        and showing the searched text so the failure can be diagnosed.
        """

        for line in lines:
            if not re.search('^%s$' % line, string, re.M):
                self.fail('no line matching %r in:\n%s' % (line, string))
161
162
163
class AuthOK(BaseFixture):
    """Test the bbftpd module with the default, always-accepting handler.
    """

    def testStartup(self):
        """The server log records the daemon start."""
        lines = self._readLog()
        self.assertLines(['.*Starting bbftpd'], lines)

    def testDir(self):
        """Try connecting the client and listing a directory.
        """

        output = self._runClient("dir %s" % DATADIR)

        self.assertLines([r'dir .*/data', r' d .*/\.', r' d .*/\.\.',
                          r' f .*/foo', r' f .*/bar', r' f .*/baz'], output)

        lines = self._readLog()
        self.assertLines(['.*Getting new bbftp connexion.*',
                          r'.*Authz: MSG_LIST_V2 .*/test/data/\*.*',
                          r'.*User TestCaseUser disconnected.*'], lines)

    def testHandshake(self):
        """Verify handshake messages are exchanged.
        """

        # _runClient() already saves the output to ./bbftpc.log, so it is
        # not written a second time here.
        output = self._runClient("dir .", debug=True)

        self.assertLines(['Received Auth handshake: NDG-Delivery-server %s' % VERSION], output)

        lines = self._readLog()

        self.assertLines(['.*AuthContext received Auth message: NDG-Delivery-client %s' % VERSION], lines)

    def testPrivateStr(self):
        """Verify the private string is sent to server.
        """

        self._runClient("dir .", privatestr="testPrivateStr")

        lines = self._readLog()
        self.assertLines(['.*AuthContext received privatestr: testPrivateStr'], lines)

    def testRetr(self):
        """Try retrieving a file.
        """
        import shutil

        # A private temporary directory replaces tempfile.mktemp(), which
        # only returns a guessable name.  The target file itself still does
        # not exist before the transfer.
        tmpdir = tempfile.mkdtemp()
        tmp = os.path.join(tmpdir, 'test_bbftpd')
        try:
            output = self._runClient("get %s/foo %s" % (DATADIR, tmp))

            # Check the client output
            self.assertLines(['get.*nogzip'], output)

            # Check retrieved file
            self.assertEqual(os.system('diff --brief %s/foo %s' % (DATADIR, tmp)), 0)
        finally:
            # Clean up even when an assertion above fails.
            shutil.rmtree(tmpdir, True)

        # Check log
        lines = self._readLog()
        self.assertLines(['.*Authz: RETR .*/foo', '.*GET TestCaseUser .*/foo.*'], lines)

    def testStore(self):
        """Try storing a file.
        """

        src = '%s/bar' % (DATADIR)
        dest = '%s/new_bar' % (DATADIR)
        # NOTE(review): the destination is pre-populated with a copy of the
        # source, so the diff below succeeds even if the upload writes
        # nothing.  Kept as in the original; confirm whether it is required.
        os.system('cp %s %s' % (src, dest))
        try:
            output = self._runClient("put %s %s" % (src, dest))

            # Check the client output
            self.assertLines(['put .* nogzip'], output)

            # Check sent file
            self.assertEqual(os.system('diff --brief %s %s' % (dest, src)), 0)
        finally:
            # Do not leave the uploaded file in the data directory when an
            # assertion fails.
            if os.path.exists(dest):
                os.remove(dest)

        # Check log
        lines = self._readLog()
        self.assertLines(['.*Authz: STORE .*/new_bar', '.*PUT TestCaseUser .*/new_bar.*'], lines)
249
250
251
class AuthFail(BaseFixture):
    """Run the client against a server that uses TestFailAuthHandler."""

    authContext = TestFailAuthHandler()

    def test(self):
        """Fail authorisation

        The client output and the server log must each contain the expected
        failure line.
        """

        client_output = self._runClient("dir .", debug=True)
        server_log = self._readLog()

        expectations = (
            (['.*Error while private authentication.*'], client_output),
            (['.*bbftpd_private_auth failed.*'], server_log),
        )
        for patterns, text in expectations:
            self.assertLines(patterns, text)
265
266
class Authz(BaseFixture):
    """Exercise the permission-based authorisation handler."""

    authContext = TestPermAuthHandler()

    def setUp(self):
        """Start the server, then give the two test files known modes."""
        super(Authz, self).setUp()

        # Set modes on test files.  The stat constants spell out the same
        # bits as the former octal literals, which Python 3 rejects.
        # bar: rw-r----- (0640), not readable by "other"
        os.chmod('%s/bar' % (DATADIR),
                 stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
        # foo: rw-r-xr-x (0655), readable by "other"
        # NOTE(review): 0655 is an unusual mode; 0644 or 0755 may have been
        # intended.  The original value is preserved.
        os.chmod('%s/foo' % (DATADIR),
                 stat.S_IRUSR | stat.S_IWUSR
                 | stat.S_IRGRP | stat.S_IXGRP
                 | stat.S_IROTH | stat.S_IXOTH)

    def testOK(self):
        """Test stat of a readable file.
        """

        output = self._runClient("stat %s/foo" % (DATADIR), debug=True)

        lines = self._readLog()

        self.assertLines(['Connection and authentication correct', 'stat /.*/foo OK'], output)
        self.assertLines(['.*Authz: MSG_STAT .*/foo', '.*TestPermAuthzHandler OK',
                          '.*TestPermAuthzHandler.*msgcode = %s.*' % hex(server.MSG.STAT)], lines)

    def testFail(self):
        """Test stat of a non-readable file.
        """

        output = self._runClient("stat %s/bar" % (DATADIR), debug=True)

        lines = self._readLog()

        self.assertLines(['.* ndg_authz_control: AuthorisationFailure.* no read perms',
                          'stat /.*/bar FAILED'], output)
        self.assertLines(['.*Authz: MSG_STAT .*/bar', '.* TestPermAuthzHandler FAIL',
                          '.* ndg_authz_control: AuthorisationFailure.* no read perms'], lines)
302
303
class PythonClientAuthOK(AuthOK):
    """Repeat AuthOK tests with a python-embedded client.

    Only _runClient() differs: it launches test/runclient.py instead of the
    bbftp binary and accepts several commands per connection.
    """

    def testMultiCommand(self):
        """Execute more than one command per connection.
        """
        output = self._runClient(["dir %s" % DATADIR, "stat %s/bar" % DATADIR])

        self.assertLines([r'dir .*/data', r' d .*/\.', r' d .*/\.\.',
                          r' f .*/foo', r' f .*/bar', r' f .*/baz',
                          r'stat .*'], output)

        lines = self._readLog()
        self.assertLines(['.*Getting new bbftp connexion.*',
                          r'.*Authz: MSG_LIST_V2 .*/test/data/\*.*',
                          r'.*User TestCaseUser disconnected.*',
                          r'.*: Receiving MSG_STAT.*'], lines)

    def _runClient(self, cmds, debug=False, user="testcase", privatestr=None):
        """Run the python client and capture its standard output.

        @param cmds: one command string, or a sequence of command strings;
            each is passed with its own -e option.
        @param debug: pass -d to the client when true, otherwise -m.
        @param user: value for the -u option.
        @param privatestr: if not None, sent with the -P option.
        @return: the client's standard output; the same text is also saved
            to ./bbftpc.log.
        """

        # A single command is treated as a one-element list.
        if isinstance(cmds, str):
            cmds = [cmds]

        cmd_args = []
        for cmd in cmds:
            # repr() supplies the quotes that keep each command one shell word
            cmd_args += ['-e', repr(cmd)]

        if debug:
            flag = "-d"
        else:
            flag = "-m"

        args = cmd_args + [flag, '-u', user,
                           '-r', '1']

        if privatestr is not None:
            args += ['-P', privatestr]

        args.append('localhost')

        pipe = os.popen('%s/test/runclient.py %s' % (HOME, ' '.join(args)))
        try:
            output = pipe.read()
        finally:
            # Closing also waits for the client process to finish.
            pipe.close()

        # Same content the former "print >>file" statement produced: the
        # output followed by a newline.
        log = open('./bbftpc.log', 'w')
        try:
            log.write(output)
            log.write('\n')
        finally:
            log.close()

        return output
355
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.