Changeset 1104 for TI05-delivery/trunk
- Timestamp:
- 06/06/06 16:35:10 (15 years ago)
- Location:
- TI05-delivery/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
TI05-delivery/trunk/src/bbftp-server-3.2.0/bbftpd/bbftpd_private.c
r1064 r1104 55 55 #include <version.h> 56 56 57 #ifdef NDG_AUTH58 #include <signal.h>59 #endif // NDG_AUTH60 61 57 extern int incontrolsock ; 62 58 extern int outcontrolsock ; -
TI05-delivery/trunk/test/test_embedded.py
r1100 r1104 54 54 return AuthzContext(msg, "TestCaseUser") 55 55 56 class EmbeddedServerTestCase(unittest.TestCase): 57 """Test the bbftpd module. 58 """ 59 56 57 class FailingAuthContext(AuthContext): 58 def authorise(self): 59 az = super(FailingAuthContext).authorise() 60 return None 61 62 63 64 65 class BaseFixture(unittest.TestCase): 60 66 def setUp(self): 61 67 # We want to mark the beginning of this test case in syslog … … 70 76 syslog.closelog() 71 77 72 def testStartup(self):73 lines = self._readSyslog()74 # Give syslog time to flush it's logs.75 time.sleep(1)76 self.assert_(self._findLines(['.*Starting bbftpd'], lines))77 78 def testDir(self):79 """Try connecting the client and listing a directory.80 """81 82 83 fh = self._runClient("dir %s" % DATADIR)84 output = fh.read()85 86 87 self.assert_(self._findLines([r'dir .*/data', r' d .*/\.', r' d .*/\.\.',88 r' f .*/foo', r' f .*/bar', r' f .*/baz'], output))89 90 lines = self._readSyslog()91 self.assert_(self._findLines(['.*Getting new bbftp connexion.*',92 r'.*Authz: MSG_LIST_V2 .*/test/data/\*.*',93 r'.*User TestCaseUser disconnected.*'], lines))94 95 96 def testHandshake(self):97 """Verify handshake messages are exchanged.98 """99 100 fh = self._runClient("dir .", debug=True)101 output = fh.read()102 103 self.assert_(self._findLines(['Received Auth handshake: NDG-Delivery-server %s' % VERSION], output))104 105 lines = self._readSyslog()106 self.assert_(self._findLines(['.*AuthContext received Auth message: NDG-Delivery-client %s' % VERSION], lines))107 108 def testPrivateStr(self):109 """Verify the private string is sent to server.110 """111 112 fh = self._runClient("dir .", privatestr="testPrivateStr")113 output = fh.read()114 115 lines = self._readSyslog()116 self.assert_(self._findLines(['.*AuthContext received privatestr: testPrivateStr'], lines))117 118 def testRetr(self):119 """Try retrieving a file.120 """121 122 tmp = tempfile.mktemp('test_bbftpd')123 fh = self._runClient("get %s/foo %s" % (DATADIR, tmp))124 125 # Check the client output126 output = fh.read()127 self.assert_(self._findLines(['get.*nogzip'], output))128 129 # Check retrieved file130 self.assert_(os.system('diff --brief %s/foo %s' % (DATADIR, tmp)) == 0)131 os.remove(tmp)132 133 # Check syslog134 lines = self._readSyslog()135 self.assert_(self._findLines(['.*Authz: RETR .*/foo', '.*GET TestCaseUser .*/foo.*'], lines))136 137 def testStore(self):138 """Try storing a file.139 """140 141 142 src = '%s/bar' % (DATADIR)143 dest = '%s/new_bar' % (DATADIR)144 os.system('cp %s %s' % (src, dest))145 fh = self._runClient("put %s %s" % (src, dest))146 147 # Check the client output148 output = fh.read()149 self.assert_(self._findLines(['put .* nogzip'], output))150 151 152 # Check sent file153 self.assert_(os.system('diff --brief %s %s' % (dest, src)) == 0)154 os.remove(dest)155 156 # Check syslog157 lines = self._readSyslog()158 self.assert_(self._findLines(['.*Authz: STORE .*/new_bar', '.*PUT TestCaseUser .*/new_bar.*'], lines))159 160 161 78 #---------------------------------------------------------------------------------- 162 79 163 80 164 def _startServer(self): 81 def _startServer(self, authContext=None): 82 if not authContext: 83 authContext = AuthContext() 165 84 # Start the server and store it's PID 166 self.authContext = AuthContext() 167 self.pid = bbftpd.run(self.authContext, ['-l', 'DEBUG']) 85 self.pid = bbftpd.run(authContext, ['-l', 'DEBUG']) 168 86 169 87 def _stopServer(self): … … 187 105 else: 188 106 p = "-P %s" % repr(privatestr) 189 190 fh = os.popen('%s %s %s -u %s -e %s localhost' % (BBFTP, f, p, user, repr(cmd))) 107 108 # This is ugly but I need to redirect stderr to stdout 109 fh = os.popen('sh -c "%s %s %s -u %s -r 1 -e \'%s\' localhost" 2>&1 ' % (BBFTP, f, p, user, cmd)) 191 110 return fh 192 193 111 194 112 def _readSyslog(self, logfile="/var/log/bbftpd/bbftpd.log"): … … 231 149 232 150 151 def assertLines(self, lines, string): 152 """Assert that there is a line in string matching each regular expression in lines. 153 """ 154 155 for line in lines: 156 self.assert_(re.search('^%s$' % line, string, re.M)) 157 158 159 160 class EmbeddedServerTestCase(BaseFixture): 161 """Test the bbftpd module. 162 """ 163 164 def testStartup(self): 165 lines = self._readSyslog() 166 # Give syslog time to flush it's logs. 167 time.sleep(1) 168 self.assertLines(['.*Starting bbftpd'], lines) 169 170 def testDir(self): 171 """Try connecting the client and listing a directory. 172 """ 173 174 175 fh = self._runClient("dir %s" % DATADIR) 176 output = fh.read() 177 178 179 self.assertLines([r'dir .*/data', r' d .*/\.', r' d .*/\.\.', 180 r' f .*/foo', r' f .*/bar', r' f .*/baz'], output) 181 182 lines = self._readSyslog() 183 self.assertLines(['.*Getting new bbftp connexion.*', 184 r'.*Authz: MSG_LIST_V2 .*/test/data/\*.*', 185 r'.*User TestCaseUser disconnected.*'], lines) 186 187 188 def testHandshake(self): 189 """Verify handshake messages are exchanged. 190 """ 191 192 fh = self._runClient("dir .", debug=True) 193 output = fh.read() 194 195 self.assertLines(['Received Auth handshake: NDG-Delivery-server %s' % VERSION], output) 196 197 lines = self._readSyslog() 198 self.assertLines(['.*AuthContext received Auth message: NDG-Delivery-client %s' % VERSION], lines) 199 200 def testPrivateStr(self): 201 """Verify the private string is sent to server. 202 """ 203 204 fh = self._runClient("dir .", privatestr="testPrivateStr") 205 output = fh.read() 206 207 lines = self._readSyslog() 208 self.assertLines(['.*AuthContext received privatestr: testPrivateStr'], lines) 209 210 def testRetr(self): 211 """Try retrieving a file. 212 """ 213 214 tmp = tempfile.mktemp('test_bbftpd') 215 fh = self._runClient("get %s/foo %s" % (DATADIR, tmp)) 216 217 # Check the client output 218 output = fh.read() 219 self.assertLines(['get.*nogzip'], output) 220 221 # Check retrieved file 222 self.assert_(os.system('diff --brief %s/foo %s' % (DATADIR, tmp)) == 0) 223 os.remove(tmp) 224 225 # Check syslog 226 lines = self._readSyslog() 227 self.assertLines(['.*Authz: RETR .*/foo', '.*GET TestCaseUser .*/foo.*'], lines) 228 229 def testStore(self): 230 """Try storing a file. 231 """ 232 233 234 src = '%s/bar' % (DATADIR) 235 dest = '%s/new_bar' % (DATADIR) 236 os.system('cp %s %s' % (src, dest)) 237 fh = self._runClient("put %s %s" % (src, dest)) 238 239 # Check the client output 240 output = fh.read() 241 self.assertLines(['put .* nogzip'], output) 242 243 244 # Check sent file 245 self.assert_(os.system('diff --brief %s %s' % (dest, src)) == 0) 246 os.remove(dest) 247 248 # Check syslog 249 lines = self._readSyslog() 250 self.assertLines(['.*Authz: STORE .*/new_bar', '.*PUT TestCaseUser .*/new_bar.*'], lines) 251 252 253 254 class AuthFailure(BaseFixture): 255 def setUp(self): 256 # We want to mark the beginning of this test case in syslog 257 syslog.openlog('test_embedded.py', 0, syslog.LOG_LOCAL0) 258 syslog.syslog(syslog.LOG_DEBUG, 'Starting EmbeddedServerTestCase') 259 260 self._startServer(authContext=FailingAuthContext()) 261 262 def test(self): 263 """Fail authorisation 264 """ 265 266 fh = self._runClient("dir .", debug=True) 267 output = fh.read() 268 269 lines = self._readSyslog() 270 271 self.assertLines(['.*Error while private authentication.*'], output) 272 self.assertLines(['.*bbftpd_private_auth failed.*'], lines) 273 233 274 if __name__ == '__main__': 234 275 unittest.main()
Note: See TracChangeset
for help on using the changeset viewer.