source: TI12-security/trunk/python/Tests/pylonsAttributeAuthority/ndgsecurity/ndgsecurity/config/attributeauthority.py @ 4129

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/Tests/pylonsAttributeAuthority/ndgsecurity/ndgsecurity/config/attributeauthority.py@4129
Revision 4129, 6.6 KB checked in by cbyrom, 11 years ago (diff)

General refactoring and updating of code, including:

Removal of the refC14nKw and signedInfoC14nKw keywords from the wssecurity session manager config
(the refC14nInclNS and signedInfoC14nInclNS keywords are sufficient);
Creation of new DOM signature handler class, dom.py, based on the wsSecurity
class;
Abstraction of common code between dom.py and etree.py into a new parent
class, BaseSignatureHandler.py.
Fixing and extending the use of properties in the SignatureHandler code.
Fixing a few bugs in the original SignatureHandler code.
Updating of test cases to new code/code structure.

1import os, sys
2import base64
3import logging
4log = logging.getLogger(__name__)
5
6
7from ndg.security.server.AttAuthority.AttAuthority_services_server import \
8        AttAuthorityService as _AttAuthorityService
9
10from ndg.security.server.AttAuthority import AttAuthority, \
11        AttAuthorityAccessDenied
12       
13from ndg.security.common.X509 import X509Cert, X509CertRead
14
15from ndgsecurity.config.soap import SOAPMiddleware
16
17
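class AttributeAuthorityWS(_AttAuthorityService):
    '''SOAP endpoint for the NDG Attribute Authority.

    Each soap_* method lets the generated stub (_AttAuthorityService)
    parse the incoming message into request/response objects, then fills
    the response from the wrapped AttAuthority instance held in self.aa.
    '''

    def __init__(self):
        # Interactive-debugger hook: when NDGSEC_INT_DEBUG is set, every
        # SOAP handler stops in pdb.  pdb blocks the worker on stdin, so a
        # deployed service with this variable set in its environment
        # simply hangs on the first request - and on a host with a TTY it
        # hands out an interactive Python prompt.  Never set it outside a
        # developer sandbox; the warning below makes an accidental setting
        # visible in the logs.
        self.__debug = bool(os.environ.get('NDGSEC_INT_DEBUG'))
        if self.__debug:
            log.warning("NDGSEC_INT_DEBUG is set: SOAP handlers will break "
                        "into pdb on every call")
        self._debugBreak()

        # NOTE: the base class __init__ is deliberately not called here
        # (unchanged from the original); the generated stub is assumed
        # not to need it - confirm against the ZSI stub if that changes.

        # Initialize Attribute Authority class - property file will be
        # picked up from default location under $NDG_DIR directory
        self.aa = AttAuthority()

    def _debugBreak(self):
        '''Drop into pdb if NDGSEC_INT_DEBUG was set at construction.

        The breakpoint lands in this frame; step "up" once in pdb to
        reach the calling SOAP handler.'''
        if self.__debug:
            import pdb
            pdb.set_trace()

    def _thisHostname(self):
        '''Return the hostname under which this AA describes itself.

        self.aa.hostInfo is assumed to be a single-entry mapping keyed by
        hostname (only its first key is ever used); an empty mapping
        raises IndexError, as before.'''
        return list(self.aa.hostInfo.keys())[0]

    def _fillHostInfo(self, entry, hostname, hostInfo, withRoles):
        '''Copy one host's attribute-authority details into a ZSI entry.

        @type entry: ZSI-generated host/trustedHost element
        @param entry: object whose Hostname/AaURI/... attributes are set
        @type hostname: string
        @param hostname: key the entry is published under
        @type hostInfo: dict
        @param hostInfo: must hold aaURI, aaDN, loginURI, loginServerDN,
        loginRequestServerDN and - when withRoles is set - role
        @type withRoles: bool
        @param withRoles: copy hostInfo['role'] into entry.RoleList; this
        AA's own entry carries no role list, trusted hosts' entries do
        @return: entry, for convenience'''
        entry.Hostname = hostname
        entry.AaURI = hostInfo['aaURI']
        entry.AaDN = hostInfo['aaDN']
        entry.LoginURI = hostInfo['loginURI']
        entry.LoginServerDN = hostInfo['loginServerDN']
        entry.LoginRequestServerDN = hostInfo['loginRequestServerDN']
        if withRoles:
            entry.RoleList = hostInfo['role']
        return entry

    def soap_getAttCert(self, ps, **kw):
        '''Retrieve an Attribute Certificate

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @rtype: tuple
        @return: request and response objects'''
        self._debugBreak()

        request, response = _AttAuthorityService.soap_getAttCert(self, ps)

        # Derive designated holder cert differently according to whether
        # a signed message is expected from the client
        if self.aa['useSignatureHandler']:
            # Get certificate corresponding to private key that signed the
            # message - i.e. the user's proxy.
            # NOTE(review): WSSecurityHandler is not imported anywhere in
            # this module, so this branch currently raises NameError (it
            # surfaces as a SOAP fault, i.e. fails closed, but that is not
            # the intended behaviour).  The name must be brought into
            # scope from wherever the WS-Security middleware defines it.
            # verifyingCert is also read from a handler object shared
            # between requests rather than from this request's context -
            # confirm the middleware sets it per request before relying
            # on it here.
            holderCert = WSSecurityHandler.signatureHandler.verifyingCert
        else:
            # No signature from client - they must instead provide the
            # designated holder cert via the UserCert input.  Nothing in
            # this method authenticates the caller in this mode, so it
            # relies entirely on whatever transport-level controls (e.g.
            # TLS client authentication) sit in front of the service.
            holderCert = request.UserCert

        # Fail closed with an explicit denial rather than handing the AA
        # an empty holder (what getAttCert does with None/'' is not
        # visible from here).
        if holderCert is None or holderCert == '':
            log.warning("getAttCert: no holder certificate supplied for "
                        "userId=%r", request.UserId)
            response.Msg = "No holder certificate supplied"
            return request, response

        try:
            attCert = self.aa.getAttCert(userId=request.UserId,
                                         holderCert=holderCert,
                                         userAttCert=request.UserAttCert)
        except AttAuthorityAccessDenied as e:
            # Denials are worth a log line: they are the signal an
            # operator needs to spot probing of the AA.  Msg is echoed to
            # the client, so the AA's denial text should stay
            # non-revealing.
            log.warning("getAttCert denied for userId=%r: %s",
                        request.UserId, e)
            response.Msg = str(e)
        else:
            response.AttCert = attCert.toString()

        return request, response

    def soap_getHostInfo(self, ps, **kw):
        '''Get information about this host

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @rtype: tuple
        @return: request and response objects'''
        self._debugBreak()

        request, response = _AttAuthorityService.soap_getHostInfo(self, ps)

        # The response element is the host entry itself; no role list
        # applies to this AA's own description.
        hostname = self._thisHostname()
        self._fillHostInfo(response, hostname, self.aa.hostInfo[hostname],
                           withRoles=False)

        return request, response

    def soap_getAllHostsInfo(self, ps, **kw):
        '''Get information about all hosts

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @rtype: tuple
        @return: request and response objects'''
        self._debugBreak()

        request, response = _AttAuthorityService.soap_getAllHostsInfo(self, ps)

        trustedHostInfo = self.aa.getTrustedHostInfo()

        # First entry describes THIS Attribute Authority (no role list)...
        hostname = self._thisHostname()
        hosts = [self._fillHostInfo(response.new_hosts(), hostname,
                                    self.aa.hostInfo[hostname],
                                    withRoles=False)]

        # ... then one entry per trusted attribute authority, roles
        # included.
        for hostname, hostInfo in trustedHostInfo.items():
            hosts.append(self._fillHostInfo(response.new_hosts(), hostname,
                                            hostInfo, withRoles=True))

        response.Hosts = hosts

        return request, response

    def soap_getTrustedHostInfo(self, ps, **kw):
        '''Get information about other trusted hosts

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @rtype: tuple
        @return: request and response objects'''
        self._debugBreak()

        request, response = \
                        _AttAuthorityService.soap_getTrustedHostInfo(self, ps)

        # request.Role is passed straight through; the AA is responsible
        # for treating it as an untrusted filter value.
        trustedHostInfo = self.aa.getTrustedHostInfo(role=request.Role)

        # Convert ready for serialization
        trustedHosts = [
            self._fillHostInfo(response.new_trustedHosts(), hostname,
                               hostInfo, withRoles=True)
            for hostname, hostInfo in trustedHostInfo.items()
        ]

        response.TrustedHosts = trustedHosts

        return request, response

    def soap_getX509Cert(self, ps, **kw):
        '''Retrieve Attribute Authority's X.509 certificate

        @type ps: ZSI ParsedSoap
        @param ps: client SOAP message
        @rtype: tuple
        @return: request and response objects'''
        self._debugBreak()

        request, response = _AttAuthorityService.soap_getX509Cert(self, ps)

        # The certificate is read from disk on every call (no caching);
        # only the public certificate is served, never the key.
        x509Cert = X509CertRead(self.aa['certFile'])

        # base64.encodestring was renamed encodebytes in Python 3 and
        # removed in 3.9; prefer the new name when present so the output
        # (MIME-style 76-column lines) is identical on either version.
        encode = getattr(base64, 'encodebytes', None) or base64.encodestring
        response.X509Cert = encode(x509Cert.asDER())
        return request, response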