Changeset 4520 for TI12-security/trunk
- Timestamp:
- 01/12/08 15:39:41 (12 years ago)
- Location:
- TI12-security/trunk/python
- Files:
-
- 8 edited
- 1 moved
Legend:
- Unmodified
- Added
- Removed
-
TI12-security/trunk/python/ndg.security.server/ndg/security/server/sessionmanager.py
r4500 r4520 642 642 # Look for a session corresponding to this ID 643 643 if sessID and userDN: 644 raise SessionManagerError('Only "SessID" or "userDN" keywords may be'645 'set')644 raise SessionManagerError('Only "SessID" or "userDN" keywords may ' 645 'be set') 646 646 elif sessID: 647 647 if sessID in self.__sessDict: … … 650 650 else: 651 651 # User session not found with given ID 652 log.info("No user session found matching input ID = %s" % \ 653 sessID) 652 log.info("No user session found matching input ID = %s"%sessID) 654 653 return False 655 654 … … 669 668 else: 670 669 # User session not found with given proxy cert 671 log.info("No user session found matching input userDN = %s" % \670 log.info("No user session found matching input userDN = %s" % 672 671 userDN) 673 672 return False … … 1075 1074 userX509Cert=userX509Cert) 1076 1075 1077 # The user's Credential Wallet carries out a ttribute request to the1076 # The user's Credential Wallet carries out an attribute request to the 1078 1077 # Attribute Authority 1079 try: 1080 attCert=userSess.credentialWallet.getAttCert(**credentialWalletKw) 1081 return attCert, None, [] 1082 1083 except CredentialWalletAttributeRequestDenied, e: 1084 # Exception object contains a list of attribute certificates 1085 # which could be used to re-try to get authorisation via a mapped 1086 # certificate 1087 return None, str(e), e.extAttCertList 1078 attCert = userSess.credentialWallet.getAttCert(**credentialWalletKw) 1079 return attCert 1080 1088 1081 1089 1082 def auditCredentialRepository(self): -
TI12-security/trunk/python/ndg.security.server/ndg/security/server/wsgi/utils/attributeauthorityclient.py
r4515 r4520 1 """NDG Security - client interface classes to Session Manager 2 3 Make requests for authentication and authorisation 4 5 NERC Data Grid Project 6 7 This software may be distributed under the terms of the Q Public License, 8 version 1.0 or later. 9 """ 10 __author__ = "P J Kershaw" 11 __date__ = "27/11/08" 12 __copyright__ = "(C) 2008 STFC & NERC" 13 __contact__ = "Philip.Kershaw@stfc.ac.uk" 14 __revision__ = "$Id$" 15 import logging 16 log = logging.getLogger(__name__) 1 17 2 18 class WSGIAttributeAuthorityClient(object): 19 """Client interface to Attribute Authority for WSGI based applications 3 20 4 environKey = "ndg.security.server.attributeauthority.AttributeAuthority" 21 This class wraps the SOAP based web service client and alternate direct 22 access to a Attribute Authority instance in the same code stack available 23 via an environ keyword 24 """ 25 26 environKey = "ndg.security.server.wsgi.attributeAuthorityFilter" 5 27 6 28 def __init__(self, environKey=None, environ={}, **soapClientKw): 7 """"""8 29 9 30 log.debug("WSGIAttributeAuthorityClient.__init__ ...") -
TI12-security/trunk/python/ndg.security.server/ndg/security/server/wsgi/utils/sessionmanagerclient.py
r4515 r4520 1 """NDG Security client - client interface classes to Session Manager1 """NDG Security 2 2 3 Make requests for authentication and authorisation 3 Client interface to Session Manager for WSGI based applications 4 4 5 5 NERC Data Grid Project … … 9 9 """ 10 10 __author__ = "P J Kershaw" 11 __date__ = "2 4/04/06"12 __copyright__ = "(C) 200 7STFC & NERC"11 __date__ = "27/11/08" 12 __copyright__ = "(C) 2008 STFC & NERC" 13 13 __contact__ = "Philip.Kershaw@stfc.ac.uk" 14 __revision__ = "$Id :sessionmanager.py 4373 2008-10-29 09:54:39Z pjkersha$"14 __revision__ = "$Id$" 15 15 16 16 import logging … … 33 33 34 34 This class wraps the SOAP based web service client and alternate access to 35 a Session Manager in the same code stack via an environ keyword 35 a Session Manager instance in the same code stack available via an environ 36 keyword 36 37 """ 37 environKey = "ndg.security.server. sessionmanager.SessionManager"38 environKey = "ndg.security.server.wsgi.sessionManagerFilter" 38 39 39 40 def __init__(self, environKey=None, environ={}, **soapClientKw): … … 50 51 self._soapClient = SessionManagerClient(**soapClientKw) 51 52 52 _ sessionManagerInEnviron = lambda self: self._environKey in self._environ53 _refInEnviron = lambda self: self._environKey in self._environ 53 54 54 55 # Define as property for convenient call syntax 55 sessionManagerInEnviron = property(fget=_sessionManagerInEnviron, 56 doc="return True if a Session Manager " 57 "instance is available in " 58 "WSGI environ") 56 refInEnviron = property(fget=_refInEnviron, 57 doc="return True if a Session Manager instance is " 58 "available in WSGI environ") 59 59 60 60 _getRef = lambda self:self._environ[self._environKey].serviceSOAPBinding.sm … … 89 89 """Delete an existing user session from the Session Manager 90 90 91 disconnect([userX509Cert=c]|[sessID=i]) 92 93 @type userX509Cert: string 94 @param userX509Cert: user's certificate used to identifier which session 95 to disconnect. This arg is not needed if the message is signed with 96 the user cert or if sessID is set. 97 98 @type sessID: string 99 @param sessID: session ID. Input this as an alternative to userX509Cert 100 This arg is not needed if the message is signed with the user cert or 101 if userX509Cert keyword is.""" 91 @type **kw: dict 92 @param **kw: disconnect keywords applicable to 93 ndg.security.server.sessionmanager.SessionManager.getSessionStatus and 94 ndg.security.common.sessionmanager.SessionManagerClient.getSessionStatus 95 the SOAP client""" 102 96 103 # Make connection 104 self._soapClient.disconnect(**kw) 97 # Modify keywords according to correct interface for server side / 98 # SOAP client 99 if self.refInEnviron: 100 if 'userDN' in kw: 101 log.warning('Removing keyword "userDN": this is not supported ' 102 'for calls to ndg.security.server.sessionmanager.' 103 'SessionManager.deleteUserSession') 104 kw.pop('userX509Cert', None) 105 106 self.ref.deleteUserSession(**kw) 107 else: 108 if 'userX509Cert' in kw: 109 kw['userDN'] = kw.pop('userX509Cert').dn 110 111 self._soapClient.disconnect(**kw) 105 112 106 113 107 def getSessionStatus(self, userDN=None, sessID=None):114 def getSessionStatus(self, **kw): 108 115 """Check for the existence of a session with a given 109 116 session ID / user certificate Distinguished Name 110 111 disconnect([sessID=id]|[userDN=dn])112 113 @type userX509Cert: string114 @param userX509Cert: user's certificate used to identifier which session115 to disconnect. This arg is not needed if the message is signed with116 the user cert or if sessID is set.117 117 118 @type sessID: string 119 @param sessID: session ID. Input this as an alternative to userX509Cert 120 This arg is not needed if the message is signed with the user cert or 121 if userX509Cert keyword is.""" 118 @type **kw: dict 119 @param **kw: disconnect keywords applicable to 120 ndg.security.server.sessionmanager.SessionManager.getSessionStatus and 121 ndg.security.common.sessionmanager.SessionManagerClient.getSessionStatus 122 the SOAP client""" 122 123 123 # Make connection 124 return self._soapClient.getSessionStatus(**kw) 124 if self.refInEnviron: 125 return self.ref.getSessionStatus(**kw) 126 else: 127 return self._soapClient.getSessionStatus(**kw) 128 125 129 126 130 … … 130 134 user's credential wallet held by the session manager. 131 135 136 @type **kw: dict 137 @param **kw: disconnect keywords applicable to 138 ndg.security.server.sessionmanager.SessionManager.getAttCert and 139 ndg.security.common.sessionmanager.SessionManagerClient.getAttCert 140 the SOAP client 132 141 """ 142 133 143 if self.refInEnviron: 134 144 # Connect to local instance … … 145 155 "set and no reference is available in environ") 146 156 147 return self.ref.getAttCert( username=username,**kw)157 return self.ref.getAttCert(**kw) 148 158 else: 149 159 # Filter out keywords which apply to a Session Manager local 150 160 # instance call 151 kw.pop('refreshAttCert', None) 152 kw.pop('attCertRefreshElapse', None) 161 if 'username' in kw: 162 kw.pop('username') 163 log.warning('Trying call via SOAP interface: ' 164 'removing the "username" keyword ' 165 'ndg.security.common.sessionmanager.' 166 'SessionManagerClient.getAttCert doesn\'t support ' 167 'this keyword') 168 169 if 'refreshAttCert' in kw: 170 kw.pop('refreshAttCert') 171 log.warning('Trying call via SOAP interface: ' 172 'removing the "refreshAttCert" keyword ' 173 'ndg.security.common.sessionmanager.' 174 'SessionManagerClient.getAttCert doesn\'t support ' 175 'this keyword') 176 177 if 'attCertRefreshElapse' in kw: 178 kw.pop('attCertRefreshElapse') 179 log.warning('Trying call via SOAP interface: ' 180 'removing the "attCertRefreshElapse" keyword ' 181 'ndg.security.common.sessionmanager.' 182 'SessionManagerClient.getAttCert doesn\'t support ' 183 'this keyword') 153 184 154 return self._soapClient.getAttCert(username=username,**kw)185 return self._soapClient.getAttCert(**kw) -
TI12-security/trunk/python/ndg.security.server/ndg/security/server/zsi/sessionmanager/__init__.py
r4480 r4520 24 24 25 25 from ndg.security.server.sessionmanager import SessionManager 26 26 from ndg.security.common.credentialwallet import \ 27 CredentialWalletAttributeRequestDenied 27 28 from ndg.security.common.wssecurity.dom import SignatureHandler 28 29 from ndg.security.common.X509 import X509Cert, X509CertRead … … 185 186 # X.509 Cert used in signature is preferred over userX509Cert input 186 187 # element - userX509Cert may have been omitted. 187 result = self.sm.getAttCert( 188 try: 189 attCert = self.sm.getAttCert( 188 190 userX509Cert=userX509Cert or request.UserX509Cert, 189 191 sessID=request.SessID, … … 195 197 extAttCertList=request.ExtAttCert, 196 198 extTrustedHostList=request.ExtTrustedHost) 197 if result[0]: 198 response.AttCert = result[0].toString() 199 200 response.Msg, response.ExtAttCertOut = result[1:] 201 202 return response 199 response.AttCert = attCert.toString() 200 201 except CredentialWalletAttributeRequestDenied, e: 202 # Exception object contains a list of attribute certificates 203 # which could be used to re-try to get authorisation via a mapped 204 # certificate 205 response.Msg = str(e) 206 response.ExtAttCertOut = e.extAttCertList 207 208 return response -
TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/serverapp.py
r4513 r4520 19 19 from ndg.security.server.wsgi.utils.sessionmanagerclient import \ 20 20 WSGISessionManagerClient 21 21 22 23 class HTTPBasicAuthentication(object): 24 '''Enable Authkit based HTTP Basic Authentication for test methods''' 25 def __init__(self): 26 self._userIn = UserIn([]) 27 28 def __call__(self, environ, username, password): 29 """validation function""" 30 try: 31 client = WSGISessionManagerClient(environ=environ) 32 res = client.connect(username, passphrase=password) 33 34 if username not in self._userIn.users: 35 self._userIn.users += [username] 36 37 # Keep a reference to the session ID for test purposes 38 environ[client.environKey+'.user'] = res[-1] 39 40 except Exception, e: 41 return False 42 else: 43 return True 44 22 45 class InfoApp(object): 23 46 method = { 24 47 "/": 'default', 25 "/test_connect": "test_connect" 48 "/test_connect": "test_connect", 49 "/test_getSessionStatus": "test_getSessionStatus", 50 "/test_disconnect": "test_disconnect", 51 "/test_getAttCert": "test_getAttCert" 26 52 } 53 httpBasicAuthentication = HTTPBasicAuthentication() 27 54 28 55 def __call__(self, environ, start_response): … … 40 67 return "NDG Security Combined Services Unit Tests" 41 68 69 @authorize(httpBasicAuthentication._userIn) 42 70 def test_connect(self, environ, start_response): 43 44 71 start_response('200 OK', [('Content-type', 'text/plain')]) 45 72 return "test_connect succeeded" 73 74 @authorize(httpBasicAuthentication._userIn) 75 def test_getSessionStatus(self, environ, start_response): 76 client = WSGISessionManagerClient(environ=environ) 77 stat=client.getSessionStatus(sessID=environ[client.environKey+'.user']) 78 start_response('200 OK', [('Content-type', 'text/xml')]) 79 return "test_getSessionStatus succeeded. Response = %s" % stat 46 80 47 def test_getAttributeCertificate(self, environ, start_response): 81 @authorize(httpBasicAuthentication._userIn) 82 def test_disconnect(self, environ, start_response): 48 83 client = WSGISessionManagerClient(environ=environ) 49 attCert = client.getAttCert() 84 client.disconnect(sessID=environ[client.environKey+'.user']) 85 86 # Re-initialise user authentication 87 InfoApp.httpBasicAuthentication._userIn.users = [] 88 start_response('200 OK', [('Content-type', 'text/plain')]) 89 return "test_disconnect succeeded." 90 91 @authorize(httpBasicAuthentication._userIn) 92 def test_getAttCert(self, environ, start_response): 93 client = WSGISessionManagerClient(environ=environ) 94 attCert = client.getAttCert(sessID=environ[client.environKey+'.user']) 50 95 start_response('200 OK', [('Content-type', 'text/xml')]) 51 return attCert 52 53 def valid(environ, username, password): 54 """validation function""" 55 try: 56 client = WSGISessionManagerClient(environ=environ) 57 res = client.connect(username, passphrase=password) 58 except Exception, e: 59 return False 60 else: 61 return True 96 return str(attCert) 62 97 63 98 def app_factory(global_config, **local_conf): -
TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/services.ini
r4513 r4520 22 22 wsseCfgFileSection = WS-Security 23 23 24 #______________________________________________________________________________ 24 25 # Attribute Authority settings 25 26 # 'name' setting MUST agree with map config file 'thisHost' name attribute … … 58 59 attributeAuthority.caCertFilePathList: $NDGSEC_COMBINED_SRVS_UNITTEST_DIR/ca/ndg-test-ca.crt 59 60 60 61 #______________________________________________________________________________ 61 62 # Session Manager specific settings - commented out settings will take their 62 63 # default settings. To override the defaults uncomment and set as required. … … 100 101 # values should be delimited by a space 101 102 sessionManager.credentialWallet.wssecurity.caCertFilePathList: $NDGSEC_COMBINED_SRVS_UNITTEST_DIR/ca/ndg-test-ca.crt 103 102 104 # Signature of an outbound message 103 105 # … … 132 134 # Authentication service properties 133 135 sessionManager.authNService.moduleFilePath: 134 sessionManager.authNService.moduleName: ndg.security.test.combinedservices.sessionmanager.user certauthn135 sessionManager.authNService.className: User CertAuthN136 sessionManager.authNService.moduleName: ndg.security.test.combinedservices.sessionmanager.userx509certauthn 137 sessionManager.authNService.className: UserX509CertAuthN 136 138 137 139 # Specific settings for UserCertAuthN Session Manager authentication plugin … … 165 167 enableWSDLQuery = True 166 168 charset = utf-8 167 filterID = attributeAuthorityFilter169 filterID = ndg.security.server.wsgi.attributeAuthorityFilter 168 170 169 171 [filter:SessionManagerFilter] … … 178 180 enableWSDLQuery = True 179 181 charset = utf-8 180 filterID = ndg.security.server. sessionmanager.SessionManager182 filterID = ndg.security.server.wsgi.sessionManagerFilter 181 183 182 184 [filter:wsseSignatureVerificationFilter] … … 198 200 setup_method=basic 199 201 basic_realm=Test Realm 200 basic_authenticate_function=ndg.security.test.combinedservices.serverapp: valid202 basic_authenticate_function=ndg.security.test.combinedservices.serverapp:InfoApp.httpBasicAuthentication 201 203 202 204 -
TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/test_combinedservices.cfg
r4513 r4520 41 41 passphrase = testpassword 42 42 43 [test10WSGILocalInstanceGetSessionStatus] 44 url = http://localhost:8000/test_getSessionStatus 45 username = testuser 46 passphrase = testpassword 47 48 [test11WSGILocalInstanceDisconnect] 49 url = http://localhost:8000/test_disconnect 50 username = testuser 51 passphrase = testpassword 52 53 [test12WSGILocalInstanceGetAttCert] 54 url = http://localhost:8000/test_getAttCert 55 username = testuser 56 passphrase = testpassword 57 43 58 [wsse] 44 59 # WS-Security settings for unit test AA clients -
TI12-security/trunk/python/ndg.security.test/ndg/security/test/combinedservices/test_combinedservices.py
r4513 r4520 114 114 115 115 116 def test01Connect(self):117 """test01Connect: Connect as if acting as a browser client -118 a session ID is returned"""119 120 username = self.cfg['test01Connect']['username']121 122 if CombinedServicesTestCase.test01Passphrase is None:123 CombinedServicesTestCase.test01Passphrase = \124 self.cfg['test01Connect'].get('passphrase')125 126 if not CombinedServicesTestCase.test01Passphrase:127 CombinedServicesTestCase.test01Passphrase = getpass.getpass(\128 prompt="\ntest01Connect pass-phrase for user %s: " % username)129 130 self.userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \131 self.clnt.connect(self.cfg['test01Connect']['username'],132 passphrase=CombinedServicesTestCase.test01Passphrase)133 134 print("User '%s' connected to Session Manager:\n%s" % (username,135 self.sessID))136 137 138 def test02GetSessionStatus(self):139 """test02GetSessionStatus: check a session is alive"""140 print "\n\t" + self.test02GetSessionStatus.__doc__141 142 self.test01Connect()143 assert self.clnt.getSessionStatus(sessID=self.sessID),"Session is dead"144 145 print("User connected to Session Manager with sessID=%s" % self.sessID)146 147 assert not self.clnt.getSessionStatus(sessID='abc'), \148 "sessID=abc shouldn't exist!"149 150 print "CORRECT: sessID=abc doesn't exist"151 152 153 def test03ConnectNoCreateServerSess(self):154 """test03ConnectNoCreateServerSess: Connect without creating a session -155 sessID should be None. This only indicates that the username/password156 are correct. To be of practical use the AuthNService plugin at157 the Session Manager needs to return X.509 credentials e.g.158 with MyProxy plugin."""159 160 username = self.cfg['test03ConnectNoCreateServerSess']['username']161 162 if CombinedServicesTestCase.test03Passphrase is None:163 CombinedServicesTestCase.test03Passphrase = \164 self.cfg['test03ConnectNoCreateServerSess'].get('passphrase')165 166 if not CombinedServicesTestCase.test03Passphrase:167 prompt="\ntest03ConnectNoCreateServerSess pass-phrase for user %s: "168 CombinedServicesTestCase.test03Passphrase = getpass.getpass(\169 prompt=prompt % username)170 171 userX509Cert, userPriKey,issuingCert, sessID = \172 self.clnt.connect(username,173 passphrase=CombinedServicesTestCase.test03Passphrase,174 createServerSess=False)175 176 # Expect null session ID177 assert(not sessID)178 179 print("Successfully authenticated")180 181 182 def test04DisconnectWithSessID(self):183 """test04DisconnectWithSessID: disconnect as if acting as a browser184 client185 """186 187 print "\n\t" + self.test04DisconnectWithSessID.__doc__188 self.test01Connect()189 190 self.clnt.disconnect(sessID=self.sessID)191 192 print("User disconnected from Session Manager:\n%s" % self.sessID)193 194 195 def test05DisconnectWithUserX509Cert(self):196 """test05DisconnectWithUserX509Cert: Disconnect as a command line client197 """198 199 print "\n\t" + self.test05DisconnectWithUserX509Cert.__doc__200 self.test01Connect()201 202 # Use user cert / private key just obtained from connect call for203 # signature generation204 if self.issuingCert:205 self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'206 self.clnt.signatureHandler.signingPriKey = self.userPriKey207 self.clnt.signatureHandler.signingCertChain = (self.issuingCert,208 self.userX509Cert)209 self.clnt.signatureHandler.signingCert = None210 else:211 self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'212 self.clnt.signatureHandler.signingPriKeyPwd = \213 CombinedServicesTestCase.test01Passphrase214 self.clnt.signatureHandler.signingPriKey = self.userPriKey215 self.clnt.signatureHandler.signingCertChain = ()216 self.clnt.signatureHandler.signingCert = self.userX509Cert217 218 # user X.509 cert in signature determines ID of session to delete219 self.clnt.disconnect()220 print("User disconnected from Session Manager:\n%s"%self.userX509Cert)221 222 223 def test06GetAttCertWithSessID(self):224 """test06GetAttCertWithSessID: make an attribute request using225 a session ID as authentication credential"""226 227 print "\n\t" + self.test06GetAttCertWithSessID.__doc__228 thisSection = self.cfg['test06GetAttCertWithSessID']229 self.test01Connect()230 231 attCert = self.clnt.getAttCert(sessID=self.sessID,232 attAuthorityURI=thisSection['aaURI'])233 234 print "Attribute Certificate:\n%s" % attCert235 attCert.filePath = xpdVars(thisSection['acOutFilePath'])236 attCert.write()237 238 239 def test07GetAttCertWithUserX509Cert(self):240 """test07GetAttCertWithUserX509Cert: make an attribute request using241 a user cert as authentication credential"""242 print "\n\t" + self.test07GetAttCertWithUserX509Cert.__doc__243 self.test01Connect()244 245 if self.issuingCert:246 self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1'247 self.clnt.signatureHandler.signingPriKeyPwd = \248 CombinedServicesTestCase.test01Passphrase249 self.clnt.signatureHandler.signingPriKey = self.userPriKey250 self.clnt.signatureHandler.signingCertChain = (self.issuingCert,251 self.userX509Cert)252 self.clnt.signatureHandler.signingCert = None253 else:254 self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3'255 self.clnt.signatureHandler.signingPriKeyPwd = \256 CombinedServicesTestCase.test01Passphrase257 self.clnt.signatureHandler.signingPriKey = self.userPriKey258 self.clnt.signatureHandler.signingCertChain = ()259 self.clnt.signatureHandler.signingCert = self.userX509Cert260 261 # Request an attribute certificate from an Attribute Authority262 # using the userX509Cert returned from connect()263 264 aaURI = self.cfg['test07GetAttCertWithUserX509Cert']['aaURI']265 attCert = self.clnt.getAttCert(attAuthorityURI=aaURI)266 267 print("Attribute Certificate:\n%s" % attCert)268 269 270 def test08GetAttCertFromLocalAttributeAuthority(self):271 """test08GetAttCertFromLocalAttributeAuthority: query the Attribute272 Authority running in the same server instance as the Session Manager"""273 274 print "\n\t" + self.test08GetAttCertFromLocalAttributeAuthority.__doc__275 self.test01Connect()276 277 attCert = self.clnt.getAttCert(sessID=self.sessID)278 279 print "Attribute Certificate:\n%s" % attCert116 # def test01Connect(self): 117 # """test01Connect: Connect as if acting as a browser client - 118 # a session ID is returned""" 119 # 120 # username = self.cfg['test01Connect']['username'] 121 # 122 # if CombinedServicesTestCase.test01Passphrase is None: 123 # CombinedServicesTestCase.test01Passphrase = \ 124 # self.cfg['test01Connect'].get('passphrase') 125 # 126 # if not CombinedServicesTestCase.test01Passphrase: 127 # CombinedServicesTestCase.test01Passphrase = getpass.getpass(\ 128 # prompt="\ntest01Connect pass-phrase for user %s: " % username) 129 # 130 # self.userX509Cert, self.userPriKey, self.issuingCert, self.sessID = \ 131 # self.clnt.connect(self.cfg['test01Connect']['username'], 132 # passphrase=CombinedServicesTestCase.test01Passphrase) 133 # 134 # print("User '%s' connected to Session Manager:\n%s" % (username, 135 # self.sessID)) 136 # 137 # 138 # def test02GetSessionStatus(self): 139 # """test02GetSessionStatus: check a session is alive""" 140 # print "\n\t" + self.test02GetSessionStatus.__doc__ 141 # 142 # self.test01Connect() 143 # assert self.clnt.getSessionStatus(sessID=self.sessID),"Session is dead" 144 # 145 # print("User connected to Session Manager with sessID=%s" % self.sessID) 146 # 147 # assert not self.clnt.getSessionStatus(sessID='abc'), \ 148 # "sessID=abc shouldn't exist!" 149 # 150 # print "CORRECT: sessID=abc doesn't exist" 151 # 152 # 153 # def test03ConnectNoCreateServerSess(self): 154 # """test03ConnectNoCreateServerSess: Connect without creating a session - 155 # sessID should be None. This only indicates that the username/password 156 # are correct. To be of practical use the AuthNService plugin at 157 # the Session Manager needs to return X.509 credentials e.g. 158 # with MyProxy plugin.""" 159 # 160 # username = self.cfg['test03ConnectNoCreateServerSess']['username'] 161 # 162 # if CombinedServicesTestCase.test03Passphrase is None: 163 # CombinedServicesTestCase.test03Passphrase = \ 164 # self.cfg['test03ConnectNoCreateServerSess'].get('passphrase') 165 # 166 # if not CombinedServicesTestCase.test03Passphrase: 167 # prompt="\ntest03ConnectNoCreateServerSess pass-phrase for user %s: " 168 # CombinedServicesTestCase.test03Passphrase = getpass.getpass(\ 169 # prompt=prompt % username) 170 # 171 # userX509Cert, userPriKey,issuingCert, sessID = \ 172 # self.clnt.connect(username, 173 # passphrase=CombinedServicesTestCase.test03Passphrase, 174 # createServerSess=False) 175 # 176 # # Expect null session ID 177 # assert(not sessID) 178 # 179 # print("Successfully authenticated") 180 # 181 # 182 # def test04DisconnectWithSessID(self): 183 # """test04DisconnectWithSessID: disconnect as if acting as a browser 184 # client 185 # """ 186 # 187 # print "\n\t" + self.test04DisconnectWithSessID.__doc__ 188 # self.test01Connect() 189 # 190 # self.clnt.disconnect(sessID=self.sessID) 191 # 192 # print("User disconnected from Session Manager:\n%s" % self.sessID) 193 # 194 # 195 # def test05DisconnectWithUserX509Cert(self): 196 # """test05DisconnectWithUserX509Cert: Disconnect as a command line client 197 # """ 198 # 199 # print "\n\t" + self.test05DisconnectWithUserX509Cert.__doc__ 200 # self.test01Connect() 201 # 202 # # Use user cert / private key just obtained from connect call for 203 # # signature generation 204 # if self.issuingCert: 205 # self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1' 206 # self.clnt.signatureHandler.signingPriKey = self.userPriKey 207 # self.clnt.signatureHandler.signingCertChain = (self.issuingCert, 208 # self.userX509Cert) 209 # self.clnt.signatureHandler.signingCert = None 210 # else: 211 # self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3' 212 # self.clnt.signatureHandler.signingPriKeyPwd = \ 213 # CombinedServicesTestCase.test01Passphrase 214 # self.clnt.signatureHandler.signingPriKey = self.userPriKey 215 # self.clnt.signatureHandler.signingCertChain = () 216 # self.clnt.signatureHandler.signingCert = self.userX509Cert 217 # 218 # # user X.509 cert in signature determines ID of session to delete 219 # self.clnt.disconnect() 220 # print("User disconnected from Session Manager:\n%s"%self.userX509Cert) 221 # 222 # 223 # def test06GetAttCertWithSessID(self): 224 # """test06GetAttCertWithSessID: make an attribute request using 225 # a session ID as authentication credential""" 226 # 227 # print "\n\t" + self.test06GetAttCertWithSessID.__doc__ 228 # thisSection = self.cfg['test06GetAttCertWithSessID'] 229 # self.test01Connect() 230 # 231 # attCert = self.clnt.getAttCert(sessID=self.sessID, 232 # attAuthorityURI=thisSection['aaURI']) 233 # 234 # print "Attribute Certificate:\n%s" % attCert 235 # attCert.filePath = xpdVars(thisSection['acOutFilePath']) 236 # attCert.write() 237 # 238 # 239 # def test07GetAttCertWithUserX509Cert(self): 240 # """test07GetAttCertWithUserX509Cert: make an attribute request using 241 # a user cert as authentication credential""" 242 # print "\n\t" + self.test07GetAttCertWithUserX509Cert.__doc__ 243 # self.test01Connect() 244 # 245 # if self.issuingCert: 246 # self.clnt.signatureHandler.reqBinSecTokValType = 'X509PKIPathv1' 247 # self.clnt.signatureHandler.signingPriKeyPwd = \ 248 # CombinedServicesTestCase.test01Passphrase 249 # self.clnt.signatureHandler.signingPriKey = self.userPriKey 250 # self.clnt.signatureHandler.signingCertChain = (self.issuingCert, 251 # self.userX509Cert) 252 # self.clnt.signatureHandler.signingCert = None 253 # else: 254 # self.clnt.signatureHandler.reqBinSecTokValType = 'X509v3' 255 # self.clnt.signatureHandler.signingPriKeyPwd = \ 256 # CombinedServicesTestCase.test01Passphrase 257 # self.clnt.signatureHandler.signingPriKey = self.userPriKey 258 # self.clnt.signatureHandler.signingCertChain = () 259 # self.clnt.signatureHandler.signingCert = self.userX509Cert 260 # 261 # # Request an attribute certificate from an Attribute Authority 262 # # using the userX509Cert returned from connect() 263 # 264 # aaURI = self.cfg['test07GetAttCertWithUserX509Cert']['aaURI'] 265 # attCert = self.clnt.getAttCert(attAuthorityURI=aaURI) 266 # 267 # print("Attribute Certificate:\n%s" % attCert) 268 # 269 # 270 # def test08GetAttCertFromLocalAttributeAuthority(self): 271 # """test08GetAttCertFromLocalAttributeAuthority: query the Attribute 272 # Authority running in the same server instance as the Session Manager""" 273 # 274 # print "\n\t" + self.test08GetAttCertFromLocalAttributeAuthority.__doc__ 275 # self.test01Connect() 276 # 277 # attCert = self.clnt.getAttCert(sessID=self.sessID) 278 # 279 # print "Attribute Certificate:\n%s" % attCert 280 280 281 281 … … 286 286 # Make a client connection to the WSGI app - authenticate with WSGI 287 287 # basic auth 288 thisSection = 'test09WSGILocalInstanceConnect'289 url = self.cfg[thisSection]['url']290 username = self.cfg[thisSection]['username']291 password = self.cfg[thisSection]['passphrase']288 thisSection = self.cfg['test09WSGILocalInstanceConnect'] 289 url = thisSection['url'] 290 username = thisSection['username'] 291 password = thisSection['passphrase'] 292 292 print("WSGI app connecting to local Session Manager instance: %s" % 293 293 self._httpBasicAuthReq(url, username, password)) 294 295 294 295 296 def test10WSGILocalInstanceGetSessionStatus(self): 297 """test10WSGILocalInstanceGetSessionStatus: test a WSGI app calling a 298 Session Manager WSGI local instance""" 299 300 # Make a client connection to the WSGI app - authenticate with WSGI 301 # basic auth 302 thisSection = self.cfg['test10WSGILocalInstanceGetSessionStatus'] 303 url = thisSection['url'] 304 username = thisSection['username'] 305 password = thisSection['passphrase'] 306 print("WSGI app connecting to local Session Manager instance: %s" % 307 self._httpBasicAuthReq(url, username, password)) 308 309 310 def test11WSGILocalInstanceDisconnect(self): 311 """test11WSGILocalInstanceDisconnect: test a WSGI app calling a 312 Session Manager WSGI local instance""" 313 314 # Make a client connection to the WSGI app - authenticate with WSGI 315 # basic auth 316 thisSection = self.cfg['test11WSGILocalInstanceDisconnect'] 317 url = thisSection['url'] 318 username = thisSection['username'] 319 password = thisSection['passphrase'] 320 print("WSGI app connecting to local Session Manager instance: %s" % 321 self._httpBasicAuthReq(url, username, password)) 322 323 324 def test12WSGILocalInstanceGetAttCert(self): 325 """test12WSGILocalInstanceGetAttCert: test a WSGI app calling a 326 Session Manager WSGI local instance""" 327 328 # Make a client connection to the WSGI app - authenticate with WSGI 329 # basic auth 330 thisSection = self.cfg['test12WSGILocalInstanceGetAttCert'] 331 url = thisSection['url'] 332 username = thisSection['username'] 333 password = thisSection['passphrase'] 334 print("WSGI app connecting to local Session Manager instance: %s" % 335 self._httpBasicAuthReq(url, username, password)) 336 296 337 class CombinedServicesTestSuite(unittest.TestSuite): 297 338 … … 308 349 "test08GetAttCertFromLocalAttributeAuthority", 309 350 "test09WSGILocalInstanceConnect", 351 "test10WSGILocalInstanceGetSessionStatus", 352 "test11WSGILocalInstanceDisconnect" 310 353 )) 311 354 unittest.TestSuite.__init__(self, map)
Note: See TracChangeset
for help on using the changeset viewer.