source: TI12-security/trunk/python/Tests/xmlsec/wsSecurity.py @ 1425

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/Tests/xmlsec/wsSecurity.py@1425
Revision 1425, 17.5 KB checked in by pjkersha, 13 years ago (diff)

Working sign and verify. Namespace declarations must be the same in the resulting canonicalization, otherwise it
won't work. Previously, only the SignedInfo chunk was read into a DOM object from a string. This meant that
the namespaces declared for the whole document were left out. When verify was called, the resulting digest didn't
agree because, with verify, the whole document was read into a DOM object and so all namespace declarations had been
included.

  • Property svn:executable set to *
Line 
1#!/bin/env python
2
3import re
4
5# Digest and signature/verify
6from sha import sha
7from M2Crypto import X509, BIO, RSA
8import base64
9
10import cElementTree as ElementTree
11import ZSI
12from ZSI.wstools.Namespaces import DSIG, OASIS, GLOBUS, WSA200403, BEA, SOAP
13
14# XML Parsing
15from cStringIO import StringIO
16from Ft.Xml.Domlette import NonvalidatingReaderBase, NonvalidatingReader
17from Ft.Xml import XPath
18
19# Canonicalization
20from ZSI.wstools.c14n import Canonicalize
21from xml.dom import Node
22
23# Type codes for signature elements
24#from ZSI.TC import _get_global_element_declaration as GED
25
def _attrs(E):
    """Return the result of E._get_attributes(), or an empty list if that
    result is empty/None."""
    return E._get_attributes() or []


def _children(E):
    """Return the result of E._get_childNodes(), or an empty list if that
    result is empty/None."""
    return E._get_childNodes() or []


def _sorter(n1, n2):
    """cmp-style comparison of two nodes by their node names, suitable for
    passing to list.sort()."""
    return cmp(n1._get_nodeName(), n2._get_nodeName())
