source: TI12-security/trunk/python/ndg.security.server/ndg/security/server/zsi/attributeauthority/__init__.py @ 4392

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.server/ndg/security/server/zsi/attributeauthority/__init__.py@4392
Revision 4392, 7.9 KB checked in by pjkersha, 11 years ago (diff)

Rerun Credential Wallet and AA client unittests with refactored WSDL stubs in place.

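"""ZSI Server side SOAP Binding for Attribute Authority Web Service

NERC Data Grid Project"""
__author__ = "P J Kershaw"
__date__ = "11/06/08"
__copyright__ = "(C) 2008 STFC & NERC"
__license__ = \
"""This software may be distributed under the terms of the Q Public
License, version 1.0 or later."""
__contact__ = "P.J.Kershaw@rl.ac.uk"
__revision__ = '$Id$'
import os
import sys
import base64
import logging

# Module level logger, named after this package
log = logging.getLogger(__name__)

# Generated ZSI message classes for the Attribute Authority WSDL operations
from ndg.security.common.zsi.attributeauthority.AttributeAuthority_services import \
    getAttCertInputMsg, getAttCertOutputMsg, \
    getHostInfoInputMsg, getHostInfoOutputMsg, \
    getTrustedHostInfoInputMsg, getTrustedHostInfoOutputMsg, \
    getAllHostsInfoInputMsg, getAllHostsInfoOutputMsg, \
    getX509CertInputMsg, getX509CertOutputMsg

# Generated server side stub, imported under a private name so that the
# binding class in this module can extend it
from ndg.security.server.zsi.attributeauthority.AttributeAuthority_services_server \
    import AttributeAuthorityService as _AttributeAuthorityService

from ndg.security.server.attributeauthority import AttributeAuthority, \
    AttributeAuthorityAccessDenied

from ndg.security.common.wssecurity.dom import SignatureHandler
from ndg.security.common.X509 import X509Cert, X509CertRead

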
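class AttributeAuthorityWS(_AttributeAuthorityService):
    '''Attribute Authority ZSI SOAP Service Binding class

    Each soap_* method asks the generated ZSI stub for the response object
    and then fills it in from the AttributeAuthority instance held in
    self.aa.'''

    def __init__(self, **kw):
        '''Create the Attribute Authority instance behind this binding

        @type kw: dict
        @param kw: keywords passed unchanged to AttributeAuthority'''

        # Stop in debugger at beginning of SOAP stub if environment variable
        # is set.  Any non-empty value switches this on (bool() of a
        # non-empty string, so "0" counts too), and every SOAP handler below
        # then stops in pdb before it answers.  Leave the variable unset in
        # a deployed service.
        self.__debug = bool(os.environ.get('NDGSEC_INT_DEBUG'))
        if self.__debug:
            log.warning("NDGSEC_INT_DEBUG is set: SOAP handlers will stop "
                        "in pdb before replying")
            import pdb
            pdb.set_trace()

        # Initialise Attribute Authority class - property file will be
        # picked up from default location under $NDG_DIR directory
        # NOTE(review): the base class __init__ is not called here - confirm
        # that this is intended for the generated stub
        self.aa = AttributeAuthority(**kw)


    def soap_getAttCert(self, ps, **kw):
        '''Retrieve an Attribute Certificate

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @rtype: ndg.security.common.zsi.attributeauthority.AttributeAuthority_services_types.getAttCertResponse_Holder
        @return: response - AttCert is set when a certificate is issued, Msg
        is set instead when the Attribute Authority denies access'''
        if self.__debug:
            import pdb
            pdb.set_trace()

        request = ps.Parse(getAttCertInputMsg.typecode)
        response = _AttributeAuthorityService.soap_getAttCert(self, ps)

        # Derive designated holder cert differently according to whether
        # a signed message is expected from the client - NB, this is dependent
        # on whether a reference to the signature filter was set in the
        # environment
        # NOTE(review): referencedWSGIFilters is not assigned in this class;
        # presumably the base class or the WSGI wiring provides it - confirm
        signatureFilter = \
            self.referencedWSGIFilters.get('wsseSignatureVerificationFilter01')
        if signatureFilter is not None:
            # Get certificate corresponding to private key that signed the
            # message - i.e. the user's proxy
            holderCert = signatureFilter.signatureHandler.verifyingCert
        else:
            # No signature from client - they must instead provide the
            # designated holder cert via the UserCert input
            # NOTE(review): on this path the holder certificate comes
            # straight from the request and nothing in this method binds it
            # to the sender.  Confirm that AttributeAuthority.getAttCert or
            # the transport layer authenticates the caller when the
            # signature filter is not configured.
            holderCert = request.UserCert

        try:
            attCert = self.aa.getAttCert(userId=request.UserId,
                                         holderCert=holderCert,
                                         userAttCert=request.UserAttCert)
            response.AttCert = attCert.toString()

        except AttributeAuthorityAccessDenied as e:
            # Keep a server side record of the refusal; %r stops control
            # characters in the client supplied user ID from forging log
            # lines
            log.warning("getAttCert access denied for user %r: %s",
                        request.UserId, e)
            response.Msg = str(e)

        return response


    def soap_getHostInfo(self, ps, **kw):
        '''Get information about this host

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @rtype: response object created by the generated ZSI stub
        @return: response with the host name, URIs and DNs of this
        Attribute Authority'''
        if self.__debug:
            import pdb
            pdb.set_trace()

        response = _AttributeAuthorityService.soap_getHostInfo(self, ps)

        # The first key of hostInfo is taken as this host's name
        # NOTE(review): presumably hostInfo holds a single entry - confirm
        hostname = self.aa.hostInfo.keys()[0]
        hostInfo = self.aa.hostInfo[hostname]

        response.Hostname = hostname
        response.AaURI = hostInfo['aaURI']
        response.AaDN = hostInfo['aaDN']
        response.LoginURI = hostInfo['loginURI']
        response.LoginServerDN = hostInfo['loginServerDN']
        response.LoginRequestServerDN = hostInfo['loginRequestServerDN']

        return response


    def soap_getAllHostsInfo(self, ps, **kw):
        '''Get information about all hosts

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @rtype: response object created by the generated ZSI stub
        @return: response whose Hosts list holds this Attribute Authority
        first, followed by the trusted hosts'''
        if self.__debug:
            import pdb
            pdb.set_trace()

        response = _AttributeAuthorityService.soap_getAllHostsInfo(self, ps)

        trustedHostInfo = self.aa.getTrustedHostInfo()

        # Convert ready for serialization

        # First get info for THIS Attribute Authority ...
        # Nb. No role list applies here
        thisHostname = self.aa.hostInfo.keys()[0]
        thisHostInfo = self.aa.hostInfo[thisHostname]

        thisHost = response.new_hosts()
        thisHost.Hostname = thisHostname
        thisHost.AaURI = thisHostInfo['aaURI']
        thisHost.AaDN = thisHostInfo['aaDN']
        thisHost.LoginURI = thisHostInfo['loginURI']
        thisHost.LoginServerDN = thisHostInfo['loginServerDN']
        thisHost.LoginRequestServerDN = thisHostInfo['loginRequestServerDN']

        hosts = [thisHost]

        # ... then append info for other trusted attribute authorities...
        for hostname, hostInfo in trustedHostInfo.items():
            host = response.new_hosts()

            host.Hostname = hostname
            host.AaURI = hostInfo['aaURI']
            host.AaDN = hostInfo['aaDN']
            host.LoginURI = hostInfo['loginURI']
            host.LoginServerDN = hostInfo['loginServerDN']
            host.LoginRequestServerDN = hostInfo['loginRequestServerDN']
            host.RoleList = hostInfo['role']

            hosts.append(host)

        response.Hosts = hosts

        return response


    def soap_getTrustedHostInfo(self, ps, **kw):
        '''Get information about other trusted hosts

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @rtype: response object created by the generated ZSI stub
        @return: response whose TrustedHosts list holds one entry per
        trusted host returned for the requested role'''
        if self.__debug:
            import pdb
            pdb.set_trace()

        request = ps.Parse(getTrustedHostInfoInputMsg.typecode)
        response = _AttributeAuthorityService.soap_getTrustedHostInfo(self, ps)

        # The role filter is taken from the client request as is
        trustedHostInfo = self.aa.getTrustedHostInfo(role=request.Role)

        # Convert ready for serialization
        trustedHosts = []
        for hostname, hostInfo in trustedHostInfo.items():
            trustedHost = response.new_trustedHosts()

            trustedHost.Hostname = hostname
            trustedHost.AaURI = hostInfo['aaURI']
            trustedHost.AaDN = hostInfo['aaDN']
            trustedHost.LoginURI = hostInfo['loginURI']
            trustedHost.LoginServerDN = hostInfo['loginServerDN']
            trustedHost.LoginRequestServerDN = hostInfo['loginRequestServerDN']
            trustedHost.RoleList = hostInfo['role']

            trustedHosts.append(trustedHost)

        response.TrustedHosts = trustedHosts

        return response


    def soap_getX509Cert(self, ps, **kw):
        '''Retrieve Attribute Authority's X.509 certificate

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @rtype: response object created by the generated ZSI stub
        @return: response with X509Cert set to the base 64 encoded DER form
        of the certificate'''
        if self.__debug:
            import pdb
            pdb.set_trace()

        response = _AttributeAuthorityService.soap_getX509Cert(self, ps)

        # The certificate file is read on every call from the path held
        # under 'signingCertFilePath' in the Attribute Authority settings
        x509Cert = X509CertRead(self.aa['signingCertFilePath'])
        response.X509Cert = base64.encodestring(x509Cert.asDER())
        return response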