source: TI12-security/trunk/python/Tests/xmlsec/WS-Security/wsSecurity.py @ 1491

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/Tests/xmlsec/WS-Security/wsSecurity.py@1491
Revision 1491, 16.8 KB checked in by pjkersha, 14 years ago (diff)

Added EncryptionHandler? class to look at XML encryption.

  • Property svn:executable set to *
Line 
1#!/bin/env python
2
3"""WS-Security test class includes digital signature handler
4
5NERC Data Grid Project
6
7P J Kershaw 01/09/06
8
9Copyright (C) 2006 CCLRC & NERC
10
11This software may be distributed under the terms of the Q Public License,
12version 1.0 or later.
13"""
14
15reposID = '$Id$'
16
17import re
18
19# Digest and signature/verify
20from sha import sha
21from M2Crypto import X509, BIO, RSA
22import base64
23
24import ZSI
25from ZSI.wstools.Namespaces import DSIG, OASIS, GLOBUS, WSU, WSA200403, BEA, \
26                                   SOAP
27                                   
28from ZSI.TC import ElementDeclaration,TypeDefinition
29from ZSI.generate.pyclass import pyclass_type
30
31from ZSI.wstools.Utility import DOMException, SplitQName
32from ZSI.wstools.Utility import NamespaceError, MessageInterface, ElementProxy
33
34# XML Parsing
35from cStringIO import StringIO
36from Ft.Xml.Domlette import NonvalidatingReaderBase, NonvalidatingReader
37from Ft.Xml import XPath
38
39# Canonicalization
40from ZSI.wstools.c14n import Canonicalize
41from xml.dom import Node
42from xml.xpath.Context import Context
43from xml import xpath
44
45# Include for re-parsing doc ready for canonicalization in sign method - see
46# associated note
47from xml.dom.ext.reader.PyExpat import Reader
48
49
50class VerifyError(Exception):
51    """Raised from SignatureHandler.verify if signature is invalid"""
52
53class SignatureError(Exception):
54    """Flag if an error occurs during signature generation"""
55       
56class SignatureHandler(object):
57   
58    def __init__(self,
59                 certFilePath=None, 
60                 priKeyFilePath=None, 
61                 priKeyPwd=None):
62       
63        self.__certFilePath = certFilePath
64        self.__priKeyFilePath = priKeyFilePath
65        self.__priKeyPwd = priKeyPwd
66
67
68    def sign(self, soapWriter):
69        '''Sign the message body and binary security token of a SOAP message
70        '''
71        # Add X.509 cert as binary security token
72        x509Cert = X509.load_cert(self.__certFilePath)
73       
74        x509CertPat = re.compile(\
75            '-----BEGIN CERTIFICATE-----\n?(.*?)\n?-----END CERTIFICATE-----',
76            re.S)
77        x509CertStr = x509CertPat.findall(x509Cert.as_pem())[0]
78
79        soapWriter._header.setNamespaceAttribute('wsse', OASIS.WSSE)
80        soapWriter._header.setNamespaceAttribute('wsu', WSU.UTILITY)
81        soapWriter._header.setNamespaceAttribute('ds', DSIG.BASE)
82       
83        wsseElem = soapWriter._header.createAppendElement(OASIS.WSSE, 
84                                                         'Security')
85        wsseElem.setNamespaceAttribute('wsse', OASIS.WSSE)
86        wsseElem.node.setAttribute('SOAP-ENV:mustUnderstand', "1")
87       
88        binSecTokElem = wsseElem.createAppendElement(OASIS.WSSE, 
89                                                     'BinarySecurityToken')
90        binSecTokElem.node.setAttribute('ValueType', "wsse:X509v3")
91        binSecTokElem.node.setAttribute('EncodingType', "wsse:Base64Binary")
92       
93        # Add ID so that the binary token can be included in the signature
94        binSecTokElem.node.setAttribute('wsu:Id', "binaryToken")
95
96        binSecTokElem.createAppendTextNode(x509CertStr)
97
98       
99        # Signature
100        signatureElem = wsseElem.createAppendElement(DSIG.BASE, 'Signature')
101        signatureElem.setNamespaceAttribute('ds', DSIG.BASE)
102       
103        # Signature - Signed Info
104        signedInfoElem = signatureElem.createAppendElement(DSIG.BASE, 
105                                                           'SignedInfo')
106       
107        # Signed Info - Canonicalization method
108        c14nMethodElem = signedInfoElem.createAppendElement(DSIG.BASE,
109                                                    'CanonicalizationMethod')
110        c14nMethodElem.node.setAttribute('Algorithm', DSIG.C14N)
111       
112        # Signed Info - Signature method
113        sigMethodElem = signedInfoElem.createAppendElement(DSIG.BASE,
114                                                    'SignatureMethod')
115        sigMethodElem.node.setAttribute('Algorithm', DSIG.DIGEST_SHA1)
116       
117        # Signature - Signature value
118        signatureValueElem = signatureElem.createAppendElement(DSIG.BASE, 
119                                                             'SignatureValue')
120       
121        # Key Info
122        KeyInfoElem = signatureElem.createAppendElement(DSIG.BASE, 'KeyInfo')
123        secTokRefElem = KeyInfoElem.createAppendElement(OASIS.WSSE, 
124                                                  'SecurityTokenReference')
125       
126        # Reference back to the binary token included earlier
127        wsseRefElem = secTokRefElem.createAppendElement(OASIS.WSSE, 
128                                                        'Reference')
129        wsseRefElem.node.setAttribute('URI', "#binaryToken")
130       
131        # Add Reference to body so that it can be included in the signature
132        soapWriter.body.node.setAttribute('wsu:Id', "body")
133        soapWriter.body.node.setAttribute('xmlns:wsu', WSU.UTILITY)
134
135        # Serialize and re-parse prior to reference generation - calculating
136        # canonicalization based on soapWriter.dom.node seems to give an
137        # error: the order of wsu:Id attribute is not correct
138        docNode = Reader().fromString(str(soapWriter))
139       
140        # Namespaces for XPath searches
141        processorNss = \
142        {
143            'ds':     DSIG.BASE, 
144            'wsu':    WSU.UTILITY, 
145            'wsse':   OASIS.WSSE, 
146            'soapenv':"http://schemas.xmlsoap.org/soap/envelope/" 
147        }
148        ctxt = Context(docNode, processorNss=processorNss)
149        idNodes = xpath.Evaluate('//*[@wsu:Id]', 
150                                 contextNode=docNode, 
151                                 context=ctxt)
152               
153        # 1) Reference Generation
154        #
155        # Find references
156        for idNode in idNodes:
157           
158            # Set URI attribute to point to reference to be signed
159            #uri = u"#" + idNode.getAttribute('wsu:Id')
160            uri = u"#" + idNode.attributes[(WSU.UTILITY, 'Id')].value
161           
162            # Canonicalize reference
163            c14nRef = Canonicalize(idNode)
164           
165            # Calculate digest for reference and base 64 encode
166            #
167            # Nb. encodestring adds a trailing newline char
168            digestValue = base64.encodestring(sha(c14nRef).digest()).strip()
169
170
171            # Add a new reference element to SignedInfo
172            refElem = signedInfoElem.createAppendElement(DSIG.BASE, 
173                                                         'Reference')
174            refElem.node.setAttribute('URI', uri)
175           
176            # Use ds:Transforms or wsse:TransformationParameters?
177            tranformsElem = refElem.createAppendElement(DSIG.BASE, 
178                                                        'Transforms')
179            tranformElem = tranformsElem.createAppendElement(DSIG.BASE, 
180                                                             'Transform')
181            tranformElem.node.setAttribute('Algorithm', DSIG.C14N)
182           
183            # Digest Method
184            digestMethodElem = refElem.createAppendElement(DSIG.BASE, 
185                                                           'DigestMethod')
186            digestMethodElem.node.setAttribute('Algorithm', DSIG.DIGEST_SHA1)
187           
188            # Digest Value
189            digestValueElem = refElem.createAppendElement(DSIG.BASE, 
190                                                          'DigestValue')
191            digestValueElem.createAppendTextNode(digestValue)
192
193   
194        # 2) Signature Generation
195        #
196
197        # Test against signature generated by pyXMLSec version
198        #xmlTxt = open('./wsseSign-xmlsec-res.xml').read()
199        #dom = NonvalidatingReader.parseStream(StringIO(xmlTxt))
200       
201        # Canonicalize the signedInfo node
202        #
203        # Nb. When extracted the code adds the namespace attribute to the
204        # signedInfo!  This has important consequences for validation -
205        #
206        # 1) Do you strip the namespace attribute before taking the digest to
207        # ensure the text is exactly the same as what is displayed in the
208        # message?
209        #
210        # 2) Leave it in and assume the validation algorithm will expect to
211        # add in the namespace attribute?!
212        #
213        # http://www.w3.org/TR/xml-c14n#NoNSPrefixRewriting implies you need
214        # to include namespace declarations for namespaces referenced in a doc
215        # subset - yes to 2)
216        c14nSignedInfo = signedInfoElem.canonicalize()
217
218        # Calculate digest of SignedInfo
219        signedInfoDigestValue = sha(c14nSignedInfo).digest().strip()
220       
221        # Read Private key to sign with   
222        priKeyFile = BIO.File(open(self.__priKeyFilePath))                                           
223        priKey = RSA.load_key_bio(priKeyFile, 
224                                  callback=lambda *ar, **kw: self.__priKeyPwd)
225       
226        # Sign using the private key and base 64 encode the result
227        signatureValue = priKey.sign(signedInfoDigestValue)
228        b64EncSignatureValue = base64.encodestring(signatureValue).strip()
229
230        # Add to <SignatureValue>
231        signatureValueElem.createAppendTextNode(b64EncSignatureValue)
232       
233        # Extract RSA public key from the cert
234        rsaPubKey = x509Cert.get_pubkey().get_rsa()
235       
236        # Check the signature
237#        verify = bool(rsaPubKey.verify(signedInfoDigestValue, signatureValue))
238#       
239#        open('soap.xml', 'w').write(str(soapWriter))
240#        import pdb;pdb.set_trace()
241        print "Signature Generated"
242
243
244    def verify(self, parsedSOAP):
245        """Verify signature"""
246       
247        element = ElementProxy(None, message=parsedSOAP.header)
248        processorNss = \
249        {
250            'ds':     DSIG.BASE, 
251            'wsu':    WSU.UTILITY, 
252            'wsse':   OASIS.WSSE, 
253            'soapenv':"http://schemas.xmlsoap.org/soap/envelope/" 
254        }
255        ctxt = Context(parsedSOAP.dom, processorNss=processorNss)
256       
257
258        signatureNodes = xpath.Evaluate('//ds:Signature', 
259                                        contextNode=parsedSOAP.dom, 
260                                        context=ctxt)
261        if len(signatureNodes) > 1:
262            raise VerifyError, 'Multiple ds:Signature elements found'
263       
264        try:
265            signatureNodes = signatureNodes[0]
266        except:
267            # Message wasn't signed
268            return
269       
270        # Two stage process: reference validation followed by signature
271        # validation
272       
273        # 1) Reference Validation       
274        refNodes = xpath.Evaluate('//ds:Reference', 
275                                  contextNode=parsedSOAP.dom, 
276                                  context=ctxt)
277           
278        # Make a lambda to pick out node child or children with a given
279        # name
280        getElements = lambda node, nameList: \
281                [n for n in node.childNodes if str(n.localName) in nameList] 
282       
283        for refNode in refNodes:
284            # Get the URI for the reference
285            refURI = refNode.getAttributeNodeNS(None, 'URI').value
286                           
287            transformsNode = getElements(refNode, "Transforms")[0]
288            transforms = getElements(transformsNode, "Transform")
289   
290            algorithm = transforms[0].getAttributeNodeNS(None, 
291                                                         "Algorithm").value
292           
293            # Add extra keyword for Exclusive canonicalization method
294            kw = {}
295            if algorithm == DSIG.C14N_EXCL:
296                try:
297                    inclusiveNS = transforms[0].getElement(DSIG.C14N_EXCL, 
298                                                       "InclusiveNamespaces")
299                    kw['unsuppressedPrefixes'] = \
300                    inclusiveNS.getAttributeValue(None, "PrefixList").split()
301                except:
302                    raise VerifyError, \
303                'failed to handle transform (%s) in <ds:Reference URI="%s">'%\
304                        (transforms[0], uri)
305       
306            # Canonicalize the reference data and calculate the digest
307            if refURI[0] != "#":
308                raise VerifyError, \
309                    "Expecting # identifier for Reference URI \"%s\"" % refURI
310                   
311            # XPath reference
312            uriXPath = '//*[@wsu:Id="%s"]' % refURI[1:]
313            uriNode = xpath.Evaluate(uriXPath, 
314                                     contextNode=parsedSOAP.dom, 
315                                     context=ctxt)[0]
316
317            c14nRef = Canonicalize(uriNode, **kw)
318            digestValue = base64.encodestring(sha(c14nRef).digest()).strip()
319           
320            # Extract the digest value that was stored           
321            digestNode = getElements(refNode, "DigestValue")[0]
322            nodeDigestValue = str(digestNode.childNodes[0].nodeValue).strip()   
323           
324            # Reference validates if the two digest values are the same
325            if digestValue != nodeDigestValue:
326                raise VerifyError, 'Digest Values do not match for URI: "%s"' %\
327                                                                        refURI
328               
329        # 2) Signature Validation
330        signedInfoNode = xpath.Evaluate('//ds:SignedInfo',
331                                        contextNode=parsedSOAP.dom, 
332                                        context=ctxt)[0]
333
334        # Get the canonicalization method - change later to check this and
335        # make sure it's an algorithm supported by this code
336        c14nMethodNode = getElements(signedInfoNode, 
337                                     "CanonicalizationMethod")[0]
338                                             
339        algorithm = c14nMethodNode.getAttributeNodeNS(None, 'Algorithm').value
340        if algorithm != DSIG.C14N:
341            raise VerifyError, \
342                "Only \"%s\" canonicalization algorithm supported" % DSIG.C14N
343               
344        # Canonicalize the SignedInfo node and take digest
345        c14nSignedInfo = Canonicalize(signedInfoNode)       
346        signedInfoDigestValue = sha(c14nSignedInfo).digest()
347       
348        # Get the signature value in order to check against the digest just
349        # calculated
350        signatureValueNode = xpath.Evaluate('//ds:SignatureValue',
351                                            contextNode=parsedSOAP.dom, 
352                                            context=ctxt)[0]
353
354        # Remove base 64 encoding
355        b64EncSignatureValue = \
356                    str(signatureValueNode.childNodes[0].nodeValue).strip()
357                   
358        signatureValue = base64.decodestring(b64EncSignatureValue)
359
360
361        # Read X.509 Cert from wsse:BinarySecurityToken node
362        # - leave out for now and read direct from hard coded pem file
363        x509Cert = X509.load_cert(self.__certFilePath)
364       
365        # Extract RSA public key from the cert
366        rsaPubKey = x509Cert.get_pubkey().get_rsa()
367       
368        # Apply the signature verification
369        try:
370            verify = bool(rsaPubKey.verify(signedInfoDigestValue, 
371                                           signatureValue))
372        except RSA.RSAError:
373            raise VerifyError, "Invalid Signature"
374       
375        print "Signature OK"
376
377
378class EncryptionHandler(object):
379    """Encrypt/Decrypt SOAP messages using WS-Security""" 
380       
381    def __init__(self,
382                 certFilePath=None, 
383                 priKeyFilePath=None, 
384                 priKeyPwd=None):
385       
386        self.__certFilePath = certFilePath
387        self.__priKeyFilePath = priKeyFilePath
388        self.__priKeyPwd = priKeyPwd
389
390
391    def encrypt(self, soapWriter):
392        """Encrypt an outbound SOAP message"""
393       
394        import pdb;pdb.set_trace()
395       
396        # Use X.509 Cert to encrypt
397        x509Cert = X509.load_cert(self.__certFilePath)
398       
399        # Extract RSA public key from the cert
400        rsaPubKey = x509Cert.get_pubkey().get_rsa()
401       
402        data = "The Larch"
403        padding = ''.join([' ']*7)
404        data += padding
405        encryptedData = rsaPubKey.public_encrypt(data, RSA.pkcs1_padding)
406       
407        return encryptedData
408
409       
410    def decrypt(self, parsedSOAP, encryptedData=None):
411        """Decrypt an inbound SOAP message"""
412       
413        # Read Private key to decrypt the data     
414        priKeyFile = BIO.File(open(self.__priKeyFilePath))                                           
415        priKey = RSA.load_key_bio(priKeyFile, 
416                                  callback=lambda *ar, **kw: self.__priKeyPwd)
417       
418        padding = ''.join([' ']*7)
419        decryptedData = priKey.private_decrypt(encryptedData, padding)
420       
421        print decryptedData
422
423       
424if __name__ == "__main__":
425    import sys
426    txt = None
427   
428    e = EncryptionHandler(certFilePath='../../Junk-cert.pem',
429                          priKeyFilePath='../../Junk-key.pem',
430                          priKeyPwd=open('../../tmp2').read().strip())
431   
432    print e.decrypt(e.encrypt(None))
433   
Note: See TracBrowser for help on using the repository browser.