29
30
31#class BST(str):
32#    typecode = GED(OASIS.WSSE, "BinarySecurityToken")
33
34#KeyInfo = GED(DSIG.BASE, "KeyInfo").pyclass
35#Reference = GED(OASIS.WSSE, "Reference").pyclass
36#SecurityReference = GED(OASIS.WSSE, "SecurityTokenReference").pyclass
37#Signature = GED(DSIG.BASE, "Signature").pyclass
38
39
40class SignatureHandler(object):
41   
42    # Unique token reference
43    tokenRef = 10101
44   
45    def __init__(self, sign_body=True, sign_headers=True):
46       
47        self._can_algo = DSIG.C14N
48        self._sig_algo = GLOBUS.SIG
49        self._dig_algo = DSIG.DIGEST_SHA1
50        self._contextID = None
51        self._expire_time = 300
52        self._setProtocol = False
53        self._sign_headers = sign_headers
54        self._sign_body = sign_body
55
56   
57    def sign2(self, certFilePath, priKeyFilePath, priKeyPwd=None):
58        """Sign an XML doc       
59
60        From pyGridWare GssSignatureHandler
61
62        General algorithm
63        - Create the authentication information
64        elements (ie BinarySecurityToken)
65        and security elements(ie timestamp)
66       
67        - Create digests for anything in the
68        body of the Message
69
70        - Fill in the SignedInfoNode, create elements
71        for digests of elements in the header
72        leave the content of the element empty. Create
73        a SignatureValue element also leavethe content
74        of that element empty.
75
76        - serialize the header
77
78        - Calculate the digests for elements in the
79        header(we had to wait until the header was
80        serialized to do this) Put the digests as
81        the content for those empty digestValue elements
82        that we had created
83
84        - Calculate the SignatureValue and place it in
85        the content of the SignatureValue element
86       
87        """
88
89        context_id = self._contextID
90
91        if self._sign_headers: 
92            pass#self.signHeaderElements(sw)
93           
94        if self._sign_body: 
95            pass#self.signBodyElement(sw)
96       
97        pubKey = X509.load_cert(certFilePath)
98       
99        pubKeyPat = re.compile(\
100            '-----BEGIN CERTIFICATE-----\n?(.*?)\n?-----END CERTIFICATE-----',
101            re.S)
102        x509Cert = pubKeyPat.findall(pubKey.as_pem())[0]
103
104        import pdb;pdb.set_trace()
105        binarySecurityToken = BST(x509Cert)
106        #move this GssSecureMessage
107        tokenRefString = self.getTokenRefString()
108        binarySecurityToken._attrs = \
109        {
110            (OASIS.UTILITY,'Id'):  "CertId-" + tokenRefString,
111            'ValueType':           "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509PKIPathv1",
112            'EncodingType':        "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary"
113        }
114        attrTypeCodeDict = \
115        {
116            (OASIS.UTILITY, 'Id'):  ZSI.TC.String(),
117            'ValueType':            ZSI.TC.String(),
118            'EncodingType':         ZSI.TC.String()
119        }
120        binarySecurityToken.typecode.attribute_typecode_dict.update(attrTypeCodeDict)
121
122         
123        typecode = GED(NS1, "Security")
124        pyobj = typecode.pyclass()
125        pyobj._any = []
126        pyobj._any.append(binarySecurityToken)   
127             
128                                 
129        sig = Signature()
130        pyobj._any.append(sig)
131        sig._SignedInfo = Holder()
132        sig._SignedInfo._CanonicalizationMethod = Holder()
133        sig._SignedInfo._CanonicalizationMethod._attrs = {'Algorithm':self._can_algo}
134        sig._SignedInfo._SignatureMethod = Holder()
135        sig._SignedInfo._SignatureMethod._attrs = {'Algorithm':self._sig_algo}
136        sig._SignedInfo._Reference = []
137        sig._SignatureValue = ''
138       
139        sig._KeyInfo = KeyInfo()
140        secRef = SecurityReference()
141        secRef._any = Reference()
142        secRef._any._attrs = {}
143        secRef._any._attrs['URI'] = "#CertId-" + tokenRefString
144        sig._KeyInfo._any = secRef
145
146        '''
147        processorNss={'wsu':OASIS.UTILITY, 'ds':DSIG.BASE}
148        sw.dom.setContext(processorNss=processorNss)
149        nodes = sw.dom.evaluate(expression='//*[@wsu:Id]')
150
151        #Calculate the digests in the body
152        for node in nodes:
153            URI = '#%s' %node.getAttributeValue(OASIS.UTILITY, 'Id')
154            ref = Holder()
155            ref._attrs = {'URI': URI}
156            ref._DigestMethod = Holder()
157            ref._DigestMethod._attrs = {'Algorithm':self._dig_algo}
158            ref._DigestValue = self.getDigestValueFromSoapWriter(sw, URI)
159            sig._SignedInfo._Reference.append(ref)
160
161        #These haven't been serialized yet, the id's to take
162        #digests of in the security header
163        idUriList = []
164        for idUri in idUriList:
165            ref = Holder()
166            ref._attrs = {'URI': idUri}
167            ref._DigestMethod = Holder()
168            ref._DigestMethod._attrs = {'Algorithm':self._dig_algo}
169            ref._DigestValue = ''
170            sig._SignedInfo._Reference.append(ref)
171
172        sw.serialize_header(pyobj, typecode)
173
174        # Reset Context after append on DOM
175        sw.dom.setContext(processorNss=processorNss)
176        for URI in idUriList:
177            digestValue = self.getDigestValueFromSoapWriter(sw, URI)
178            referenceNodes = sw.dom.evaluate(expression='//ds:Reference')
179           
180            for referenceNode in referenceNodes:
181                refUri = referenceNode.getAttributeValue(None, "URI")
182                if refUri == URI:
183                    digestValueNode = referenceNode.getElement(DSIG.BASE, "DigestValue")
184                    digestValueNode.createAppendTextNode(base64.encodestring(digestValue))
185           
186        # Reset Context after append on DOM
187        sw.dom.setContext(processorNss=processorNss)
188        nodes = sw.dom.evaluate(expression='//ds:SignedInfo')
189        signedInfo = nodes[0]
190        signedInfoStr = signedInfo.canonicalize()
191        nodes = sw.dom.evaluate(expression='//ds:SignatureValue')
192        signatureValueNode = nodes[0]
193        if useSecureMessage:
194            hashedSignedInfoStr = sha.sha(signedInfoStr).digest()
195            signatureValue = base64.encodestring(auth.sign(hashedSignedInfoStr))
196        else:
197            signatureValue = base64.encodestring(secContext.getMIC(signedInfoStr))
198        signatureValueNode.createAppendTextNode(signatureValue)
199'''
200
201    def sign(self, certFilePath, priKeyFilePath, priKeyPwd=None):
202
203        msgTmpl = """<?xml version="1.0" encoding="UTF-8"?>
204<!--
205SOAP Message with WSSE Signature
206-->
207<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/03/addressing" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
208    <soapenv:Header>
209        <wsse:Security xmlns:wsse="http://schemas.xmlsoap.org/ws/2002/04/secext" xmlns:wsu="http://schemas.xmlsoap.org/ws/2002/04/utility" soapenv:mustUnderstand="1">
210            <ds:Signature xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
211                <ds:SignedInfo>
212                    <ds:CanonicalizationMethod Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"/>
213                    <ds:SignatureMethod Algorithm="http://www.w3.org/2000/09/xmldsig#rsa-sha1"/>
214                    <ds:Reference URI="./wsseSign-doc.xml">
215                        <ds:Transforms>
216                            <ds:Transform Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"/>
217                        </ds:Transforms>
218                        <ds:DigestMethod Algorithm="http://www.w3.org/2000/09/xmldsig#sha1"/>
219                        <ds:DigestValue>%s</ds:DigestValue>
220                    </ds:Reference>
221                </ds:SignedInfo>
222                <ds:SignatureValue>%s</ds:SignatureValue>
223                <ds:KeyInfo>
224                    <ds:X509Data>
225                        <ds:X509Certificate>
226%s
227                        </ds:X509Certificate>
228                    </ds:X509Data>
229                </ds:KeyInfo>
230            </ds:Signature>
231        </wsse:Security>
232    </soapenv:Header>
233    <soapenv:Body xmlns:wsu="http://schemas.xmlsoap.org/ws/2002/04/utility" wsu:Id="msgBody">
234    Hello, World!
235    </soapenv:Body>
236</soapenv:Envelope>"""
237
238
239        # 1) Reference Generation
240        #
241        # Reference - hard code as local file for now
242        refURIstream = open("./wsseSign-doc.xml")
243
244        # Canonicalize reference
245        refDom = NonvalidatingReader.parseStream(refURIstream)
246        c14nRef = Canonicalize(refDom)
247       
248        # Calculate digest for reference and base 64 encode
249        #
250        # Nb. encodestring adds a trailing newline char
251        digestValue = base64.encodestring(sha(c14nRef).digest()).strip()
252       
253       
254        # 2) Signature Generation
255        #
256        # Add digest into SignedInfo
257        msgTxt = msgTmpl % (digestValue, '%s', '%s')
258
259        import pdb;pdb.set_trace()
260
261        # Test against signature generated by pyXMLSec version
262        #xmlTxt = open('./wsseSign-xmlsec-res.xml').read()
263        #dom = NonvalidatingReader.parseStream(StringIO(xmlTxt))
264       
265        # Parse SignedInfo into DOM object ready for canonicalization function
266        dom = NonvalidatingReader.parseStream(StringIO(msgTxt))
267       
268        # Namespaces
269        processorNss = \
270        {
271            'ds':     DSIG.BASE, 
272            'wsu':    OASIS.UTILITY, 
273            'wsse':   OASIS.WSSE, 
274            'wsa':    WSA200403.ADDRESS,
275            'soapenv':"http://schemas.xmlsoap.org/soap/envelope/", 
276            'wsc':    BEA.SECCONV,
277            'ns1':    "http://counter.com",
278        }
279        context = XPath.Context.Context(dom, processorNss=processorNss)
280         
281        # Get the SignedInfo node and canonicalize
282        #
283        # Nb. When extracted the code adds the namespace attrinute to the
284        # signedInfo!  This has important consequences for validation -
285        #
286        # 1) Do you strip the namespace attribute before taking the digest to
287        # ensure the text is exactly the same as what is displayed in the
288        # message?
289        #
290        # 2) Leave it in and assume the validation algorithm will expect to
291        # add in the namespace attribute?!
292        #
293        # http://www.w3.org/TR/xml-c14n#NoNSPrefixRewriting implies you need
294        # to include namespace declarations for namespaces referenced in a doc
295        # subset - yes to 2)
296        signedInfoNode = XPath.Compile('//ds:SignedInfo').evaluate(context)[0]
297        #signedInfoNode = dom.childNodes[0]
298        c14nSignedInfo = Canonicalize(signedInfoNode)
299
300        # Calculate digest of SignedInfo
301        signedInfoDigestValue = sha(c14nSignedInfo).digest().strip()
302       
303        # Read Private key to sign with   
304        priKeyFile = BIO.File(open(priKeyFilePath))                                           
305        priKey = RSA.load_key_bio(priKeyFile, 
306                                  callback=lambda *args, **kwargs: priKeyPwd)
307       
308        # Sign using the private key and base 64 encode the result
309        signatureValue = priKey.sign(signedInfoDigestValue)
310        b64EncSignatureValue = base64.encodestring(signatureValue).strip()
311
312        # Get X.509 cert for inclusion into KeyInfo
313        x509Cert = X509.load_cert(certFilePath)
314       
315        x509CertPat = re.compile(\
316            '-----BEGIN CERTIFICATE-----\n?(.*?)\n?-----END CERTIFICATE-----',
317            re.S)
318        x509CertStr = x509CertPat.findall(x509Cert.as_pem())[0]
319       
320        signedMsgTxt = msgTxt % (b64EncSignatureValue, x509CertStr)
321       
322        # Extract RSA public key from the cert
323        rsaPubKey = x509Cert.get_pubkey().get_rsa()
324       
325        # Check the signature
326        verify = bool(rsaPubKey.verify(signedInfoDigestValue, signatureValue))
327       
328        return signedMsgTxt
329
330
331    def verify(self, xmlTxt, certFilePath=None):
332        """Verify signature"""
333        import pdb;pdb.set_trace()
334       
335        # Parse file
336        node = NonvalidatingReader.parseStream(StringIO(xmlTxt))
337       
338        # Set namespaces for XPath queries
339        processorNss = \
340        {
341            'ds':     DSIG.BASE, 
342            'wsu':    OASIS.UTILITY, 
343            'wsse':   OASIS.WSSE, 
344            'wsa':    WSA200403.ADDRESS,
345            'soapenv':"http://schemas.xmlsoap.org/soap/envelope/", 
346            'wsc':    BEA.SECCONV,
347            'ns1':    "http://counter.com",
348        }
349        context = XPath.Context.Context(node, processorNss=processorNss)
350       
351        # Two stagfe process: reference validation followed by signature
352        # validation
353       
354        # 1) Reference Validation       
355        refNode = XPath.Compile('//ds:Reference').evaluate(context)[0]
356       
357        # Get the URI for the signed info
358        refURI = refNode.getAttributeNodeNS(None, 'URI').value
359       
360       
361        # Make a lambda to pick out node child or children with a given name
362        getElements = lambda node, nameList: \
363            [n for n in node.childNodes if str(n.localName) in nameList] 
364           
365        transformsNode = getElements(refNode, "Transforms")[0]
366        transforms = getElements(transformsNode, "Transform")
367
368        algorithm = transforms[0].getAttributeNodeNS(None, "Algorithm").value
369       
370        # Add extra keyword for Exclusive canonicalization method
371        kw = {}
372        if algorithm == DSIG.C14N_EXCL:
373            try:
374                inclusiveNS = transforms[0].getElement(DSIG.C14N_EXCL, 
375                                                   "InclusiveNamespaces")
376                kw['unsuppressedPrefixes'] = \
377                inclusiveNS.getAttributeValue(None, "PrefixList").split()
378            except:
379                raise Exception, \
380            'failed to handle transform (%s) in <ds:Reference URI="%s">'%\
381                    (transforms[0], uri)
382         
383        # For testing, URI is a local file - change later to a ref within the
384        # document   
385        uriNode = NonvalidatingReader.parseStream(open(refURI))
386   
387        # Canonicalize the reference data and calculate the digest
388        c14nRef = Canonicalize(uriNode, **kw)
389        digestValue = base64.encodestring(sha(c14nRef).digest()).strip()
390       
391        # Extract the digest value that was stored           
392        digestNode = getElements(refNode, "DigestValue")[0]
393        nodeDigestValue = str(digestNode.childNodes[0].nodeValue).strip()   
394       
395        # Reference validates if the two digest values are the same
396        if digestValue != nodeDigestValue:
397            raise Exception, "DigestValues do not match: URI=%s" % uri
398       
399       
400        # 2) Signature Validation
401        signedInfoNode = XPath.Compile('//ds:SignedInfo').evaluate(context)[0]
402
403        # Get the canonicalization method - change later to check this and
404        # make sure it's an algorithm supported by this code
405        c14nMethodNode = getElements(signedInfoNode, 
406                                     "CanonicalizationMethod")[0]
407                                             
408        algorithm = c14nMethodNode.getAttributeNodeNS(None, 'Algorithm').value
409
410        # Canonicalize the SignedInfo node and take digest
411        c14nSignedInfo = Canonicalize(signedInfoNode)       
412        signedInfoDigestValue = sha(c14nSignedInfo).digest()
413       
414        # Get the signature value in order to check against the digest just
415        # calculated
416        signatureValueNode = \
417                    XPath.Compile('//ds:SignatureValue').evaluate(context)[0]
418
419        # Remove base 64 encoding
420        b64EncSignatureValue = \
421                    str(signatureValueNode.childNodes[0].nodeValue).strip()
422                   
423        signatureValue = base64.decodestring(b64EncSignatureValue)
424
425
426        # Read X.509 Cert from wsse:BinarySecurityToken node
427        # - leave out for now and read direct from hard coded pem file
428        x509Cert = X509.load_cert(certFilePath)
429       
430        # Extract RSA public key from the cert
431        rsaPubKey = x509Cert.get_pubkey().get_rsa()
432       
433        # Apply the signature verification
434        try:
435            verify = bool(rsaPubKey.verify(signedInfoDigestValue, 
436                                           signatureValue))
437        except RSA.RSAError:
438            verify = False
439           
440        return verify
441
442
443    def getTokenRefString(self):
444        """
445        Return an unique number to identify an element
446        for the element's id attribute
447        """
448        Signature.tokenRef +=1
449        return str(Signature.tokenRef)
450
451   
452     
if __name__ == "__main__":
    import sys

    # Command line driver: give "sign" and/or "verify" as arguments.  With
    # both, the message just signed is the one verified; with "verify" alone
    # a previously saved result file is read instead
    s = SignatureHandler()
    txt = None

    if 'sign' in sys.argv:
        # The private key password is read from a local file
        txt = s.sign('../Junk-cert.pem',
                     '../Junk-key.pem',
                     open('../tmp2').read().strip())
        print(txt)

    if 'verify' in sys.argv:
        if txt is None:
            txt = open('./wsseSign-test-res.xml').read()

        isOK = s.verify(txt, certFilePath='../Junk-cert.pem')
        print("Signature OK? %s" % isOK)
Note: See TracBrowser for help on using the repository browser.