source: TI12-security/trunk/python/Tests/xmlsec/WS-Security/wsSecurity.py @ 1469

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/Tests/xmlsec/WS-Security/wsSecurity.py@1469
Revision 1469, 15.6 KB checked in by pjkersha, 13 years ago (diff)

Working version with signature and validation across a SOAP interface.

  • Property svn:executable set to *
Line 
1#!/bin/env python
2
3"""WS-Security test class includes digital signature handler
4
5NERC Data Grid Project
6
7P J Kershaw 01/09/06
8
9Copyright (C) 2006 CCLRC & NERC
10
11This software may be distributed under the terms of the Q Public License,
12version 1.0 or later.
13"""
14
15reposID = '$Id$'
16
17import re
18
19# Digest and signature/verify
20from sha import sha
21from M2Crypto import X509, BIO, RSA
22import base64
23
24import ZSI
25from ZSI.wstools.Namespaces import DSIG, OASIS, GLOBUS, WSU, WSA200403, BEA, \
26                                   SOAP
27                                   
28from ZSI.TC import ElementDeclaration,TypeDefinition
29from ZSI.generate.pyclass import pyclass_type
30
31from ZSI.wstools.Utility import DOMException, SplitQName
32from ZSI.wstools.Utility import NamespaceError, MessageInterface, ElementProxy
33
34# XML Parsing
35from cStringIO import StringIO
36from Ft.Xml.Domlette import NonvalidatingReaderBase, NonvalidatingReader
37from Ft.Xml import XPath
38
39# Canonicalization
40from ZSI.wstools.c14n import Canonicalize
41from xml.dom import Node
42from xml.xpath.Context import Context
43from xml import xpath
44
45# Include for re-parsing doc ready for canonicalization in sign method - see
46# associated note
47from xml.dom.ext.reader.PyExpat import Reader
48
49
50class VerifyError(Exception):
51    """Raised from SignatureHandler.verify if signature is invalid"""
52
53class SignatureError(Exception):
54    """Flag if an error occurs during signature generation"""
55       
56class SignatureHandler(object):
57   
58    def __init__(self,
59                 certFilePath=None, 
60                 priKeyFilePath=None, 
61                 priKeyPwd=None, 
62                 sign_body=True, 
63                 sign_headers=True):
64       
65        self.__certFilePath = certFilePath
66        self.__priKeyFilePath = priKeyFilePath
67        self.__priKeyPwd = priKeyPwd
68
69
70    def sign(self, soapWriter):
71        '''Sign the message body and binary security token of a SOAP message
72        '''
73        # Add X.509 cert as binary security token
74        x509Cert = X509.load_cert(self.__certFilePath)
75       
76        x509CertPat = re.compile(\
77            '-----BEGIN CERTIFICATE-----\n?(.*?)\n?-----END CERTIFICATE-----',
78            re.S)
79        x509CertStr = x509CertPat.findall(x509Cert.as_pem())[0]
80
81        soapWriter._header.setNamespaceAttribute('wsse', OASIS.WSSE)
82        soapWriter._header.setNamespaceAttribute('wsu', WSU.UTILITY)
83        soapWriter._header.setNamespaceAttribute('ds', DSIG.BASE)
84       
85        wsseElem = soapWriter._header.createAppendElement(OASIS.WSSE, 
86                                                         'Security')
87        wsseElem.setNamespaceAttribute('wsse', OASIS.WSSE)
88        wsseElem.node.setAttribute('SOAP-ENV:mustUnderstand', "1")
89       
90        binSecTokElem = wsseElem.createAppendElement(OASIS.WSSE, 
91                                                     'BinarySecurityToken')
92        binSecTokElem.node.setAttribute('ValueType', "wsse:X509v3")
93        binSecTokElem.node.setAttribute('EncodingType', "wsse:Base64Binary")
94       
95        # Add ID so that the binary token can be included in the signature
96        binSecTokElem.node.setAttribute('wsu:Id', "binaryToken")
97
98        binSecTokElem.createAppendTextNode(x509CertStr)
99
100       
101        # Signature
102        signatureElem = wsseElem.createAppendElement(DSIG.BASE, 'Signature')
103        signatureElem.setNamespaceAttribute('ds', DSIG.BASE)
104       
105        # Signature - Signed Info
106        signedInfoElem = signatureElem.createAppendElement(DSIG.BASE, 
107                                                           'SignedInfo')
108       
109        # Signed Info - Canonicalization method
110        c14nMethodElem = signedInfoElem.createAppendElement(DSIG.BASE,
111                                                    'CanonicalizationMethod')
112        c14nMethodElem.node.setAttribute('Algorithm', DSIG.C14N)
113       
114        # Signed Info - Signature method
115        sigMethodElem = signedInfoElem.createAppendElement(DSIG.BASE,
116                                                    'SignatureMethod')
117        sigMethodElem.node.setAttribute('Algorithm', DSIG.DIGEST_SHA1)
118       
119        # Signature - Signature value
120        signatureValueElem = signatureElem.createAppendElement(DSIG.BASE, 
121                                                             'SignatureValue')
122       
123        # Key Info
124        KeyInfoElem = signatureElem.createAppendElement(DSIG.BASE, 'KeyInfo')
125        secTokRefElem = KeyInfoElem.createAppendElement(OASIS.WSSE, 
126                                                  'SecurityTokenReference')
127       
128        # Reference back to the binary token included earlier
129        wsseRefElem = secTokRefElem.createAppendElement(OASIS.WSSE, 
130                                                        'Reference')
131        wsseRefElem.node.setAttribute('URI', "#binaryToken")
132       
133        # Add Reference to body so that it can be included in the signature
134        soapWriter.body.node.setAttribute('wsu:Id', "body")
135        soapWriter.body.node.setAttribute('xmlns:wsu', WSU.UTILITY)
136
137
138        # Serialize and re-parse prior to reference generation - calculating
139        # canonicalization based on soapWriter.dom.node seems to give an
140        # error: the order of wsu:Id attribute is not correct
141        docNode = Reader().fromString(str(soapWriter))
142       
143        # Namespaces for XPath searches
144        processorNss = \
145        {
146            'ds':     DSIG.BASE, 
147            'wsu':    WSU.UTILITY, 
148            'wsse':   OASIS.WSSE, 
149            'soapenv':"http://schemas.xmlsoap.org/soap/envelope/" 
150        }
151        ctxt = Context(docNode, processorNss=processorNss)
152        idNodes = xpath.Evaluate('//*[@wsu:Id]', 
153                                 contextNode=docNode, 
154                                 context=ctxt)
155               
156        # 1) Reference Generation
157        #
158        # Find references
159        for idNode in idNodes:
160           
161            # Set URI attribute to point to reference to be signed
162            #uri = u"#" + idNode.getAttribute('wsu:Id')
163            uri = u"#" + idNode.attributes[(WSU.UTILITY, 'Id')].value
164           
165            # Canonicalize reference
166            c14nRef = Canonicalize(idNode)
167           
168            # Calculate digest for reference and base 64 encode
169            #
170            # Nb. encodestring adds a trailing newline char
171            digestValue = base64.encodestring(sha(c14nRef).digest()).strip()
172
173
174            # Add a new reference element to SignedInfo
175            refElem = signedInfoElem.createAppendElement(DSIG.BASE, 
176                                                         'Reference')
177            refElem.node.setAttribute('URI', uri)
178           
179            # Use ds:Transforms or wsse:TransformationParameters?
180            tranformsElem = refElem.createAppendElement(DSIG.BASE, 
181                                                        'Transforms')
182            tranformElem = tranformsElem.createAppendElement(DSIG.BASE, 
183                                                             'Transform')
184            tranformElem.node.setAttribute('Algorithm', DSIG.C14N)
185           
186            # Digest Method
187            digestMethodElem = refElem.createAppendElement(DSIG.BASE, 
188                                                           'DigestMethod')
189            digestMethodElem.node.setAttribute('Algorithm', DSIG.DIGEST_SHA1)
190           
191            # Digest Value
192            digestValueElem = refElem.createAppendElement(DSIG.BASE, 
193                                                          'DigestValue')
194            digestValueElem.createAppendTextNode(digestValue)
195
196   
197        # 2) Signature Generation
198        #
199
200        # Test against signature generated by pyXMLSec version
201        #xmlTxt = open('./wsseSign-xmlsec-res.xml').read()
202        #dom = NonvalidatingReader.parseStream(StringIO(xmlTxt))
203       
204        # Canonicalize the signedInfo node
205        #
206        # Nb. When extracted the code adds the namespace attribute to the
207        # signedInfo!  This has important consequences for validation -
208        #
209        # 1) Do you strip the namespace attribute before taking the digest to
210        # ensure the text is exactly the same as what is displayed in the
211        # message?
212        #
213        # 2) Leave it in and assume the validation algorithm will expect to
214        # add in the namespace attribute?!
215        #
216        # http://www.w3.org/TR/xml-c14n#NoNSPrefixRewriting implies you need
217        # to include namespace declarations for namespaces referenced in a doc
218        # subset - yes to 2)
219        c14nSignedInfo = signedInfoElem.canonicalize()
220
221        # Calculate digest of SignedInfo
222        signedInfoDigestValue = sha(c14nSignedInfo).digest().strip()
223       
224        # Read Private key to sign with   
225        priKeyFile = BIO.File(open(self.__priKeyFilePath))                                           
226        priKey = RSA.load_key_bio(priKeyFile, 
227                                  callback=lambda *ar, **kw: self.__priKeyPwd)
228       
229        # Sign using the private key and base 64 encode the result
230        signatureValue = priKey.sign(signedInfoDigestValue)
231        b64EncSignatureValue = base64.encodestring(signatureValue).strip()
232
233        # Add to <SignatureValue>
234        signatureValueElem.createAppendTextNode(b64EncSignatureValue)
235       
236        # Extract RSA public key from the cert
237        rsaPubKey = x509Cert.get_pubkey().get_rsa()
238       
239        # Check the signature
240#        verify = bool(rsaPubKey.verify(signedInfoDigestValue, signatureValue))
241#       
242#        open('soap.xml', 'w').write(str(soapWriter))
243#        import pdb;pdb.set_trace()
244        print "Signature Generated"
245
246
247    def verify(self, parsedSOAP):
248        """Verify signature"""
249       
250        element = ElementProxy(None, message=parsedSOAP.header)
251        processorNss = \
252        {
253            'ds':     DSIG.BASE, 
254            'wsu':    WSU.UTILITY, 
255            'wsse':   OASIS.WSSE, 
256            'soapenv':"http://schemas.xmlsoap.org/soap/envelope/" 
257        }
258        ctxt = Context(parsedSOAP.dom, processorNss=processorNss)
259       
260
261        signatureNodes = xpath.Evaluate('//ds:Signature', 
262                                        contextNode=parsedSOAP.dom, 
263                                        context=ctxt)
264        if len(signatureNodes) > 1:
265            raise VerifyError, 'Multiple ds:Signature elements found'
266       
267        try:
268            signatureNodes = signatureNodes[0]
269        except:
270            # Message wasn't signed
271            return
272       
273        # Two stage process: reference validation followed by signature
274        # validation
275       
276        # 1) Reference Validation       
277        refNodes = xpath.Evaluate('//ds:Reference', 
278                                  contextNode=parsedSOAP.dom, 
279                                  context=ctxt)
280           
281        # Make a lambda to pick out node child or children with a given
282        # name
283        getElements = lambda node, nameList: \
284                [n for n in node.childNodes if str(n.localName) in nameList] 
285       
286        for refNode in refNodes:
287            # Get the URI for the reference
288            refURI = refNode.getAttributeNodeNS(None, 'URI').value
289                           
290            transformsNode = getElements(refNode, "Transforms")[0]
291            transforms = getElements(transformsNode, "Transform")
292   
293            algorithm = transforms[0].getAttributeNodeNS(None, 
294                                                         "Algorithm").value
295           
296            # Add extra keyword for Exclusive canonicalization method
297            kw = {}
298            if algorithm == DSIG.C14N_EXCL:
299                try:
300                    inclusiveNS = transforms[0].getElement(DSIG.C14N_EXCL, 
301                                                       "InclusiveNamespaces")
302                    kw['unsuppressedPrefixes'] = \
303                    inclusiveNS.getAttributeValue(None, "PrefixList").split()
304                except:
305                    raise VerifyError, \
306                'failed to handle transform (%s) in <ds:Reference URI="%s">'%\
307                        (transforms[0], uri)
308       
309            # Canonicalize the reference data and calculate the digest
310            if refURI[0] != "#":
311                raise VerifyError, \
312                    "Expecting # identifier for Reference URI \"%s\"" % refURI
313                   
314            # XPath reference
315            uriXPath = '//*[@wsu:Id="%s"]' % refURI[1:]
316            uriNode = xpath.Evaluate(uriXPath, 
317                                     contextNode=parsedSOAP.dom, 
318                                     context=ctxt)[0]
319
320            c14nRef = Canonicalize(uriNode, **kw)
321            digestValue = base64.encodestring(sha(c14nRef).digest()).strip()
322           
323            # Extract the digest value that was stored           
324            digestNode = getElements(refNode, "DigestValue")[0]
325            nodeDigestValue = str(digestNode.childNodes[0].nodeValue).strip()   
326           
327            # Reference validates if the two digest values are the same
328            if digestValue != nodeDigestValue:
329                raise VerifyError, 'Digest Values do not match for URI: "%s"' %\
330                                                                        refURI
331               
332        # 2) Signature Validation
333        signedInfoNode = xpath.Evaluate('//ds:SignedInfo',
334                                        contextNode=parsedSOAP.dom, 
335                                        context=ctxt)[0]
336
337        # Get the canonicalization method - change later to check this and
338        # make sure it's an algorithm supported by this code
339        c14nMethodNode = getElements(signedInfoNode, 
340                                     "CanonicalizationMethod")[0]
341                                             
342        algorithm = c14nMethodNode.getAttributeNodeNS(None, 'Algorithm').value
343        if algorithm != DSIG.C14N:
344            raise VerifyError, \
345                "Only \"%s\" canonicalization algorithm supported" % DSIG.C14N
346               
347        # Canonicalize the SignedInfo node and take digest
348        c14nSignedInfo = Canonicalize(signedInfoNode)       
349        signedInfoDigestValue = sha(c14nSignedInfo).digest()
350       
351        # Get the signature value in order to check against the digest just
352        # calculated
353        signatureValueNode = xpath.Evaluate('//ds:SignatureValue',
354                                            contextNode=parsedSOAP.dom, 
355                                            context=ctxt)[0]
356
357        # Remove base 64 encoding
358        b64EncSignatureValue = \
359                    str(signatureValueNode.childNodes[0].nodeValue).strip()
360                   
361        signatureValue = base64.decodestring(b64EncSignatureValue)
362
363
364        # Read X.509 Cert from wsse:BinarySecurityToken node
365        # - leave out for now and read direct from hard coded pem file
366        x509Cert = X509.load_cert(self.__certFilePath)
367       
368        # Extract RSA public key from the cert
369        rsaPubKey = x509Cert.get_pubkey().get_rsa()
370       
371        # Apply the signature verification
372        try:
373            verify = bool(rsaPubKey.verify(signedInfoDigestValue, 
374                                           signatureValue))
375        except RSA.RSAError:
376            raise VerifyError, "Invalid Signature"
377       
378        print "Signature OK"
379   
380     
381if __name__ == "__main__":
382    import sys
383    txt = None
384   
385    s = SignatureHandler(certFilePath='../Junk-cert.pem',
386                         priKeyFilePath='../Junk-key.pem',
387                         priKeyPwd=open('../tmp2').read().strip())
388   
389    if 'sign' in sys.argv:
390        txt = s.sign()     
391        print txt
392
393    if 'verify' in sys.argv:
394        if txt is None:
395            txt = open('./wsseSign-test-res.xml').read()
396           
397        print "Signature OK? %s" % s.verify(txt)
Note: See TracBrowser for help on using the repository browser.