source: TI12-security/trunk/python/ndg.security.common/ndg/security/common/X509.py @ 2270

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.common/ndg/security/common/X509.py@2270
Revision 2270, 20.2 KB checked in by pjkersha, 13 years ago (diff)

Replaced

reposID = '$Id$'

with,

revision = '$Id$'

for all relevant files.

ndg.security.server/setup.py:

  • added license keyword to setup

ndg.security.server/ndg/security/server/AttAuthority/init.py:

  • removed refs to proxy certificate in getattCert call. Input cert may not necessarily be a proxy.

ndg.security.common/setup.py: Major fixes to give *working* version.

  • PyXML dependency to ZSI fixed by giving explict sourceforge dependency link
  • Get ZSI from PyPI insteads of Sourceforge
  • Moved SQLObject and MySQL dependency to a separate if clause. This will be completed later to

allow inclusion of these on provision of a given option

  • added license keyword to setup.
  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1"""X.509 certificate handling class encapsulates M2Crypto.X509
2
3Nerc Data Grid Project
4
5P J Kershaw 05/04/05
6
7Copyright (C) 2006 CCLRC & NERC
8
9This software may be distributed under the terms of the Q Public License,
10version 1.0 or later."""
11
12__revision__ = '$Id$'
13
14
15import types
16import re
17
18# Handle not before and not after strings
19from time import strftime
20from time import strptime
21from datetime import datetime
22
23import M2Crypto
24
25
26class X509CertError(Exception):
27    """Exception handling for NDG X.509 Certificate handling class."""
28
29
30class X509Cert(object):
31    "NDG X509 Certificate Handling"
32
33    def __init__(self, filePath=None, m2CryptoX509=None):
34
35        # Set certificate file path
36        if filePath is not None:
37            if not isinstance(filePath, basestring):
38                raise X509CertError, \
39                        "Certificate File Path input must be a valid string"
40           
41        self.__filePath = filePath           
42        self.__dn = None
43        self.__dtNotBefore = None
44        self.__dtNotAfter = None
45       
46        if m2CryptoX509:
47            self.__setM2CryptoX509(m2CryptoX509)
48               
49
50    def __str__(self):
51        """Override to display current certificate file setting."""
52
53        if self.__filePath is None:
54            return '<X509 Cert>'
55        else:
56            return '<X509 Cert \'%s\'>' % self.__filePath
57
58
59    def __repr__(self):
60        """Override to display current certificate file setting.""" 
61        return str(self)
62       
63
64    def read(self, filePath=None):
65        """Read a certificate from file"""
66       
67        # Check for optional input certificate file path
68        if filePath is not None:
69            if not isinstance(filePath, basestring):
70                raise X509CertError, \
71                    "Certificate File Path input must be a valid string"
72           
73            self.__filePath = filePath
74       
75        try:
76            self.__m2CryptoX509 = M2Crypto.X509.load_cert(self.__filePath)
77        except Exception, e:
78            raise X509CertError, "Error loading certificate \"%s\": %s" % \
79                                (self.__filePath, str(e))
80
81        # Update DN and validity times from M2Crypto X509 object just
82        # created
83        self.__setM2CryptoX509()
84
85
86
87       
88    def parse(self, certTxt):
89        """Read a certificate input as a string"""
90
91        try:
92            # Create M2Crypto memory buffer and pass to load certificate
93            # method
94            #
95            # Nb. input converted to standard string - buffer method won't
96            # accept unicode type strings
97            certBIO = M2Crypto.BIO.MemoryBuffer(str(certTxt))
98            self.__m2CryptoX509 = M2Crypto.X509.load_cert_bio(certBIO)
99           
100        except Exception, e:
101            raise X509CertError, "Error loading certificate: %s" % str(e)
102
103        # Update DN and validity times from M2Crypto X509 object just
104        # created
105        self.__setM2CryptoX509()
106
107
108       
109       
110    def __setM2CryptoX509(self, m2CryptoX509=None):
111        """Private method allows class members to be updated from the
112        current M2Crypto object.  __m2CryptoX509 must have been set."""
113       
114        if m2CryptoX509 is not None:
115            if not isinstance(m2CryptoX509, M2Crypto.X509.X509):
116                raise TypeError, \
117                    "Incorrect type for input M2Crypto.X509.X509 object"
118                   
119            self.__m2CryptoX509 = m2CryptoX509
120           
121           
122        # Get distinguished name
123        m2CryptoX509Name = self.__m2CryptoX509.get_subject()
124
125        # Instantiate X500 Distinguished name
126        self.__dn = X500DN(m2CryptoX509Name=m2CryptoX509Name)
127
128
129        # Get not before and not after validity times
130        #
131        # Only option for M2Crypto seems to be to return the times as
132        # formatted strings and then parse them in order to create a datetime
133        # type
134       
135        try:
136            m2CryptoNotBefore = self.__m2CryptoX509.get_not_before()
137            self.__dtNotBefore=self.__m2CryptoUTC2datetime(m2CryptoNotBefore)
138                                       
139        except Exception, e:
140            raise X509CertError, "Not Before time: " + str(e)
141
142       
143        try:
144            m2CryptoNotAfter = self.__m2CryptoX509.get_not_after()
145            self.__dtNotAfter = self.__m2CryptoUTC2datetime(m2CryptoNotAfter)
146                                   
147        except Exception, e:
148            raise X509CertError, "Not After time: " + str(e)
149
150
151    #_________________________________________________________________________
152    def __getM2CryptoX509(self, m2CryptoX509=None):
153        "Return M2Crypto X.509 cert object"
154        return self.__m2CryptoX509
155   
156   
157    m2CryptoX509 = property(fset=__setM2CryptoX509,
158                            fget=__getM2CryptoX509,
159                            doc="M2Crypto.X509.X509 type")
160
161       
162    def toString(self, filePath=None):
163        """Return certificate file content as a string"""
164       
165        # Check M2Crypto.X509 object has been instantiated - if not call
166        # read method
167        if self.__m2CryptoX509 is None:
168            self.read(filePath)
169           
170        return self.__m2CryptoX509.as_pem()
171
172   
173    #_________________________________________________________________________
174    # Make some attributes accessible as read-only
175    def __getDN(self):
176        """Get X500 Distinguished Name."""
177        return self.__dn
178
179    dn = property(fget=__getDN, doc="X.509 Distinguished Name")
180
181
182    def __getVersion(self):
183        """Get X.509 Certificate version"""
184        if self.__m2CryptoX509 is None:
185            return None
186       
187        return self.__m2CryptoX509.get_version()
188
189    version = property(fget=__getVersion, doc="X.509 Certificate version")
190       
191       
192    def __getSerialNumber(self):
193        """Get Serial Number"""
194        if self.__m2CryptoX509 is None:
195            return None
196       
197        return self.__m2CryptoX509.get_serial_number()
198   
199    serialNumber = property(fget=__getSerialNumber, 
200                            doc="X.509 Certificate Serial Number")
201       
202
203    def __getNotBefore(self):
204        """Get not before validity time as datetime type"""
205        if self.__m2CryptoX509 is None:
206            return None
207       
208        return self.__dtNotBefore
209
210    notBefore = property(fget=__getNotBefore, 
211                         doc="Not before validity time as datetime type")
212       
213       
214    def __getNotAfter(self):
215        """Get not after validity time as datetime type"""
216        if self.__m2CryptoX509 is None:
217            return None
218       
219        return self.__dtNotAfter
220
221    notAfter = property(fget=__getNotAfter, 
222                         doc="Not after validity time as datetime type")
223       
224       
225    def __getPubKey(self):
226        """Get public key"""
227        if self.__m2CryptoX509 is None:
228            return None
229       
230        return self.__m2CryptoX509.get_pubkey()
231
232    pubKey = property(fget=__getPubKey,  doc="Public Key")
233       
234       
235    def __getIssuer(self):
236        """Get Certificate issuer"""
237        if self.__m2CryptoX509 is None:
238            return None
239       
240        # Return as X500DN type
241        return X500DN(m2CryptoX509Name=self.__m2CryptoX509.get_issuer())
242
243    issuer = property(fget=__getIssuer,  doc="Certificate Issuer")
244       
245   
246    def __getSubject(self):
247        """Get Certificate subject"""
248        if self.__m2CryptoX509 is None:
249            return None
250
251        # Return as X500DN type
252        return X500DN(m2CryptoX509Name=self.__m2CryptoX509.get_subject())
253   
254    subject = property(fget=__getSubject,  doc="Certificate subject")
255
256
257    def isValidTime(self, raiseExcep=False):
258        """Check Certificate for expiry
259
260        raiseExcep: set True to raise an exception if certificate is invalid"""
261
262        if not isinstance(self.__dtNotBefore, datetime):
263            raise X509CertError("Not Before datetime is not set")
264
265        if not isinstance(self.__dtNotAfter, datetime):
266            raise X509CertError("Not After datetime is not set")
267       
268        dtNow = datetime.utcnow()
269
270        if raiseExcep:
271            if dtNow < self.__dtNotBefore:
272                raise X509CertError("Current time is before the " + \
273                                    "certificate's Not Before Time")
274           
275            elif dtNow > self.__dtNotAfter:
276                raise X509CertError("Certificate has expired")
277        else:
278            return dtNow > self.__dtNotBefore and dtNow < self.__dtNotAfter
279
280
281
282
283    def __m2CryptoUTC2datetime(self, m2CryptoUTC):
284        """Convert M2Crypto UTC time string as returned by get_not_before/
285        get_not_after methods into datetime type"""
286       
287        datetimeRE = "([a-zA-Z]{3} {1,2}\d{1,2} \d{2}:\d{2}:\d{2} \d{4}).*"
288        sM2CryptoUTC = None
289       
290        try:
291            # Convert into string
292            sM2CryptoUTC = str(m2CryptoUTC)
293           
294            # Check for expected format - string may have trailing GMT - ignore
295            sTime = re.findall(datetimeRE, sM2CryptoUTC)[0]
296
297            # Convert into a tuple
298            lTime = strptime(sTime, "%b %d %H:%M:%S %Y")[0:6]
299
300            return datetime(lTime[0], lTime[1], lTime[2],
301                            lTime[3], lTime[4], lTime[5])
302                                   
303        except Exception, e:
304            msg = "Error parsing M2Crypto UTC"
305            if sM2CryptoUTC is not None:
306                msg += ": " + sM2CryptoUTC
307               
308            raise X509CertError(msg)
309       
310
311
312#_____________________________________________________________________________
313# Alternative AttCert constructors
314#
315def X509CertRead(filePath):
316    """Create a new X509 certificate read in from a file"""
317
318    x509Cert = X509Cert(filePath=filePath)
319    x509Cert.read()
320   
321    return x509Cert
322
323
324
325
326def X509CertParse(x509CertTxt):
327    """Create a new X509 certificate from string of file content"""
328
329    x509Cert = X509Cert()
330    x509Cert.parse(x509CertTxt)
331   
332    return x509Cert
333
334
335#_____________________________________________________________________________
336class X500DNError(Exception):
337    """Exception handling for NDG X.500 DN class."""
338
339
340#_____________________________________________________________________________
341# For use with parseSeparator method:
342import re
343
344
345class X500DN(dict):
346    "NDG X500 Distinguished name"
347   
348    # Class attribute - look-up mapping short name attributes to their long
349    # name equivalents
350    # * private *
351    __shortNameLUT = {  'commonName':               'CN',
352                        'OrganisationalUnitName':   'OU',
353                        'Organisation':             'O',
354                        'CountryName':              'C',
355                        'EmailAddress':             'EMAILADDRESS',
356                        'localityName':             'L',
357                        'stateOrProvinceName':      'ST',
358                        'streetAddress':            'STREET',
359                        'domainComponent':          'DC',
360                        'userid':                   'UID'}
361
362   
363    def __init__(self,
364                 dn=None,
365                 m2CryptoX509Name=None,
366                 separator=None):
367
368        """Create a new X500 Distinguished Name
369
370        m2CryptoX509Name:   initialise using using an M2Crypto.X509.X509_Name
371        dn:                 initialise using a distinguished name string
372        separator:          separator used to delimit dn fields - usually
373                            '/' or ','.  If dn is input and separator is
374                            omitted the separator character will be
375                            automatically parsed from the dn string.
376                            """
377        # Private key data
378        self.__dat = {  'CN':           '',
379                        'OU':           '',
380                        'O':            '',
381                        'C':            '',
382                        'EMAILADDRESS': '',
383                        'L':            '',
384                        'ST':           '',
385                        'STREET':       '',
386                        'DC':           '',
387                        'UID':          ''}
388
389        dict.__init__(self)
390
391
392        self.__separator = None
393       
394        # Check for separator from input
395        if separator is not None:
396            if not isinstance(separator, basestring):
397                raise X500DNError("dn Separator must be a valid string")
398
399            # Check for single character but allow trailing space chars
400            if len(separator.lstrip()) is not 1:
401                raise X500DNError("dn separator must be a single character")
402
403            self.__separator = separator
404
405           
406        if m2CryptoX509Name is not None:
407       
408            # the argument is an x509 dn in m2crypto format
409            self.__dat['CN'] = m2CryptoX509Name.CN
410
411            # M2Crypto seems to default Email and L variables to None - in
412            # this case avoid making an assignment because it upsets calls to
413            # __cmp__() - None could be compared to '' conceptually the same
414            # but not equal progammatically
415            #
416            # P J Kershaw 13/06/05
417            if m2CryptoX509Name.L is not None:
418                self.__dat['L'] = m2CryptoX509Name.L
419
420            self.__dat['O'] = m2CryptoX509Name.O
421            self.__dat['OU'] = m2CryptoX509Name.OU
422
423            if m2CryptoX509Name.Email is not None:
424                self.__dat['EMAILADDRESS'] = m2CryptoX509Name.Email
425
426        elif dn is not None:
427
428            # Separator can be parsed from the input DN string - only attempt
429            # if no explict separator was input
430            if self.__separator is None:
431                self.__separator = self.parseSeparator(dn)
432               
433            # Split Distinguished name string into constituent fields
434            self.deserialise(dn)
435
436
437    def __repr__(self):
438        """Override default behaviour to return internal dictionary content"""
439        return self.serialise()
440
441
442    def __str__(self):
443        """Behaviour for print and string statements - convert DN into
444        serialised format."""
445        return self.serialise()
446
447       
448    def __eq__(self, x500dn):
449
450        """Return true if the all the fields of the two DNs are equal"""
451       
452        if not isinstance(x500dn, X500DN):
453            return False
454
455        return self.__dat.items() == x500dn.items()
456
457       
458    def __cmp__(self, x500dn):
459
460        """Return true if the all the fields of the two DNs are equal"""
461       
462        if not isinstance(x500dn, X500DN):
463            return False
464
465        return cmp(self.__dat, x500dn.get())
466
467   
468    def __delitem__(self, key):
469
470        """Prevent keys from being deleted."""
471        raise X500DNError('Keys cannot be deleted from the X500DN')
472
473
474    def __getitem__(self, key):
475
476        # Check input key
477        if self.__dat.has_key(key):
478
479            # key recognised
480            return self.__dat[key]
481       
482        elif X500DN.__shortNameLUT.has_key(key):
483
484            # key not recognised - but a long name version of the key may
485            # have been passed
486            shortName = X500DN.__shortNameLUT[key]
487            return self.__dat[shortName]
488
489        else:
490            # key not recognised as a short or long name version
491            raise X500DNError('Key "' + key + '" not recognised for X500DN')
492
493
494    def __setitem__(self, key, item):
495       
496        # Check input key
497        if self.__dat.has_key(key):
498
499            # key recognised
500            self.__dat[key] = item
501           
502        elif X500DN.__shortNameLUT.has_key(key):
503               
504            # key not recognised - but a long name version of the key may
505            # have been passed
506            shortName = X500DN.__shortNameLUT[key]
507            self.__dat[shortName] = item
508           
509        else:
510            # key not recognised as a short or long name version
511            raise X500DNError('Key "' + key + '" not recognised for X500DN')
512
513
514    def clear(self):
515        raise X500DNError("Data cannot be cleared from " + self.__class__.__name__)
516
517   
518    def copy(self):
519
520        import copy
521        return copy.copy(self)
522
523   
524    def keys(self):
525        return self.__dat.keys()
526
527
528    def items(self):
529        return self.__dat.items()
530
531
532    def values(self):
533        return self.__dat.values()
534
535
536    def has_key(self, key):
537        return self.__dat.has_key(key)
538
539    # 'in' operator
540    def __contains__(self, key):
541        return key in self.__tags
542
543
544    def get(self):
545        """Get Distinguished name as a data dictionary."""
546        return self.__dat
547
548   
549    def serialise(self, separator=None):
550
551        """Combine fields in Distinguished Name into a single string."""
552       
553        if separator:
554            if not isinstance(separator, basestring):
555                raise X500DNError("Separator must be a valid string")
556               
557            self.__separator = separator
558           
559        else:
560            # Default to / if no separator is set
561            separator = '/'
562
563
564        # If using '/' then prepend DN with an initial '/' char
565        if separator == '/':
566            sDN = separator
567        else:
568            sDN = ''
569           
570
571        sDN += separator.join(["%s=%s" % field \
572                                for field in self.__dat.items() if field[1]])
573                               
574        return sDN
575
576    serialize = serialise
577   
578    def deserialise(self, dn, separator=None):
579
580        """Break up a DN string into it's constituent fields and use to
581        update the object's dictionary"""
582       
583        if separator:
584            if not isinstance(separator, basestring):
585                raise X500DNError("Separator must be a valid string")
586
587            self.__separator = separator
588
589
590        # If no separator has been set, parse if from the DN string           
591        if self.__separator is None:
592            self.__separator = self.parseSeparator(dn)
593
594        try:
595            dnFields = dn.split(self.__separator)
596            if len(dnFields) < 2:
597                raise X500DNError("Error parsing DN string: \"%s\"" % dn)
598
599           
600            # Split fields into key/value and also filter null fields if
601            # found e.g. a leading '/' in the DN would yield a null field
602            # when split
603            keyVals = [field.split('=') for field in dnFields if field]
604
605            # Reset existing dictionary values
606            self.__dat.fromkeys(self.__dat, '')
607           
608            # Strip leading and trailing space chars and convert into a
609            # dictionary
610            parsedDN = dict([(keyVal[0].strip(), keyVal[1].strip()) \
611                                                      for keyVal in keyVals])
612
613            # Copy matching DN fields
614            for i in parsedDN.items():
615                if not self.__dat.has_key(i[0]):
616                    raise X500DNError(\
617                        "Invalid field \"%s\" in input DN string" % i[0])
618
619                self.__dat[i[0]] = i[1]
620
621               
622        except Exception, excep:
623            raise X500DNError("Error de-serialising DN \"%s\": %s" % \
624                              (dn, str(excep)))
625
626    deserialize = deserialise
627   
628    def parseSeparator(self, dn):
629
630        """Attempt to parse the separator character from a given input
631        DN string.  If not found, return None
632
633        DNs don't use standard separators e.g.
634
635        /C=UK/O=eScience/OU=CLRC/L=DL/CN=AN Other
636        CN=SUM Oneelse,L=Didcot, O=RAL,OU=SSTD
637
638        This function isolates and identifies the character.  - In the above,
639        '/' and ',' respectively"""
640
641
642        # Make a regular expression containing all the possible field
643        # identifiers with equal sign appended and 'or'ed together.  \W should
644        # match the separator which preceeds the field name. \s* allows any
645        # whitespace between field name and field separator to be taken into
646        # account.
647        #
648        # The resulting match should be a list.  The first character in each
649        # element in the list should be the field separator and should be the
650        # same
651        regExpr = '|'.join(['\W\s*'+i+'=' for i in self.__dat.keys()])
652        match = re.findall(regExpr, dn)
653           
654        # In the first example above, the resulting match is:
655        # ['/C=', '/O=', '/OU=', '/L=']
656        # In each element the first character is the separator
657        sepList = [i[0:1] for i in match]
658
659        # All separators should be the same character - return None if they
660        # don't match
661        if not [i for i in sepList if i != sepList[0]]:
662            return sepList[0]
663        else:
664            return None
Note: See TracBrowser for help on using the repository browser.