source: TI12-security/trunk/python/ndg.security.server/ndg/security/server/zsi/attributeauthority.py @ 4245

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.server/ndg/security/server/zsi/attributeauthority.py@4245
Revision 4245, 7.2 KB checked in by pjkersha, 12 years ago (diff)

Working unit tests for WSGI based Attribute Authority.

  • Altered so that all Attribute Config is picked up from the Paste ini file. Separate cfg or xml based config file is still supported.

TODO:

  • Simplify unit test config for client.
"""ZSI Server side SOAP Binding for Attribute Authority Web Service

NERC Data Grid Project"""
__author__ = "P J Kershaw"
__date__ = "11/06/08"
__copyright__ = "(C) 2008 STFC & NERC"
__license__ = \
"""This software may be distributed under the terms of the Q Public
License, version 1.0 or later."""
__contact__ = "P.J.Kershaw@rl.ac.uk"
__revision__ = '$Id$'
import os, sys
import base64
import logging
# Module-level logger, named after this module
log = logging.getLogger(__name__)


# Service base class that AttributeAuthorityWS extends; each soap_* override
# calls the method of the same name on it first
from ndg.security.server.AttAuthority.AttAuthority_services_server import \
        AttAuthorityService as _AttAuthorityService

# The Attribute Authority itself, and the exception it raises to refuse a
# request for an Attribute Certificate
from ndg.security.server.AttAuthority import AttAuthority, \
        AttAuthorityAccessDenied

from ndg.security.common.wssecurity.dom import SignatureHandler
from ndg.security.common.X509 import X509Cert, X509CertRead


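class AttributeAuthorityWS(_AttAuthorityService):
    '''SOAP binding that exposes an AttAuthority instance as a web service.

    Every soap_* method first calls the method of the same name on
    _AttAuthorityService to obtain the request and response objects, then
    fills the response in from self.aa.

    Points to check when deploying:
      - NDGSEC_INT_DEBUG in the environment makes the constructor and every
        SOAP call stop in pdb (see __init__).
      - soap_getAttCert only works from a signature-verified holder
        certificate when the WS-Security signature verification filter is
        referenced; otherwise it uses the certificate sent in the request.
    '''

    # Values of NDGSEC_INT_DEBUG (lower-cased, stripped) that leave the
    # debugger switched off; any other value switches it on
    __DEBUG_OFF_VALUES = ('', '0', 'false', 'no', 'off')

    def __init__(self, **kw):
        '''Create the Attribute Authority that answers the SOAP calls

        @type kw: dict
        @param kw: keywords passed unchanged to AttAuthority'''

        # Stop in debugger at beginning of SOAP stub if environment variable
        # is set.  The value is interpreted so that NDGSEC_INT_DEBUG=0 (or
        # false/no/off) really disables the breakpoints: bool() of any
        # non-empty string, "0" included, is True.
        debugSetting = os.environ.get('NDGSEC_INT_DEBUG', '')
        self.__debug = \
            debugSetting.strip().lower() not in self.__DEBUG_OFF_VALUES
        if self.__debug:
            # pdb.set_trace() blocks the calling thread until someone drives
            # the debugger, so leave a trace of why requests hang
            log.warning("NDGSEC_INT_DEBUG is set: the Attribute Authority "
                        "SOAP methods will stop in pdb.set_trace()")
            import pdb
            pdb.set_trace()

        # Initialise Attribute Authority class - property file will be
        # picked up from default location under $NDG_DIR directory
        self.aa = AttAuthority(**kw)


    def __setHostInfo(self, host, hostname, hostInfo):
        '''Copy the details of one host onto a SOAP element

        @type host: object
        @param host: response or host element to populate
        @type hostname: string
        @param hostname: name of the host being described
        @type hostInfo: dict
        @param hostInfo: must hold the keys aaURI, aaDN, loginURI,
        loginServerDN and loginRequestServerDN'''
        host.Hostname = hostname
        host.AaURI = hostInfo['aaURI']
        host.AaDN = hostInfo['aaDN']
        host.LoginURI = hostInfo['loginURI']
        host.LoginServerDN = hostInfo['loginServerDN']
        host.LoginRequestServerDN = hostInfo['loginRequestServerDN']


    def soap_getAttCert(self, ps, **kw):
        '''Retrieve an Attribute Certificate

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @type kw: dict
        @param kw: accepted but not used by this method
        @rtype: tuple
        @return: request and response objects.  response.AttCert is set on
        success; response.Msg is set instead if the Attribute Authority
        denied access'''
        if self.__debug:
            import pdb
            pdb.set_trace()

        request, response = _AttAuthorityService.soap_getAttCert(self, ps)

        # Derive designated holder cert differently according to whether
        # a signed message is expected from the client - NB, this is dependent
        # on whether a reference to the signature filter was set in the
        # environment.
        # NOTE(review): referencedWSGIFilters is not assigned in this class;
        # presumably the base class or the hosting WSGI application sets it,
        # and the key below must match the filter name in the configuration
        # - confirm.
        signatureFilter = \
            self.referencedWSGIFilters.get('wsseSignatureVerificationFilter01')
        if signatureFilter is not None:
            # Get certificate corresponding to private key that signed the
            # message - i.e. the user's proxy
            holderCert = signatureFilter.signatureHandler.verifyingCert
        else:
            # No signature from client - they must instead provide the
            # designated holder cert via the UserCert input.
            # NOTE(review): here the holder certificate is whatever the
            # request carries; nothing in this method ties it to the caller.
            # Any proof that the caller owns it has to come from elsewhere
            # (transport-level authentication, or checks inside
            # AttAuthority.getAttCert) - confirm before running without the
            # signature verification filter.
            holderCert = request.UserCert

        try:
            attCert = self.aa.getAttCert(userId=request.UserId,
                                         holderCert=holderCert,
                                         userAttCert=request.UserAttCert)
            response.AttCert = attCert.toString()

        except AttAuthorityAccessDenied, e:
            # The exception text goes back to the client verbatim, so the
            # messages raised with AttAuthorityAccessDenied should not hold
            # internal detail
            response.Msg = str(e)

        return request, response


    def soap_getHostInfo(self, ps, **kw):
        '''Get information about this host

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @type kw: dict
        @param kw: accepted but not used by this method
        @rtype: tuple
        @return: request and response objects'''
        if self.__debug:
            import pdb
            pdb.set_trace()

        request, response = _AttAuthorityService.soap_getHostInfo(self, ps)

        # The first key of hostInfo is taken as this host's name -
        # presumably hostInfo holds a single entry; verify against
        # AttAuthority
        thisHostInfo = self.aa.hostInfo
        hostname = thisHostInfo.keys()[0]
        self.__setHostInfo(response, hostname, thisHostInfo[hostname])

        return request, response


    def soap_getAllHostsInfo(self, ps, **kw):
        '''Get information about all hosts

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @type kw: dict
        @param kw: accepted but not used by this method
        @rtype: tuple
        @return: request and response objects.  response.Hosts lists this
        Attribute Authority first, followed by its trusted hosts'''
        if self.__debug:
            import pdb
            pdb.set_trace()

        request, response = _AttAuthorityService.soap_getAllHostsInfo(self, ps)

        trustedHostInfo = self.aa.getTrustedHostInfo()

        # Convert ready for serialization

        # First get info for THIS Attribute Authority ...
        # Nb. No role list applies here
        thisHostInfo = self.aa.hostInfo
        thisHostname = thisHostInfo.keys()[0]
        thisHost = response.new_hosts()
        self.__setHostInfo(thisHost, thisHostname, thisHostInfo[thisHostname])
        hosts = [thisHost]

        # ... then append info for other trusted attribute authorities...
        for hostname, hostInfo in trustedHostInfo.items():
            host = response.new_hosts()
            self.__setHostInfo(host, hostname, hostInfo)
            host.RoleList = hostInfo['role']

            hosts.append(host)

        response.Hosts = hosts

        return request, response


    def soap_getTrustedHostInfo(self, ps, **kw):
        '''Get information about other trusted hosts

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message; its Role element is passed to the
        Attribute Authority to select the trusted hosts
        @type kw: dict
        @param kw: accepted but not used by this method
        @rtype: tuple
        @return: request and response objects'''
        if self.__debug:
            import pdb
            pdb.set_trace()

        request, response = \
            _AttAuthorityService.soap_getTrustedHostInfo(self, ps)

        trustedHostInfo = self.aa.getTrustedHostInfo(role=request.Role)

        # Convert ready for serialization
        trustedHosts = []
        for hostname, hostInfo in trustedHostInfo.items():
            trustedHost = response.new_trustedHosts()
            self.__setHostInfo(trustedHost, hostname, hostInfo)
            trustedHost.RoleList = hostInfo['role']

            trustedHosts.append(trustedHost)

        response.TrustedHosts = trustedHosts

        return request, response


    def soap_getX509Cert(self, ps, **kw):
        '''Retrieve Attribute Authority's X.509 certificate

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @type kw: dict
        @param kw: accepted but not used by this method
        @rtype: tuple
        @return: request and response objects; response.X509Cert holds the
        base64 encoded DER form of the certificate'''
        if self.__debug:
            import pdb
            pdb.set_trace()

        request, response = _AttAuthorityService.soap_getX509Cert(self, ps)

        # The certificate file named by the signingCertFilePath setting is
        # read on every call.  Only the certificate is returned; check that
        # this setting does not point at a file that also holds the private
        # key material.
        x509Cert = X509CertRead(self.aa['signingCertFilePath'])
        response.X509Cert = base64.encodestring(x509Cert.asDER())
        return request, response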