source: TI12-security/trunk/python/Tests/xmlsec/XMLSecTest.py @ 1636

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/Tests/xmlsec/XMLSecTest.py@1636
Revision 1636, 13.1 KB checked in by pjkersha, 13 years ago (diff)

Change to new package structure. All refs to NDG package changed to ndg.security

  • Property svn:executable set to *
Line 
1#!/bin/env python
2
3'''Validate M2Crypto/Crypto/DOM based encryption handler vs. pyXMLSec
4code'''
5from M2Crypto import X509, BIO, RSA
6import base64
7
8# For shared key encryption
9from Crypto.Cipher import AES, DES3
10import os
11
12from ZSI.wstools.Namespaces import DSIG, ENCRYPTION, OASIS, WSU, WSA200403, \
13                                   SOAP, SCHEMA # last included for xsi
14
15from xml.xpath.Context import Context
16from xml import xpath
17
18from xml.dom.ext.reader.PyExpat import Reader
19from ZSI.wstools.c14n import Canonicalize
20
21
22def getElements(node, nameList):
23    '''DOM Helper function for getting child elements from a given node'''
24    # Avoid sub-string matches
25    nameList = isinstance(nameList, basestring) and [nameList] or nameList
26    return [n for n in node.childNodes if str(n.localName) in nameList]
27
28
29class _ENCRYPTION(ENCRYPTION):
30    '''Derived from ENCRYPTION class to add in extra 'tripledes-cbc' - is this
31    any different to 'des-cbc'?  ENCRYPTION class implies that it is the same
32    because it's assigned to 'BLOCK_3DES' ??'''
33    BLOCK_TRIPLEDES = "http://www.w3.org/2001/04/xmlenc#tripledes-cbc"
34   
35class EncryptionError(Exception):
36    """Flags an error in the encryption process"""
37
38class DecryptionError(Exception):
39    """Raised from EncryptionHandler.decrypt if an error occurs with the
40    decryption process"""
41
42
43class EncryptionHandler(object):
44    """Encrypt/Decrypt XML"""
45   
46    # Map namespace URIs to Crypto algorithm module and mode
47    cryptoAlg = \
48    {
49         _ENCRYPTION.WRAP_AES256:      {'module':       AES, 
50                                        'mode':         AES.MODE_ECB,
51                                        'blockSize':    16},
52         
53         # CBC (Cipher Block Chaining) modes
54         _ENCRYPTION.BLOCK_AES256:     {'module':       AES, 
55                                        'mode':         AES.MODE_CBC,
56                                        'blockSize':    16},
57                                       
58         _ENCRYPTION.BLOCK_TRIPLEDES:  {'module':       DES3, 
59                                        'mode':         DES3.MODE_CBC,
60                                        'blockSize':    8}   
61    }
62   
63     
64    def __init__(self,
65                 certFilePath=None, 
66                 priKeyFilePath=None, 
67                 priKeyPwd=None,
68                 chkSecurityTokRef=False):
69       
70        self.__certFilePath = certFilePath
71        self.__priKeyFilePath = priKeyFilePath
72        self.__priKeyPwd = priKeyPwd
73       
74        self.__chkSecurityTokRef = chkSecurityTokRef
75
76
77    def encrypt(self, xmlTxt):
78        """Encrypt xml text
79       
80        Use Key Wrapping - message is encrypted using a shared key which
81        itself is encrypted with the public key provided by the X.509 cert.
82        certFilePath"""
83       
84        # Use X.509 Cert to encrypt
85        x509Cert = X509.load_cert(self.__certFilePath)
86
87        docNode = Reader().fromString(xmlTxt)
88
89     
90        encrDataElem = docNode.createElement('EncryptedData')
91        encrDataElem.setAttribute('xmlns', _ENCRYPTION.BASE)
92        encrDataElem.setAttribute('Type', _ENCRYPTION.BASE + 'Element')
93       
94        # Encryption method used to encrypt the target data
95        dataEncrMethodElem = docNode.createElement('EncryptionMethod')       
96        dataEncrMethodElem.setAttribute('Algorithm', 
97                                        _ENCRYPTION.BLOCK_TRIPLEDES)
98        encrDataElem.appendChild(dataEncrMethodElem)
99       
100       
101        # Key Info
102        keyInfoElem = docNode.createElement('KeyInfo')
103        keyInfoElem.setAttribute('xmlns', DSIG.BASE)
104        encrDataElem.appendChild(keyInfoElem)
105       
106        encrKeyElem = docNode.createElement('EncryptedKey')
107        encrKeyElem.setAttribute('xmlns', _ENCRYPTION.BASE)
108        keyInfoElem.appendChild(encrKeyElem)
109       
110        # Encryption method used to encrypt the shared key
111        keyEncrMethodElem = docNode.createElement('EncryptionMethod')
112        keyEncrMethodElem.setAttribute('Algorithm', _ENCRYPTION.KT_RSA_1_5)
113        encrKeyElem.appendChild(keyEncrMethodElem)
114
115
116        # Key Info
117        encrKeyInfoElem = docNode.createElement('KeyInfo')
118        encrKeyInfoElem.setAttribute('xmlns', DSIG.BASE)
119        encrKeyElem.appendChild(encrKeyInfoElem)
120       
121        keyNameElem = docNode.createElement('KeyName')
122        encrKeyInfoElem.appendChild(keyNameElem)
123        keyNameElem.appendChild(docNode.createTextNode(self.__priKeyFilePath))
124       
125       
126        # References to what has been encrypted
127        encrKeyCiphDataElem = docNode.createElement('CipherData')
128        encrKeyElem.appendChild(encrKeyCiphDataElem)
129       
130        encrKeyCiphValElem = docNode.createElement('CipherValue')
131       
132        encrKeyCiphDataElem.appendChild(encrKeyCiphValElem)
133                     
134        # Cipher data
135        ciphDataElem = docNode.createElement('CipherData')
136        encrDataElem.appendChild(ciphDataElem)
137       
138        ciphValueElem = docNode.createElement('CipherValue')
139        ciphDataElem.appendChild(ciphValueElem)
140
141        import pdb;pdb.set_trace()
142
143        # Get elements for encryption
144        dataElem = docNode.childNodes[2]
145        data = Canonicalize(dataElem)
146     
147        # DES3 algorithm requires data length to be a multiple of 8 - pad as
148        # required   
149        modData = len(data) % 8
150        nPad = modData and 8 - modData or 0
151        data += ' '*(nPad-1)       
152             
153        # Last byte should be number of padding bytes
154        # (http://www.w3.org/TR/xmlenc-core/#sec-Alg-Block)
155        data += chr(nPad)
156       
157        # Generate shared key and input vector - for testing use hard-coded
158        # values to allow later comparison
159        #sharedKey = os.urandom(24)
160        #iv = os.urandom(8)
161        sharedKey = '01234567890123456789ABCD'
162        iv = 'ABCDEFGH'
163       
164        # Encrypt required elements - Prepend input vector to data
165        alg = DES3.new(sharedKey, DES3.MODE_CBC, iv)
166        encryptedData = alg.encrypt(iv + data)
167       
168        dataCiphValue = base64.encodestring(encryptedData).strip()
169
170        ciphValueElem.appendChild(docNode.createTextNode(dataCiphValue))
171       
172       
173        # ! Delete unencrypted message body elements !
174        docNode.removeChild(dataElem)
175
176       
177        # Use X.509 cert public key to encrypt the shared key - Extract key
178        # from the cert
179        rsaPubKey = x509Cert.get_pubkey().get_rsa()
180       
181        # Encrypt the shared key
182        encryptedSharedKey = rsaPubKey.public_encrypt(sharedKey, 
183                                                      RSA.pkcs1_padding)
184       
185        encrKeyCiphVal = base64.encodestring(encryptedSharedKey).strip()
186       
187        # Add the encrypted shared key to the EncryptedKey section in the SOAP
188        # header
189        encrKeyCiphValElem.appendChild(docNode.createTextNode(encrKeyCiphVal))
190
191        return Canonicalize(encrDataElem)
192       
193       
194    def decrypt(self, encryptedXML):
195        """Decrypt XML"""
196       
197        docNode = Reader().fromString(encryptedXML)
198        processorNss = \
199        {
200            'xenc':   ENCRYPTION.BASE,
201            'ds':     DSIG.BASE, 
202            'wsu':    WSU.UTILITY, 
203            'wsse':   OASIS.WSSE, 
204            'soapenv':"http://schemas.xmlsoap.org/soap/envelope/" 
205        }
206        ctxt = Context(docNode, processorNss=processorNss)
207       
208
209        # Get EncryptedData node
210        encrNode = xpath.Evaluate('//EncryptedData', 
211                                  contextNode=docNode, 
212                                  context=ctxt)[0]
213                                 
214                                 
215        # Check for wrapped key encryption
216        encrKeyNodes = xpath.Evaluate('//KeyInfo/EncryptedKey', 
217                                      contextNode=docNode, 
218                                      context=ctxt)
219        if len(encrKeyNodes) > 1:
220            raise DecryptionError, 'This implementation can only handle ' + \
221                                   'single EncryptedKey element'
222       
223        try:
224            encrKeyNode = encrKeyNodes[0]
225        except:
226            # Shared key encryption used - leave out for the moment
227            raise DecryptionError, 'This implementation can only handle ' + \
228                                   'wrapped key encryption'
229
230       
231        # Check wrapped key encryption method
232        keyEncrMethodNode = getElements(encrKeyNode, 'EncryptionMethod')[0]     
233        keyAlgorithm = keyEncrMethodNode.getAttributeNodeNS(None, 
234                                                            "Algorithm").value
235#        if keyAlgorithm != ENCRYPTION.KT_RSA_1_5:
236#            raise DecryptionError, \
237#            'Encryption algorithm for wrapped key is "%s", expecting "%s"' % \
238#                (keyAlgorithm, ENCRYPTION.KT_RSA_1_5)
239
240                                                           
241        if self.__chkSecurityTokRef and self.__certFilePath:
242             
243            # Check input cert. against SecurityTokenReference
244            securityTokRefXPath = '//KeyInfo/X509Data'
245            securityTokRefNode = xpath.Evaluate(securityTokRefXPath, 
246                                                contextNode=encrKeyNode, 
247                                                context=ctxt)
248            # Look for ds:X509* elements to check against X.509 cert input
249
250
251        # Look for cipher data for wrapped key
252        keyCiphDataNode = getElements(encrKeyNode, 'CipherData')[0]
253        keyCiphValNode = getElements(keyCiphDataNode, 'CipherValue')[0]
254
255        keyCiphVal = str(keyCiphValNode.childNodes[0].nodeValue)
256        encryptedKey = base64.decodestring(keyCiphVal)
257
258        # Read RSA Private key in order to decrypt wrapped key 
259        priKeyFile = BIO.File(open(self.__priKeyFilePath))                                           
260        priKey = RSA.load_key_bio(priKeyFile, 
261                                  callback=lambda *ar, **kw: self.__priKeyPwd)
262       
263        import pdb;pdb.set_trace()
264        sharedKey = priKey.private_decrypt(encryptedKey, RSA.pkcs1_padding)
265
266#        x509Cert = X509.load_cert(self.__certFilePath)
267#        rsaPubKey = x509Cert.get_pubkey().get_rsa()
268#        rsaPubKey.public_decrypt(encryptedKey, RSA.pkcs1_padding)       
269       
270        # Check encryption method
271        dataEncrMethodNode = getElements(encrNode, 'EncryptionMethod')[0]     
272        dataAlgorithm = dataEncrMethodNode.getAttributeNodeNS(None, 
273                                                        "Algorithm").value
274        try:       
275            # Match algorithm name to Crypto module
276            CryptoAlg = self.cryptoAlg[dataAlgorithm]
277           
278        except KeyError:
279            raise DecryptionError, \
280'Encryption algorithm for data is "%s", supported algorithms are:\n "%s"' % \
281                (keyAlgorithm, "\n".join(self.cryptoAlg.keys()))
282           
283        # Get Data
284        dataCiphDataNode = getElements(encrNode, 'CipherData')[0]
285        dataCiphValNode = getElements(dataCiphDataNode, 'CipherValue')[0]
286   
287        dataCiphVal = dataCiphValNode.childNodes[0].nodeValue
288        encryptedData = base64.decodestring(dataCiphVal)
289       
290        alg = CryptoAlg['module'].new(sharedKey, CryptoAlg['mode'])
291        decryptedData = alg.decrypt(encryptedData)
292       
293        # Strip prefix - assume is block size
294        decryptedData = decryptedData[CryptoAlg['blockSize']:]
295       
296        # Strip any padding
297        lastChar = decryptedData[-1]
298        nPad = ord(lastChar)
299       
300        # Sanity check - there may be no padding at all - the last byte being
301        # the end of the encrypted XML?
302        #
303        # TODO: are there better sanity checks than this?!
304        if nPad < CryptoAlg['blockSize'] and nPad > 0 and \
305           lastChar != '\n' and lastChar != '>':
306           
307            # Follow http://www.w3.org/TR/xmlenc-core/#sec-Alg-Block -
308            # last byte gives number of padding bytes
309            decryptedData = decryptedData[:-nPad]
310           
311        # Parse the encrypted data
312        rdr = Reader()
313        dataNode = rdr.fromString(decryptedData, ownerDoc=docNode)
314       
315        # Add decrypted element to parent and remove encrypted one
316        parentNode = encrNode._get_parentNode()
317        parentNode.appendChild(dataNode)
318        parentNode.removeChild(encrNode)
319       
320        from xml.dom.ext import ReleaseNode
321        ReleaseNode(encrNode)
322       
323        # Ensure body_root attribute is up to date in case it was
324        # previously encrypted
325        print decryptedData
326        import pdb;pdb.set_trace()       
327
328        return docNode
329
330if __name__ == "__main__":
331    txt = None
332   
333    e = EncryptionHandler(certFilePath='../Junk-cert.pem',
334                          priKeyFilePath='../Junk-key.pem',
335                          priKeyPwd=open('../tmp2').read().strip())
336   
337    #encryptedData = e.encrypt(None)
338    from ndg.security.XMLSecDoc import *
339#    xmlSecDoc = XMLSecDoc(encrPubKeyFilePath='../Junk-cert.pem',
340#                          encrPriKeyFilePath='../Junk-key.pem')
341#    txt = xmlSecDoc.encrypt(filePath='encrypt4-doc.xml', rtnAsString=True)
342
343    txt = open('encrypt4-doc.xml').read()
344    encrTxt = e.encrypt(txt)
345    open('xmlsecTestEncr.xml', 'w').write(encrTxt)
346#    encrTxt = open('encrypt4-res-3DES.xml').read()
347#    print e.decrypt(encrTxt)
Note: See TracBrowser for help on using the repository browser.