source: TI12-security/trunk/python/Tests/xmlsec/wsSecurity.py @ 1440

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/Tests/xmlsec/wsSecurity.py@1440
Revision 1440, 18.3 KB checked in by pjkersha, 13 years ago (diff)

Working version using multiple references and # style wsu:Id links to document sections to be signed. Both
SOAP message content and X.509 token are signed.

  • Property svn:executable set to *
Line 
1#!/bin/env python
2
3import re
4
5# Digest and signature/verify
6from sha import sha
7from M2Crypto import X509, BIO, RSA
8import base64
9
10import cElementTree as ElementTree
11import ZSI
12from ZSI.wstools.Namespaces import DSIG, OASIS, GLOBUS, WSU, WSA200403, BEA, \
13                                   SOAP
14
15# XML Parsing
16from cStringIO import StringIO
17from Ft.Xml.Domlette import NonvalidatingReaderBase, NonvalidatingReader
18from Ft.Xml import XPath
19
20# Canonicalization
21from ZSI.wstools.c14n import Canonicalize
22from xml.dom import Node
23
24# Type codes for signature elements
25#from ZSI.TC import _get_global_element_declaration as GED
26
27
28#class BST(str):
29#    typecode = GED(OASIS.WSSE, "BinarySecurityToken")
30
31#KeyInfo = GED(DSIG.BASE, "KeyInfo").pyclass
32#Reference = GED(OASIS.WSSE, "Reference").pyclass
33#SecurityReference = GED(OASIS.WSSE, "SecurityTokenReference").pyclass
34#Signature = GED(DSIG.BASE, "Signature").pyclass
35
36
class SignatureHandler(object):
    """Sign and verify SOAP messages with a WS-Security XML signature.

    sign() fills in a fixed test SOAP envelope, adds a ds:Reference with a
    SHA-1 digest for every element carrying a wsu:Id attribute and signs the
    canonicalized ds:SignedInfo with an RSA private key.

    verify() performs the reverse: reference validation followed by
    signature validation.

    sign2() is an unfinished port of the pyGridWare GssSignatureHandler and
    is not usable as it stands - see its docstring.
    """

    # Unique token reference
    tokenRef = 10101

    # Captures the base 64 body of a PEM encoded certificate i.e. the text
    # between the BEGIN/END CERTIFICATE delimiters
    _x509CertPat = re.compile(
        '-----BEGIN CERTIFICATE-----\n?(.*?)\n?-----END CERTIFICATE-----',
        re.S)

    def __init__(self, sign_body=True, sign_headers=True):
        """Set the algorithm identifiers and signing options

        sign_body:    flag stored as self._sign_body
        sign_headers: flag stored as self._sign_headers

        Both flags are only consulted by sign2(), where the corresponding
        actions are still stubbed out."""

        self._can_algo = DSIG.C14N
        self._sig_algo = GLOBUS.SIG
        self._dig_algo = DSIG.DIGEST_SHA1
        self._docCtxtID = None
        self._expire_time = 300
        self._setProtocol = False
        self._sign_headers = sign_headers
        self._sign_body = sign_body

    @staticmethod
    def _getCertBase64(x509Cert):
        """Return the base 64 body of x509Cert's PEM encoding i.e. the PEM
        text with the BEGIN/END CERTIFICATE delimiters removed

        x509Cert: object with an as_pem() method returning PEM text"""
        return SignatureHandler._x509CertPat.findall(x509Cert.as_pem())[0]

    @staticmethod
    def _getChildElements(node, localName):
        """Return the direct children of node whose local name is exactly
        localName

        An exact comparison is used deliberately: a substring test would
        let e.g. "Transform" match a search for "Transforms"."""
        return [child for child in node.childNodes
                if child.localName == localName]

    def sign2(self, certFilePath, priKeyFilePath, priKeyPwd=None):
        """Sign an XML doc - UNFINISHED

        NOTE: this method cannot run as it stands.  It relies on BST, GED,
        NS1, Signature, Holder, KeyInfo and SecurityReference, none of which
        is defined or imported in this module (the typecode declarations
        near the top of the file are commented out), so it fails with a
        NameError once the certificate has been loaded.  priKeyFilePath and
        priKeyPwd are not used yet.

        From pyGridWare GssSignatureHandler

        General algorithm
        - Create the authentication information
        elements (ie BinarySecurityToken)
        and security elements (ie timestamp)

        - Create digests for anything in the
        body of the Message

        - Fill in the SignedInfoNode, create elements
        for digests of elements in the header
        leave the content of the element empty. Create
        a SignatureValue element also leave the content
        of that element empty.

        - serialize the header

        - Calculate the digests for elements in the
        header (we had to wait until the header was
        serialized to do this) Put the digests as
        the content for those empty digestValue elements
        that we had created

        - Calculate the SignatureValue and place it in
        the content of the SignatureValue element

        """

        if self._sign_headers:
            pass#self.signHeaderElements(sw)

        if self._sign_body:
            pass#self.signBodyElement(sw)

        pubKey = X509.load_cert(certFilePath)
        x509Cert = self._getCertBase64(pubKey)

        binarySecurityToken = BST(x509Cert)
        #move this GssSecureMessage
        tokenRefString = self.getTokenRefString()
        binarySecurityToken._attrs = \
        {
            (OASIS.UTILITY,'Id'):  "CertId-" + tokenRefString,
            'ValueType':           "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509PKIPathv1",
            'EncodingType':        "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary"
        }
        attrTypeCodeDict = \
        {
            (OASIS.UTILITY, 'Id'):  ZSI.TC.String(),
            'ValueType':            ZSI.TC.String(),
            'EncodingType':         ZSI.TC.String()
        }
        binarySecurityToken.typecode.attribute_typecode_dict.update(attrTypeCodeDict)

        typecode = GED(NS1, "Security")
        pyobj = typecode.pyclass()
        pyobj._any = []
        pyobj._any.append(binarySecurityToken)

        sig = Signature()
        pyobj._any.append(sig)
        sig._SignedInfo = Holder()
        sig._SignedInfo._CanonicalizationMethod = Holder()
        sig._SignedInfo._CanonicalizationMethod._attrs = {'Algorithm':self._can_algo}
        sig._SignedInfo._SignatureMethod = Holder()
        sig._SignedInfo._SignatureMethod._attrs = {'Algorithm':self._sig_algo}
        sig._SignedInfo._Reference = []
        sig._SignatureValue = ''

        sig._KeyInfo = KeyInfo()
        secRef = SecurityReference()
        secRef._any = Reference()
        secRef._any._attrs = {}
        secRef._any._attrs['URI'] = "#CertId-" + tokenRefString
        sig._KeyInfo._any = secRef

        # The remainder of the pyGridWare implementation has not been ported
        # yet.  It is kept below as an inert string literal for reference
        # only - it is never executed.
        '''
        processorNss={'wsu':OASIS.UTILITY, 'ds':DSIG.BASE}
        sw.dom.setContext(processorNss=processorNss)
        nodes = sw.dom.evaluate(expression='//*[@wsu:Id]')

        #Calculate the digests in the body
        for node in nodes:
            URI = '#%s' %node.getAttributeValue(OASIS.UTILITY, 'Id')
            ref = Holder()
            ref._attrs = {'URI': URI}
            ref._DigestMethod = Holder()
            ref._DigestMethod._attrs = {'Algorithm':self._dig_algo}
            ref._DigestValue = self.getDigestValueFromSoapWriter(sw, URI)
            sig._SignedInfo._Reference.append(ref)

        #These haven't been serialized yet, the id's to take
        #digests of in the security header
        idUriList = []
        for idUri in idUriList:
            ref = Holder()
            ref._attrs = {'URI': idUri}
            ref._DigestMethod = Holder()
            ref._DigestMethod._attrs = {'Algorithm':self._dig_algo}
            ref._DigestValue = ''
            sig._SignedInfo._Reference.append(ref)

        sw.serialize_header(pyobj, typecode)

        # Reset Context after append on DOM
        sw.dom.setContext(processorNss=processorNss)
        for URI in idUriList:
            digestValue = self.getDigestValueFromSoapWriter(sw, URI)
            referenceNodes = sw.dom.evaluate(expression='//ds:Reference')

            for referenceNode in referenceNodes:
                refUri = referenceNode.getAttributeValue(None, "URI")
                if refUri == URI:
                    digestValueNode = referenceNode.getElement(DSIG.BASE, "DigestValue")
                    digestValueNode.createAppendTextNode(base64.encodestring(digestValue))

        # Reset Context after append on DOM
        sw.dom.setContext(processorNss=processorNss)
        nodes = sw.dom.evaluate(expression='//ds:SignedInfo')
        signedInfo = nodes[0]
        signedInfoStr = signedInfo.canonicalize()
        nodes = sw.dom.evaluate(expression='//ds:SignatureValue')
        signatureValueNode = nodes[0]
        if useSecureMessage:
            hashedSignedInfoStr = sha.sha(signedInfoStr).digest()
            signatureValue = base64.encodestring(auth.sign(hashedSignedInfoStr))
        else:
            signatureValue = base64.encodestring(secContext.getMIC(signedInfoStr))
        signatureValueNode.createAppendTextNode(signatureValue)
        '''

    def sign(self, certFilePath, priKeyFilePath, priKeyPwd=None):
        """Create a signed test SOAP message

        The X.509 certificate is embedded as a wsse:BinarySecurityToken.  A
        ds:Reference holding a SHA-1 digest is added to ds:SignedInfo for
        every element with a wsu:Id attribute, then the canonicalized
        ds:SignedInfo is digested and signed with the RSA private key.

        certFilePath:   PEM file containing the X.509 certificate
        priKeyFilePath: PEM file containing the RSA private key
        priKeyPwd:      pass-phrase for the private key

        Returns the canonicalized signed message text.

        Raises Exception if the new signature does not verify against the
        public key of the certificate, e.g. because certificate and private
        key do not belong together."""

        # Use wsse:TransformationParameters with ds:Transforms??
        msgTmpl = """<?xml version="1.0" encoding="UTF-8"?>
<!--
SOAP Message with WSSE Signature
-->
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
        xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/03/addressing"
        xmlns:xsd="http://www.w3.org/2001/XMLSchema"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <soapenv:Header>
        <wsse:Security
                xmlns:wsse="http://schemas.xmlsoap.org/ws/2002/04/secext"
                xmlns:wsu="http://schemas.xmlsoap.org/ws/2002/07/utility"
                soapenv:mustUnderstand="1">
            <wsse:BinarySecurityToken
                wsu:Id="binaryToken"
                ValueType="wsse:X509v3"
                EncodingType="wsse:Base64Binary">
%s
            </wsse:BinarySecurityToken>
            <ds:Signature xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
                <ds:SignedInfo>
                    <ds:CanonicalizationMethod
                    Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"/>
                    <ds:SignatureMethod
                    Algorithm="http://www.w3.org/2000/09/xmldsig#rsa-sha1"/>
                </ds:SignedInfo>
                <ds:SignatureValue>%s</ds:SignatureValue>
                <ds:KeyInfo>
                    <wsse:SecurityTokenReference>
                        <wsse:Reference URI="#binaryToken"/>
                    </wsse:SecurityTokenReference>
                </ds:KeyInfo>
            </ds:Signature>
        </wsse:Security>
    </soapenv:Header>
    <soapenv:Body xmlns:wsu="http://schemas.xmlsoap.org/ws/2002/07/utility"
            wsu:Id="body">
    Hello, World!
    </soapenv:Body>
</soapenv:Envelope>"""

        # Add X.509 cert as binary security token
        x509Cert = X509.load_cert(certFilePath)
        x509CertStr = self._getCertBase64(x509Cert)

        # The second "%s" keeps a place holder for the signature value which
        # is only known once SignedInfo is complete
        msgTxt = msgTmpl % (x509CertStr, "%s")
        msgDoc = NonvalidatingReader.parseStream(StringIO(msgTxt))

        # Namespaces for XPath searches
        processorNss = \
        {
            'ds':     DSIG.BASE,
            'wsu':    WSU.UTILITY,
            'wsse':   OASIS.WSSE,
            'soapenv':"http://schemas.xmlsoap.org/soap/envelope/"
        }
        docCtxt = XPath.Context.Context(msgDoc, processorNss=processorNss)

        # 1) Reference Generation
        #
        refTmpl = \
