source: TI12-security/trunk/python/Tests/pylonsAttributeAuthority/ndgsecurity/ndgsecurity/config/attributeauthority.py @ 4020

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/Tests/pylonsAttributeAuthority/ndgsecurity/ndgsecurity/config/attributeauthority.py@4020
Revision 4020, 6.7 KB checked in by pjkersha, 11 years ago (diff)

More work on WSGI SOAPMiddleware

  • Moved soap and wssecurity middleware modules to ndg.security.server.wsgi
  • ... and ZSI Attribute Service Binding to ndg.security.server.zsi
  • test harness - pylons ndgsecurity in Test/ now works with Paste pipeline in ini to enable dynamic config for WS-Security handlers
# Standard library
import os
import sys
import base64
import logging

# Attribute Authority service stub and implementation
from ndg.security.server.AttAuthority.AttAuthority_services_server import (
    AttAuthorityService as _AttAuthorityService)
from ndg.security.server.AttAuthority import (
    AttAuthority, AttAuthorityAccessDenied)

# WS-Security and X.509 helpers
from ndg.security.common.wsSecurity import SignatureHandler
from ndg.security.common.X509 import X509Cert, X509CertRead

from ndgsecurity.config.soap import SOAPMiddleware

# Module level logger
log = logging.getLogger(__name__)


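class AttributeAuthorityWS(_AttAuthorityService):
    '''SOAP interface to an AttAuthority instance.

    Every soap_* method lets the parent service stub parse the incoming
    message, fills the response in from self.aa and returns the
    (request, response) pair.'''

    def __init__(self):
        '''Read the debug switch and create the Attribute Authority.

        NOTE(review): the parent class initializer is not called here -
        confirm that is intentional.'''

        # Stop in debugger at beginning of SOAP stub if environment variable
        # is set.  Any non-empty value of NDGSEC_INT_DEBUG switches this on
        # for every handler below.  pdb.set_trace() waits for console input,
        # so a request that reaches it does not complete until someone
        # answers the prompt: keep the variable unset for a deployed service.
        self.__debug = bool(os.environ.get('NDGSEC_INT_DEBUG'))
        if self.__debug:
            import pdb
            pdb.set_trace()

        # Initialize Attribute Authority class - property file will be
        # picked up from default location under $NDG_DIR directory
        self.aa = AttAuthority()

    def _copyHostInfo(self, target, hostname, hostInfo, withRoles=False):
        '''Copy one host's details onto a response element.

        @type target: object
        @param target: response or host element receiving the attributes
        @type hostname: string
        @param hostname: name of the host being described
        @type hostInfo: dict
        @param hostInfo: settings for the host; keys read are aaURI, aaDN,
        loginURI, loginServerDN, loginRequestServerDN and, if withRoles is
        set, role
        @type withRoles: bool
        @param withRoles: also set RoleList from hostInfo['role']'''
        target.Hostname = hostname
        target.AaURI = hostInfo['aaURI']
        target.AaDN = hostInfo['aaDN']
        target.LoginURI = hostInfo['loginURI']
        target.LoginServerDN = hostInfo['loginServerDN']
        target.LoginRequestServerDN = hostInfo['loginRequestServerDN']
        if withRoles:
            target.RoleList = hostInfo['role']

    def soap_getAttCert(self, ps, **kw):
        '''Retrieve an Attribute Certificate

        The holder certificate the Attribute Certificate is requested for
        is taken from the message signature when the Attribute Authority
        setting 'useSignatureHandler' is true, and from the UserCert
        request field otherwise.  An access denied error is reported to
        the caller in response.Msg; any other exception propagates.

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @rtype: tuple
        @return: request and response objects'''
        if self.__debug:
            import pdb
            pdb.set_trace()

        request, response = _AttAuthorityService.soap_getAttCert(self, ps)

        # Derive designated holder cert differently according to whether
        # a signed message is expected from the client
        if self.aa['useSignatureHandler']:
            # Get certificate corresponding to private key that signed the
            # message - i.e. the user's proxy
            #
            # NOTE(review): WSSecurityHandler is not among the names this
            # module imports (only SignatureHandler is), so this branch
            # raises NameError unless the name is provided some other way.
            # NOTE(review): the certificate is read through a module level
            # name, not from ps or kw.  If that handler object is shared by
            # concurrent requests, verifyingCert may belong to a different
            # message - confirm it is bound per request before it decides
            # who the Attribute Certificate is issued to.
            holderCert = WSSecurityHandler.signatureHandler.verifyingCert
        else:
            # No signature from client - they must instead provide the
            # designated holder cert via the UserCert input
            #
            # NOTE(review): UserCert is supplied by the caller and nothing
            # in this method shows proof of possession of the matching
            # private key.  Presumably getAttCert or the transport layer
            # authenticates the caller - confirm.
            holderCert = request.UserCert

        try:
            attCert = self.aa.getAttCert(userId=request.UserId,
                                         holderCert=holderCert,
                                         userAttCert=request.UserAttCert)
            response.AttCert = attCert.toString()

        except AttAuthorityAccessDenied, e:
            # The denial text goes back to the client as is
            response.Msg = str(e)

        return request, response

    def soap_getHostInfo(self, ps, **kw):
        '''Get information about this host

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @rtype: tuple
        @return: request and response objects'''
        if self.__debug:
            import pdb
            pdb.set_trace()

        request, response = _AttAuthorityService.soap_getHostInfo(self, ps)

        # The first key of hostInfo is taken as the name of this host
        hostname = self.aa.hostInfo.keys()[0]
        self._copyHostInfo(response, hostname, self.aa.hostInfo[hostname])

        return request, response

    def soap_getAllHostsInfo(self, ps, **kw):
        '''Get information about all hosts

        The first entry of response.Hosts describes this Attribute
        Authority, the following ones the trusted hosts.

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @rtype: tuple
        @return: request and response objects'''
        if self.__debug:
            import pdb
            pdb.set_trace()

        request, response = _AttAuthorityService.soap_getAllHostsInfo(self, ps)

        trustedHostInfo = self.aa.getTrustedHostInfo()

        # Convert ready for serialization

        # First get info for THIS Attribute Authority ...
        # Nb. No role list applies here
        thisHost = response.new_hosts()
        hostname = self.aa.hostInfo.keys()[0]
        self._copyHostInfo(thisHost, hostname, self.aa.hostInfo[hostname])
        hosts = [thisHost]

        # ... then append info for other trusted attribute authorities...
        for hostname, hostInfo in trustedHostInfo.items():
            host = response.new_hosts()
            self._copyHostInfo(host, hostname, hostInfo, withRoles=True)
            hosts.append(host)

        response.Hosts = hosts

        return request, response

    def soap_getTrustedHostInfo(self, ps, **kw):
        '''Get information about other trusted hosts

        request.Role is passed to the Attribute Authority as the role
        argument of getTrustedHostInfo.

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @rtype: tuple
        @return: request and response objects'''
        if self.__debug:
            import pdb
            pdb.set_trace()

        request, response = \
            _AttAuthorityService.soap_getTrustedHostInfo(self, ps)

        trustedHostInfo = self.aa.getTrustedHostInfo(role=request.Role)

        # Convert ready for serialization
        trustedHosts = []
        for hostname, hostInfo in trustedHostInfo.items():
            trustedHost = response.new_trustedHosts()
            self._copyHostInfo(trustedHost, hostname, hostInfo,
                               withRoles=True)
            trustedHosts.append(trustedHost)

        response.TrustedHosts = trustedHosts

        return request, response

    def soap_getX509Cert(self, ps, **kw):
        '''Retrieve Attribute Authority's X.509 certificate

        The file named by the 'certFile' setting is read on each call and
        returned as base64 encoded DER in response.X509Cert.

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @rtype: tuple
        @return: request and response objects'''
        if self.__debug:
            import pdb
            pdb.set_trace()

        request, response = _AttAuthorityService.soap_getX509Cert(self, ps)

        x509Cert = X509CertRead(self.aa['certFile'])
        response.X509Cert = base64.encodestring(x509Cert.asDER())
        return request, response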