source: TI12-security/trunk/python/Tests/xmlsec/WS-Security/wsSecurity.py @ 1465

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/Tests/xmlsec/WS-Security/wsSecurity.py@1465
Revision 1465, 15.0 KB checked in by pjkersha, 13 years ago (diff)

Added sign and verify to wsServer but problems with canonicalization causing verify error. This seems to be
due to attribute order differences. These shouldn't happen given the same canonicalization method for
signature and verification but the could be a problem with the way that the DOM code creates the nodes in
sign() ?

sing was added to server side code by overriding SOAPRequestHandler.do_POST and making a custom _Dispatch
function.

  • Property svn:executable set to *
Line 
1#!/bin/env python
2
3import re
4
5# Digest and signature/verify
6from sha import sha
7from M2Crypto import X509, BIO, RSA
8import base64
9
10import cElementTree as ElementTree
11import ZSI
12from ZSI.wstools.Namespaces import DSIG, OASIS, GLOBUS, WSU, WSA200403, BEA, \
13                                   SOAP
14                                   
15from ZSI.TC import ElementDeclaration,TypeDefinition
16from ZSI.generate.pyclass import pyclass_type
17
18from ZSI.wstools.Utility import DOMException, SplitQName
19from ZSI.wstools.Utility import NamespaceError, MessageInterface, ElementProxy
20
21# XML Parsing
22from cStringIO import StringIO
23from Ft.Xml.Domlette import NonvalidatingReaderBase, NonvalidatingReader
24from Ft.Xml import XPath
25
26# Canonicalization
27from ZSI.wstools.c14n import Canonicalize
28from xml.dom import Node
29from xml.xpath.Context import Context
30from xml import xpath
31
32# Type codes for signature elements
33#from ZSI.TC import _get_global_element_declaration as getGlobalElemDecl
34
35#Security = getGlobalElemDecl(OASIS.WSSE, "Security").pyclass
36#KeyInfo = getGlobalElemDecl(DSIG.BASE, "KeyInfo").pyclass
37#Reference = GED(OASIS.WSSE, "Reference").pyclass
38#SecurityReference = GED(OASIS.WSSE, "SecurityTokenReference").pyclass
39#Signature = GED(DSIG.BASE, "Signature").pyclass
40
41
42
43
44class SignatureHandler(object):
45   
46    # Unique token reference
47    tokenRef = 10101
48   
49    def __init__(self,
50                 certFilePath=None, 
51                 priKeyFilePath=None, 
52                 priKeyPwd=None, 
53                 sign_body=True, 
54                 sign_headers=True):
55       
56        self.__certFilePath = certFilePath
57        self.__priKeyFilePath = priKeyFilePath
58        self.__priKeyPwd = priKeyPwd
59       
60        self._can_algo = DSIG.C14N
61        self._sig_algo = GLOBUS.SIG
62        self._dig_algo = DSIG.DIGEST_SHA1
63        self._docCtxtID = None
64        self._expire_time = 300
65        self._setProtocol = False
66        self._sign_headers = sign_headers
67        self._sign_body = sign_body
68
69
70    def sign(self, soapWriter):
71        '''Sign the message body and binary security token of a SOAP message
72        '''
73        # Add X.509 cert as binary security token
74        x509Cert = X509.load_cert(self.__certFilePath)
75       
76        x509CertPat = re.compile(\
77            '-----BEGIN CERTIFICATE-----\n?(.*?)\n?-----END CERTIFICATE-----',
78            re.S)
79        x509CertStr = x509CertPat.findall(x509Cert.as_pem())[0]
80
81       
82        # Namespaces for XPath searches
83        soapWriter.dom.processorNss = \
84        {
85            'ds':     DSIG.BASE, 
86            'wsu':    WSU.UTILITY, 
87            'wsse':   OASIS.WSSE, 
88            'soapenv':"http://schemas.xmlsoap.org/soap/envelope/" 
89        }
90
91        soapWriter._header.setNamespaceAttribute('wsse', OASIS.WSSE)
92        soapWriter._header.setNamespaceAttribute('wsu', WSU.UTILITY)
93        soapWriter._header.setNamespaceAttribute('ds', DSIG.BASE)
94       
95        wsseElem = soapWriter._header.createAppendElement(OASIS.WSSE, 
96                                                         'Security')
97        wsseElem.setNamespaceAttribute('wsse', OASIS.WSSE)
98        wsseElem.node.setAttribute('SOAP-ENV:mustUnderstand', "1")
99       
100        binSecTokElem = wsseElem.createAppendElement(OASIS.WSSE, 
101                                                     'BinarySecurityToken')
102        binSecTokElem.node.setAttribute('ValueType', "wsse:X509v3")
103        binSecTokElem.node.setAttribute('EncodingType', "wsse:Base64Binary")
104       
105        # Add ID
106        binSecTokElem.node.setAttribute('wsu:Id', "binaryToken")
107
108        binSecTokElem.createAppendTextNode(x509CertStr)
109
110       
111        signatureElem = wsseElem.createAppendElement(DSIG.BASE, 'Signature')
112        signatureElem.setNamespaceAttribute('ds', DSIG.BASE)
113       
114        signedInfoElem = signatureElem.createAppendElement(DSIG.BASE, 
115                                                           'SignedInfo')
116        signatureValueElem = signatureElem.createAppendElement(DSIG.BASE, 
117                                                             'SignatureValue')
118       
119        KeyInfoElem = signatureElem.createAppendElement(DSIG.BASE, 'KeyInfo')
120        secTokRefElem = KeyInfoElem.createAppendElement(OASIS.WSSE, 
121                                                  'SecurityTokenReference')
122       
123        wsseRefElem = secTokRefElem.createAppendElement(OASIS.WSSE, 
124                                                        'Reference')
125        wsseRefElem.node.setAttribute('URI', "#binaryToken")
126       
127        # Add Reference to body
128        soapWriter.body.node.setAttribute('wsu:Id', "body")
129        soapWriter.body.node.setAttribute('xmlns:wsu', WSU.UTILITY)
130       
131       
132        # 1) Reference Generation
133        #
134        # Find references
135        #idNodes = XPath.Compile('//*[@wsu:Id]').evaluate(docCtxt)
136        idElems = [soapWriter.body]#[binSecTokElem, soapWriter.body]
137        for idElem in idElems:
138           
139            # Set URI attribute to point to reference to be signed
140            uri = u"#" + idElem.node.getAttribute('wsu:Id')
141           
142            # Canonicalize reference
143#            c14nRef1 = idElem.canonicalize()
144            c14nRef = Canonicalize(idElem.node)
145            import pdb;pdb.set_trace()
146#            print "Equal? %s" % (c14nRef == c14nRef1)
147            print 'idElem'
148            print idElem
149            print 'c14nRef'
150            print c14nRef
151           
152            # Calculate digest for reference and base 64 encode
153            #
154            # Nb. encodestring adds a trailing newline char
155            digestValue = base64.encodestring(sha(c14nRef).digest()).strip()
156
157
158            # Add a new reference element to SignedInfo
159            refElem = signedInfoElem.createAppendElement(DSIG.BASE, 
160                                                         'Reference')
161            refElem.node.setAttribute('URI', uri)
162           
163            # Use ds:Transforms or wsse:TransformationParameters?
164            tranformsElem = refElem.createAppendElement(DSIG.BASE, 
165                                                        'Transforms')
166            tranformElem = tranformsElem.createAppendElement(DSIG.BASE, 
167                                                             'Transform')
168            tranformElem.node.setAttribute('Algorithm', DSIG.C14N)
169           
170            refElem.createAppendElement(DSIG.BASE, 'DigestMethod')
171            refElem.node.setAttribute('Algorithm', DSIG.DIGEST_SHA1)
172           
173            digestValueElem = refElem.createAppendElement(DSIG.BASE, 
174                                                          'DigestValue')
175            digestValueElem.createAppendTextNode(digestValue)
176
177       
178        # 2) Signature Generation
179        #
180
181        # Test against signature generated by pyXMLSec version
182        #xmlTxt = open('./wsseSign-xmlsec-res.xml').read()
183        #dom = NonvalidatingReader.parseStream(StringIO(xmlTxt))
184       
185        # Canonicalize the signedInfo node
186        #
187        # Nb. When extracted the code adds the namespace attribute to the
188        # signedInfo!  This has important consequences for validation -
189        #
190        # 1) Do you strip the namespace attribute before taking the digest to
191        # ensure the text is exactly the same as what is displayed in the
192        # message?
193        #
194        # 2) Leave it in and assume the validation algorithm will expect to
195        # add in the namespace attribute?!
196        #
197        # http://www.w3.org/TR/xml-c14n#NoNSPrefixRewriting implies you need
198        # to include namespace declarations for namespaces referenced in a doc
199        # subset - yes to 2)
200        c14nSignedInfo = signedInfoElem.canonicalize()
201
202        # Calculate digest of SignedInfo
203        signedInfoDigestValue = sha(c14nSignedInfo).digest().strip()
204       
205        # Read Private key to sign with   
206        priKeyFile = BIO.File(open(self.__priKeyFilePath))                                           
207        priKey = RSA.load_key_bio(priKeyFile, 
208                                  callback=lambda *ar, **kw: self.__priKeyPwd)
209       
210        # Sign using the private key and base 64 encode the result
211        signatureValue = priKey.sign(signedInfoDigestValue)
212        b64EncSignatureValue = base64.encodestring(signatureValue).strip()
213
214        # Add to <SignatureValue>
215        signatureValueElem.createAppendTextNode(b64EncSignatureValue)
216       
217        # Extract RSA public key from the cert
218        rsaPubKey = x509Cert.get_pubkey().get_rsa()
219       
220        # Check the signature
221#        verify = bool(rsaPubKey.verify(signedInfoDigestValue, signatureValue))
222#        print soapWriter.dom.node.toprettyxml()
223#       
224#        from xml.dom.ext.reader.PyExpat import Reader
225#        Reader().fromString(str(soapWriter))
226#        import pdb;pdb.set_trace()       
227
228
229    def verify(self, parsedSOAP):
230        """Verify signature"""
231       
232        element = ElementProxy(None, message=parsedSOAP.header)
233        processorNss = \
234        {
235            'ds':     DSIG.BASE, 
236            'wsu':    WSU.UTILITY, 
237            'wsse':   OASIS.WSSE, 
238            'soapenv':"http://schemas.xmlsoap.org/soap/envelope/" 
239        }
240        ctxt = Context(parsedSOAP.dom, processorNss=processorNss)
241       
242
243        signatureNodes = xpath.Evaluate('//ds:Signature', 
244                                        contextNode=parsedSOAP.dom, 
245                                        context=ctxt)
246        if len(signatureNodes) > 1:
247            raise Exception, 'Multiple ds:Signature elements found'
248       
249        try:
250            signatureNodes = signatureNodes[0]
251        except:
252            # Message wasn't signed
253            return
254       
255        # Two stage process: reference validation followed by signature
256        # validation
257       
258        # 1) Reference Validation       
259        refNodes = xpath.Evaluate('//ds:Reference', 
260                                  contextNode=parsedSOAP.dom, 
261                                  context=ctxt)
262           
263        # Make a lambda to pick out node child or children with a given
264        # name
265        getElements = lambda node, nameList: \
266                [n for n in node.childNodes if str(n.localName) in nameList] 
267       
268        for refNode in refNodes:
269            # Get the URI for the reference
270            refURI = refNode.getAttributeNodeNS(None, 'URI').value
271                           
272            transformsNode = getElements(refNode, "Transforms")[0]
273            transforms = getElements(transformsNode, "Transform")
274   
275            algorithm = transforms[0].getAttributeNodeNS(None, 
276                                                         "Algorithm").value
277           
278            # Add extra keyword for Exclusive canonicalization method
279            kw = {}
280            if algorithm == DSIG.C14N_EXCL:
281                try:
282                    inclusiveNS = transforms[0].getElement(DSIG.C14N_EXCL, 
283                                                       "InclusiveNamespaces")
284                    kw['unsuppressedPrefixes'] = \
285                    inclusiveNS.getAttributeValue(None, "PrefixList").split()
286                except:
287                    raise Exception, \
288                'failed to handle transform (%s) in <ds:Reference URI="%s">'%\
289                        (transforms[0], uri)
290       
291            # Canonicalize the reference data and calculate the digest
292            if refURI[0] != "#":
293                raise Exception, \
294                    "Expecting # identifier for Reference URI \"%s\"" % refURI
295                   
296            # XPath reference
297            uriXPath = '//*[@wsu:Id="%s"]' % refURI[1:]
298            uriNode = xpath.Evaluate(uriXPath, 
299                                     contextNode=parsedSOAP.dom, 
300                                     context=ctxt)[0]
301
302            c14nRef = Canonicalize(uriNode, **kw)
303            import pdb;pdb.set_trace()
304            digestValue = base64.encodestring(sha(c14nRef).digest()).strip()
305           
306            # Extract the digest value that was stored           
307            digestNode = getElements(refNode, "DigestValue")[0]
308            nodeDigestValue = str(digestNode.childNodes[0].nodeValue).strip()   
309           
310            # Reference validates if the two digest values are the same
311            if digestValue != nodeDigestValue:
312                raise Exception, 'Digest Values do not match for URI: "%s"' %\
313                                                                        refURI
314               
315        # 2) Signature Validation
316        signedInfoNode = xpath.Evaluate('//ds:SignedInfo',
317                                        contextNode=parsedSOAP.dom, 
318                                        context=ctxt)[0]
319
320        # Get the canonicalization method - change later to check this and
321        # make sure it's an algorithm supported by this code
322        c14nMethodNode = getElements(signedInfoNode, 
323                                     "CanonicalizationMethod")[0]
324                                             
325        algorithm = c14nMethodNode.getAttributeNodeNS(None, 'Algorithm').value
326        if algorithm != DSIG.C14N:
327            raise Exception, \
328                "Only \"%s\" canonicalization algorithm supported" % DSIG.C14N
329               
330        # Canonicalize the SignedInfo node and take digest
331        c14nSignedInfo = Canonicalize(signedInfoNode)       
332        signedInfoDigestValue = sha(c14nSignedInfo).digest()
333       
334        # Get the signature value in order to check against the digest just
335        # calculated
336        signatureValueNode = xpath.Evaluate('//ds:SignatureValue',
337                                            contextNode=parsedSOAP.dom, 
338                                            context=ctxt)[0]
339
340        # Remove base 64 encoding
341        b64EncSignatureValue = \
342                    str(signatureValueNode.childNodes[0].nodeValue).strip()
343                   
344        signatureValue = base64.decodestring(b64EncSignatureValue)
345
346
347        # Read X.509 Cert from wsse:BinarySecurityToken node
348        # - leave out for now and read direct from hard coded pem file
349        x509Cert = X509.load_cert(self.__certFilePath)
350       
351        # Extract RSA public key from the cert
352        rsaPubKey = x509Cert.get_pubkey().get_rsa()
353       
354        # Apply the signature verification
355        try:
356            verify = bool(rsaPubKey.verify(signedInfoDigestValue, 
357                                           signatureValue))
358        except RSA.RSAError:
359            raise Exception, "Invalid Signature"
360
361
362    def getTokenRefString(self):
363        """
364        Return an unique number to identify an element
365        for the element's id attribute
366        """
367        Signature.tokenRef +=1
368        return str(Signature.tokenRef)
369
370   
371     
372if __name__ == "__main__":
373    import sys
374    txt = None
375   
376    s = SignatureHandler(certFilePath='../Junk-cert.pem',
377                         priKeyFilePath='../Junk-key.pem',
378                         priKeyPwd=open('../tmp2').read().strip())
379   
380    if 'sign' in sys.argv:
381        txt = s.sign()     
382        print txt
383
384    if 'verify' in sys.argv:
385        if txt is None:
386            txt = open('./wsseSign-test-res.xml').read()
387           
388        print "Signature OK? %s" % s.verify(txt)
Note: See TracBrowser for help on using the repository browser.