"""         <ds:Reference xmlns:ds="http://www.w3.org/2000/09/xmldsig#" URI="%s">
                <ds:Transforms>
                    <ds:Transform
                    Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"/>
                </ds:Transforms>
                <ds:DigestMethod
                Algorithm="http://www.w3.org/2000/09/xmldsig#sha1"/>
                <ds:DigestValue>%s</ds:DigestValue>
            </ds:Reference>
"""

        # Find references
        signedInfoNode = XPath.Compile('//ds:SignedInfo').evaluate(docCtxt)[0]

        idNodes = XPath.Compile('//*[@wsu:Id]').evaluate(docCtxt)
        for idNode in idNodes:

            # Set URI attribute to point to reference to be signed
            uri = u"#" + idNode.attributes[(WSU.UTILITY, u'Id')].value

            # Canonicalize reference
            c14nRef = Canonicalize(idNode)

            # Calculate digest for reference and base 64 encode
            #
            # Nb. encodestring adds a trailing newline char
            digestValue = base64.encodestring(sha(c14nRef).digest()).strip()

            refTxt = refTmpl % (uri, digestValue)
            refNode = NonvalidatingReader.parseStream(StringIO(refTxt))

            # Add the new child node
            signedInfoNode.appendChild(refNode)

        # 2) Signature Generation
        #

        # Test against signature generated by pyXMLSec version
        #xmlTxt = open('./wsseSign-xmlsec-res.xml').read()
        #dom = NonvalidatingReader.parseStream(StringIO(xmlTxt))

        # Canonicalize the signedInfo node
        #
        # Nb. When extracted the code adds the namespace attribute to the
        # signedInfo!  This has important consequences for validation -
        #
        # 1) Do you strip the namespace attribute before taking the digest to
        # ensure the text is exactly the same as what is displayed in the
        # message?
        #
        # 2) Leave it in and assume the validation algorithm will expect to
        # add in the namespace attribute?!
        #
        # http://www.w3.org/TR/xml-c14n#NoNSPrefixRewriting implies you need
        # to include namespace declarations for namespaces referenced in a doc
        # subset - yes to 2)
        c14nSignedInfo = Canonicalize(signedInfoNode)

        # Calculate digest of SignedInfo
        #
        # Nb. the digest is raw binary and must NOT be stripped: a digest
        # beginning or ending with a whitespace byte would be truncated and
        # would no longer match the digest calculated in verify()
        signedInfoDigestValue = sha(c14nSignedInfo).digest()

        # Read Private key to sign with
        priKeyFile = BIO.File(open(priKeyFilePath))
        priKey = RSA.load_key_bio(priKeyFile,
                                  callback=lambda *args, **kwargs: priKeyPwd)

        # Sign using the private key and base 64 encode the result
        signatureValue = priKey.sign(signedInfoDigestValue)
        b64EncSignatureValue = base64.encodestring(signatureValue).strip()

        signedMsgTxt = Canonicalize(msgDoc) % b64EncSignatureValue

        # Extract RSA public key from the cert
        rsaPubKey = x509Cert.get_pubkey().get_rsa()

        # Check the signature against the certificate embedded in the message
        # so that a mismatched key/certificate pair is reported here rather
        # than by whoever receives the message
        if not rsaPubKey.verify(signedInfoDigestValue, signatureValue):
            raise Exception(
                "Signature does not verify against certificate \"%s\"" %
                certFilePath)

        return signedMsgTxt

    def verify(self, xmlTxt, certFilePath=None):
        """Verify signature

        xmlTxt:       text of the signed message
        certFilePath: PEM file containing the X.509 certificate to verify
                      against.  It is required for now: reading the
                      certificate from the wsse:BinarySecurityToken is not
                      implemented.

        Returns True if the signature value validates against the
        certificate's public key, False otherwise.

        Raises Exception if certFilePath is not set, a reference URI is not
        of the "#id" form or cannot be resolved, a digest value does not
        match or an unsupported canonicalization algorithm is specified."""

        if certFilePath is None:
            raise Exception("certFilePath must be set: reading the "
                            "certificate from the wsse:BinarySecurityToken "
                            "is not implemented")

        # Parse file
        doc = NonvalidatingReader.parseStream(StringIO(xmlTxt))

        # Set namespaces for XPath queries
        processorNss = \
        {
            'ds':     DSIG.BASE,
            'wsu':    WSU.UTILITY,
            'wsse':   OASIS.WSSE,
            'wsa':    WSA200403.ADDRESS,
            'soapenv':"http://schemas.xmlsoap.org/soap/envelope/"
        }
        docCtxt = XPath.Context.Context(doc, processorNss=processorNss)

        # Two stage process: reference validation followed by signature
        # validation

        # 1) Reference Validation
        refNodes = XPath.Compile('//ds:Reference').evaluate(docCtxt)

        getElements = self._getChildElements

        for refNode in refNodes:
            # Get the URI for the reference
            refURI = refNode.getAttributeNodeNS(None, 'URI').value

            transformsNode = getElements(refNode, "Transforms")[0]
            transforms = getElements(transformsNode, "Transform")

            algorithm = transforms[0].getAttributeNodeNS(None,
                                                         "Algorithm").value

            # Add extra keyword for Exclusive canonicalization method
            #
            # NOTE(review): getElement/getAttributeValue look like ZSI
            # ElementProxy methods - confirm that they are available on the
            # Domlette nodes used here
            kw = {}
            if algorithm == DSIG.C14N_EXCL:
                try:
                    inclusiveNS = transforms[0].getElement(DSIG.C14N_EXCL,
                                                       "InclusiveNamespaces")
                    kw['unsuppressedPrefixes'] = \
                    inclusiveNS.getAttributeValue(None, "PrefixList").split()
                except Exception:
                    raise Exception(
                'failed to handle transform (%s) in <ds:Reference URI="%s">'%
                        (transforms[0], refURI))

            # Canonicalize the reference data and calculate the digest
            if not refURI.startswith("#"):
                raise Exception(
                    "Expecting # identifier for Reference URI \"%s\"" % refURI)

            refId = refURI[1:]

            # The id comes from the message being verified and is placed
            # inside a double quoted XPath string literal - refuse anything
            # that could break out of the literal
            if '"' in refId:
                raise Exception(
                    "Invalid character in Reference URI \"%s\"" % refURI)

            # XPath reference
            uriXPath = '//*[@wsu:Id="%s"]' % refId
            uriNodes = XPath.Compile(uriXPath).evaluate(docCtxt)
            if not uriNodes:
                raise Exception(
                    "No element found for Reference URI \"%s\"" % refURI)

            uriNode = uriNodes[0]

            c14nRef = Canonicalize(uriNode, **kw)
            digestValue = base64.encodestring(sha(c14nRef).digest()).strip()

            # Extract the digest value that was stored
            digestNode = getElements(refNode, "DigestValue")[0]
            nodeDigestValue = str(digestNode.childNodes[0].nodeValue).strip()

            # Reference validates if the two digest values are the same
            if digestValue != nodeDigestValue:
                raise Exception("DigestValues do not match: URI=%s" % refURI)

        # 2) Signature Validation
        signedInfoNode = XPath.Compile('//ds:SignedInfo').evaluate(docCtxt)[0]

        # Get the canonicalization method - change later to check this and
        # make sure it's an algorithm supported by this code
        c14nMethodNode = getElements(signedInfoNode,
                                     "CanonicalizationMethod")[0]

        algorithm = c14nMethodNode.getAttributeNodeNS(None, 'Algorithm').value
        if algorithm != DSIG.C14N:
            raise Exception(
                "Only \"%s\" canonicalization algorithm supported" % DSIG.C14N)

        # Canonicalize the SignedInfo node and take digest
        c14nSignedInfo = Canonicalize(signedInfoNode)
        signedInfoDigestValue = sha(c14nSignedInfo).digest()

        # Get the signature value in order to check against the digest just
        # calculated
        signatureValueNode = \
                    XPath.Compile('//ds:SignatureValue').evaluate(docCtxt)[0]

        # Remove base 64 encoding
        b64EncSignatureValue = \
                    str(signatureValueNode.childNodes[0].nodeValue).strip()

        signatureValue = base64.decodestring(b64EncSignatureValue)

        # Read X.509 Cert from wsse:BinarySecurityToken node
        # - leave out for now and read direct from hard coded pem file
        x509Cert = X509.load_cert(certFilePath)

        # Extract RSA public key from the cert
        rsaPubKey = x509Cert.get_pubkey().get_rsa()

        # Apply the signature verification - an RSAError is treated as a
        # failed verification rather than propagated
        try:
            verify = bool(rsaPubKey.verify(signedInfoDigestValue,
                                           signatureValue))
        except RSA.RSAError:
            verify = False

        return verify

    def getTokenRefString(self):
        """
        Return an unique number to identify an element
        for the element's id attribute

        The class level counter tokenRef is incremented on each call so the
        value is unique across all SignatureHandler instances in a process.
        """
        SignatureHandler.tokenRef += 1
        return str(SignatureHandler.tokenRef)
478
479   
480     
if __name__ == "__main__":
    # Command line test harness: pass "sign" and/or "verify" as arguments.
    # With both given, the message just signed is the one that is verified;
    # with "verify" alone, a previously saved message is read from file.
    import sys

    def _readFile(filePath):
        """Return the content of filePath, making sure the file is closed"""
        fileObj = open(filePath)
        try:
            return fileObj.read()
        finally:
            fileObj.close()

    txt = None

    s = SignatureHandler()

    if 'sign' in sys.argv:
        # '../tmp2' holds the pass-phrase for the private key
        txt = s.sign('../Junk-cert.pem',
                     '../Junk-key.pem',
                     _readFile('../tmp2').strip())
        print(txt)

    if 'verify' in sys.argv:
        if txt is None:
            txt = _readFile('./wsseSign-test-res.xml')

        print("Signature OK? %s" %
                            s.verify(txt, certFilePath='../Junk-cert.pem'))
Note: See TracBrowser for help on using the repository browser.