source: TI12-security/trunk/python/ndg.security.common/ndg/security/common/X509.py @ 4654

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg.security.common/ndg/security/common/X509.py@4654
Revision 4654, 32.5 KB checked in by pjkersha, 11 years ago (diff)

#884: add capability to X509Cert.isValidTime to warn when X.509 certificates are due to expire within a certain time limit (default 30 days). isValidTime is now called from read and parsing routines. warnings.warn and logging.warning are called so logs from security services will display the messages.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
Line 
1"""X.509 certificate handling class encapsulates M2Crypto.X509
2
3NERC Data Grid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "05/04/05"
7__copyright__ = "(C) 2007 STFC & NERC"
8__license__ = \
9"""This software may be distributed under the terms of the Q Public
10License, version 1.0 or later."""
11__contact__ = "Philip.Kershaw@stfc.ac.uk"
12__revision__ = '$Id$'
13import logging
14log = logging.getLogger(__name__)
15from warnings import warn # warn of impendiong certificate expiry
16
17import types
18import re
19
20# Handle not before and not after strings
21from time import strftime
22from time import strptime
23from datetime import datetime
24
25import M2Crypto
26
27
28class X509CertError(Exception):
29    """Exception handling for NDG X.509 Certificate handling class."""
30
31class X509CertReadError(X509CertError):
32    """Error reading in certificate from file"""
33   
34class X509CertInvalidNotBeforeTime(X509CertError):
35    """Call from X509Cert.isValidTime if certificates not before time is
36    BEFORE the current system time"""
37   
38class X509CertExpired(X509CertError):
39    """Call from X509Cert.isValidTime if certificate has expired"""
40
41   
42class X509Cert(object):
43    "NDG X509 Certificate Handling"
44
45    def __init__(self, filePath=None, m2CryptoX509=None):
46
47        # Set certificate file path
48        if filePath is not None:
49            if not isinstance(filePath, basestring):
50                raise X509CertError(
51                        "Certificate File Path input must be a valid string")
52           
53        self.__filePath = filePath           
54        self.__dn = None
55        self.__dtNotBefore = None
56        self.__dtNotAfter = None
57       
58        if m2CryptoX509:
59            self.__setM2CryptoX509(m2CryptoX509)
60       
61
62    def read(self, filePath=None, **isValidTimeKw):
63        """Read a certificate from PEM encoded file
64       
65        @type filePath: basestring
66        @param filePath: file path of PEM format file to be read
67       
68        @type isValidTimeKw: dict
69        @param isValidTimeKw: keywords to isValidTime() call"""
70       
71        # Check for optional input certificate file path
72        if filePath is not None:
73            if not isinstance(filePath, basestring):
74                raise X509CertError("Certificate File Path input must be a "
75                                    "valid string")
76           
77            self.__filePath = filePath
78       
79        try:
80            self.__m2CryptoX509 = M2Crypto.X509.load_cert(self.__filePath)
81        except Exception, e:
82            raise X509CertReadError("Error loading certificate \"%s\": %s" %
83                                    (self.__filePath, e))
84
85        # Update DN and validity times from M2Crypto X509 object just
86        # created
87        self.__setM2CryptoX509()
88       
89        if 'warningStackLevel'not in isValidTimeKw:
90            isValidTimeKw['warningStackLevel'] = 3
91           
92        self.isValidTime(**isValidTimeKw)
93
94
95    def parse(self, certTxt, **isValidTimeKw):
96        """Read a certificate input as a string
97       
98        @type certTxt: basestring
99        @param certTxt: PEM encoded certificate to parse
100       
101        @type isValidTimeKw: dict
102        @param isValidTimeKw: keywords to isValidTime() call"""
103
104        try:
105            # Create M2Crypto memory buffer and pass to load certificate
106            # method
107            #
108            # Nb. input converted to standard string - buffer method won't
109            # accept unicode type strings
110            certBIO = M2Crypto.BIO.MemoryBuffer(str(certTxt))
111            self.__m2CryptoX509 = M2Crypto.X509.load_cert_bio(certBIO)
112           
113        except Exception, e:
114            raise X509CertError("Error loading certificate: %s" % e)
115
116        # Update DN and validity times from M2Crypto X509 object just
117        # created
118        self.__setM2CryptoX509()
119       
120       
121        if 'warningStackLevel'not in isValidTimeKw:
122            isValidTimeKw['warningStackLevel'] = 3
123                   
124        self.isValidTime(**isValidTimeKw)
125
126     
127    def __setM2CryptoX509(self, m2CryptoX509=None):
128        """Private method allows class members to be updated from the
129        current M2Crypto object.  __m2CryptoX509 must have been set."""
130       
131        if m2CryptoX509 is not None:
132            if not isinstance(m2CryptoX509, M2Crypto.X509.X509):
133                raise TypeError("Incorrect type for input M2Crypto.X509.X509 "
134                                "object")
135                   
136            self.__m2CryptoX509 = m2CryptoX509
137           
138           
139        # Get distinguished name
140        m2CryptoX509Name = self.__m2CryptoX509.get_subject()
141
142        # Instantiate X500 Distinguished name
143        self.__dn = X500DN(m2CryptoX509Name=m2CryptoX509Name)
144
145
146        # Get not before and not after validity times
147        #
148        # Only option for M2Crypto seems to be to return the times as
149        # formatted strings and then parse them in order to create a datetime
150        # type
151       
152        try:
153            m2CryptoNotBefore = self.__m2CryptoX509.get_not_before()
154            self.__dtNotBefore=self.__m2CryptoUTC2datetime(m2CryptoNotBefore)
155                                       
156        except Exception, e:
157            raise X509CertError("Not Before time: %s" % e)
158
159       
160        try:
161            m2CryptoNotAfter = self.__m2CryptoX509.get_not_after()
162            self.__dtNotAfter = self.__m2CryptoUTC2datetime(m2CryptoNotAfter)
163                                   
164        except Exception, e:
165            raise X509CertError("Not After time: %s" % e)
166
167
168    def __getM2CryptoX509(self, m2CryptoX509=None):
169        "Return M2Crypto X.509 cert object"
170        return self.__m2CryptoX509
171   
172   
173    m2CryptoX509 = property(fset=__setM2CryptoX509,
174                            fget=__getM2CryptoX509,
175                            doc="M2Crypto.X509.X509 type")
176
177       
178    def toString(self, **kw):
179        """Return certificate file content as a PEM format
180        string"""
181        return self.asPEM(**kw)
182       
183    def asPEM(self, filePath=None):
184        """Return certificate file content as a PEM format
185        string"""
186       
187        # Check M2Crypto.X509 object has been instantiated - if not call
188        # read method
189        if self.__m2CryptoX509 is None:
190            self.read(filePath)
191           
192        return self.__m2CryptoX509.as_pem()
193
194       
195    def asDER(self):
196        """Return certificate file content in DER format"""
197       
198        # Check M2Crypto.X509 object has been instantiated
199        assert(self.__m2CryptoX509)
200        return self.__m2CryptoX509.as_der()
201
202   
203    # Make some attributes accessible as read-only
204    def __getDN(self):
205        """Get X500 Distinguished Name."""
206        return self.__dn
207
208    dn = property(fget=__getDN, doc="X.509 Distinguished Name")
209
210
211    def __getVersion(self):
212        """Get X.509 Certificate version"""
213        if self.__m2CryptoX509 is None:
214            return None
215       
216        return self.__m2CryptoX509.get_version()
217
218    version = property(fget=__getVersion, doc="X.509 Certificate version")
219       
220       
221    def __getSerialNumber(self):
222        """Get Serial Number"""
223        if self.__m2CryptoX509 is None:
224            return None
225       
226        return self.__m2CryptoX509.get_serial_number()
227   
228    serialNumber = property(fget=__getSerialNumber, 
229                            doc="X.509 Certificate Serial Number")
230       
231
232    def __getNotBefore(self):
233        """Get not before validity time as datetime type"""
234        if self.__m2CryptoX509 is None:
235            return None
236       
237        return self.__dtNotBefore
238
239    notBefore = property(fget=__getNotBefore, 
240                         doc="Not before validity time as datetime type")
241       
242       
243    def __getNotAfter(self):
244        """Get not after validity time as datetime type"""
245        if self.__m2CryptoX509 is None:
246            return None
247       
248        return self.__dtNotAfter
249
250    notAfter = property(fget=__getNotAfter, 
251                         doc="Not after validity time as datetime type")
252       
253       
254    def __getPubKey(self):
255        """Get public key
256       
257        @return: RSA public key for certificate
258        @rtype: M2Crypto.RSA.RSA_pub"""
259        if self.__m2CryptoX509 is None:
260            return None
261       
262        return self.__m2CryptoX509.get_pubkey()
263
264    pubKey = property(fget=__getPubKey,  doc="Public Key")
265       
266       
267    def __getIssuer(self):
268        """Get Certificate issuer"""
269        if self.__m2CryptoX509 is None:
270            return None
271       
272        # Return as X500DN type
273        return X500DN(m2CryptoX509Name=self.__m2CryptoX509.get_issuer())
274
275    issuer = property(fget=__getIssuer,  doc="Certificate Issuer")
276       
277   
278    def __getSubject(self):
279        """Get Certificate subject"""
280        if self.__m2CryptoX509 is None:
281            return None
282
283        # Return as X500DN type
284        return X500DN(m2CryptoX509Name=self.__m2CryptoX509.get_subject())
285   
286    subject = property(fget=__getSubject,  doc="Certificate subject")
287
288
289    def isValidTime(self, 
290                    raiseExcep=False, 
291                    expiryWarning=True, 
292                    nDaysBeforeExpiryLimit=30,
293                    warningStackLevel=2):
294        """Check Certificate for expiry
295
296        @type raiseExcep: bool
297        @param raiseExcep: set True to raise an exception if certificate is
298        invalid
299       
300        @type expiryWarning: bool
301        @param expiryWarning: set to True to output a warning message if the
302        certificate is due to expire in less than nDaysBeforeExpiryLimit days.
303        Message is sent using warnings.warn and through logging.warning.  No
304        message is set if the certificate has an otherwise invalid time
305       
306        @type nDaysBeforeExpiryLimit: int
307        @param nDaysBeforeExpiryLimit: used in conjunction with the
308        expiryWarning flag.  Set the number of days in advance of certificate
309        expiry from which to start outputing warnings
310       
311        @type warningStackLevel: int
312        @param warningStackLevel: set where in the stack to flag the warning
313        from.  Level 2 will flag it at the level of the caller of this
314        method.  Level 3 would flag at the level of the caller of the caller
315        and so on.
316       
317        @raise X509CertInvalidNotBeforeTime: current time is before the
318        certificate's notBefore time
319        @raise X509CertExpired: current time is after the certificate's
320        notAfter time"""
321
322        if not isinstance(self.__dtNotBefore, datetime):
323            raise X509CertError("Not Before datetime is not set")
324
325        if not isinstance(self.__dtNotAfter, datetime):
326            raise X509CertError("Not After datetime is not set")
327       
328        dtNow = datetime.utcnow()
329        isValidTime = dtNow > self.__dtNotBefore and dtNow < self.__dtNotAfter
330
331        # Helper string for message output
332        if self.__filePath:
333            fileInfo = ' "%s"' % self.__filePath
334        else:
335            fileInfo = ''
336             
337       
338        # Set a warning message for impending expiry of certificate but only
339        # if the certificate is not any other way invalid - see below
340        if isValidTime and expiryWarning:
341            dtTime2Expiry = self.__dtNotAfter - dtNow
342            if dtTime2Expiry.days < nDaysBeforeExpiryLimit:
343                msg = ('Certificate%s with DN "%s" will expire in %d days on: '
344                       '%s' % (fileInfo, 
345                               self.dn, 
346                               dtTime2Expiry.days, 
347                               self.__dtNotAfter))
348                warn(msg, stacklevel=warningStackLevel)
349                log.warning(msg)
350       
351                     
352        if dtNow < self.__dtNotBefore:
353            msg = ("Current time %s is before the certificate's Not Before "
354                   'Time %s for certificate%s with DN "%s"' % 
355                   (dtNow, self.__dtNotBefore, fileInfo, self.dn))
356            log.error(msg)
357            if raiseExcep:
358                raise X509CertInvalidNotBeforeTime(msg)
359           
360        elif dtNow > self.__dtNotAfter:
361            msg = ('Certificate%s with DN "%s" has expired: the time now is '
362                   '%s and the certificate expiry is %s.' %(fileInfo,
363                                                            self.dn, 
364                                                            dtNow, 
365                                                            self.__dtNotAfter))
366            if raiseExcep:
367                raise X509CertExpired(msg)
368
369        # If exception flag is not set return validity as bool
370        return isValidTime
371
372
373
374
375    def __m2CryptoUTC2datetime(self, m2CryptoUTC):
376        """Convert M2Crypto UTC time string as returned by get_not_before/
377        get_not_after methods into datetime type"""
378       
379        datetimeRE = "([a-zA-Z]{3} {1,2}\d{1,2} \d{2}:\d{2}:\d{2} \d{4}).*"
380        sM2CryptoUTC = None
381       
382        try:
383            # Convert into string
384            sM2CryptoUTC = str(m2CryptoUTC)
385           
386            # Check for expected format - string may have trailing GMT - ignore
387            sTime = re.findall(datetimeRE, sM2CryptoUTC)[0]
388
389            # Convert into a tuple
390            lTime = strptime(sTime, "%b %d %H:%M:%S %Y")[0:6]
391
392            return datetime(lTime[0], lTime[1], lTime[2],
393                            lTime[3], lTime[4], lTime[5])
394                                   
395        except Exception, e:
396            msg = "Error parsing M2Crypto UTC"
397            if sM2CryptoUTC is not None:
398                msg += ": " + sM2CryptoUTC
399               
400            raise X509CertError(msg)
401       
402    def verify(self, pubKey, **kw):
403        """Verify a certificate against the public key of the
404        issuer
405       
406        @param pubKey: public key of cert that issued self
407        @type pubKey: M2Crypto.RSA.RSA_pub
408        @param **kw: keywords to pass to M2Crypto.X509.X509 -
409        'pkey'
410        @type: dict
411        @return: True if verifies OK, False otherwise
412        @rtype: bool
413        """
414        return bool(self.__m2CryptoX509.verify(pubKey, **kw))
415
416    @classmethod
417    def Read(cls, filePath, **isValidTimeKw):
418        """Create a new X509 certificate read in from a file"""
419   
420        x509Cert = cls(filePath=filePath)
421       
422        if 'warningStackLevel' not in isValidTimeKw:
423            isValidTimeKw['warningStackLevel'] = 4
424           
425        x509Cert.read(**isValidTimeKw)
426       
427        return x509Cert
428   
429    @classmethod
430    def Parse(cls, x509CertTxt, **isValidTimeKw):
431        """Create a new X509 certificate from string of file content"""
432   
433        x509Cert = cls()
434       
435        if 'warningStackLevel' not in isValidTimeKw:
436            isValidTimeKw['warningStackLevel'] = 4
437           
438        x509Cert.parse(x509CertTxt, **isValidTimeKw)
439       
440        return x509Cert
441       
442# Alternative AttCert constructors
443def X509CertRead(filePath, **isValidTimeKw):
444    """Create a new X509 certificate read in from a file"""
445
446    x509Cert = X509Cert(filePath=filePath)
447   
448    if 'warningStackLevel' not in isValidTimeKw:
449        isValidTimeKw['warningStackLevel'] = 4
450       
451    x509Cert.read(**isValidTimeKw)
452   
453    return x509Cert
454
455def X509CertParse(x509CertTxt, **isValidTimeKw):
456    """Create a new X509 certificate from string of file content"""
457
458    x509Cert = X509Cert()
459    if 'warningStackLevel' not in isValidTimeKw:
460        isValidTimeKw['warningStackLevel'] = 4
461           
462    x509Cert.parse(x509CertTxt, **isValidTimeKw)
463   
464    return x509Cert
465
466
467class X509StackError(X509CertError):
468    """Error from X509Stack type"""
469
470class X509StackEmptyError(X509CertError):
471    """Expecting non-zero length X509Stack"""
472
473class X509CertIssuerNotFound(X509CertError):
474    """Raise from verifyCertChain if no certificate can be found to verify the
475    input"""
476
477class SelfSignedCert(X509CertError):
478    """Raise from verifyCertChain if cert. is self-signed and
479    rejectSelfSignedCert=True"""
480
481class X509CertInvalidSignature(X509CertError):
482    """X.509 Certificate has an invalid signature"""
483       
484class X509Stack(object):
485    """Wrapper for M2Crypto X509_Stack"""
486   
487    def __init__(self, m2X509Stack=None):
488        """Initialise from an M2Crypto stack object
489       
490        @param m2X509Stack: M2Crypto X.509 stack object
491        @type m2X509Stack: M2Crypto.X509.X509_Stack"""
492       
493        self.__m2X509Stack = m2X509Stack or M2Crypto.X509.X509_Stack()
494       
495    def __len__(self):
496        """@return: length of stack
497        @rtype: int"""
498        return self.__m2X509Stack.__len__()
499
500    def __getitem__(self, idx):
501        """Index stack as an array
502        @param idx: stack index
503        @type idx: int
504        @return: X.509 cert object
505        @rtype: ndg.security.common.X509.X509Cert"""
506       
507        return X509Cert(m2CryptoX509=self.__m2X509Stack.__getitem__(idx))
508   
509    def __iter__(self):
510        """@return: stack iterator
511        @rtype: listiterator"""
512        return iter([X509Cert(m2CryptoX509=i) for i in self.__m2X509Stack])
513
514    def push(self, x509Cert):
515        """Push an X509 certificate onto the stack.
516       
517        @param x509Cert: X509 object.
518        @type x509Cert: M2Crypto.X509.X509,
519        ndg.security.common.X509.X509Cert or basestring
520        @return: The number of X509 objects currently on the stack.
521        @rtype: int"""
522        if isinstance(x509Cert, M2Crypto.X509.X509):
523            return self.__m2X509Stack.push(x509Cert)
524       
525        elif isinstance(x509Cert, X509Cert):
526            return self.__m2X509Stack.push(x509Cert.m2CryptoX509)
527       
528        elif isinstance(x509Cert, basestring):
529            return self.__m2X509Stack.push(\
530                                       X509CertParse(x509Cert).m2CryptoX509)           
531        else:
532            raise X509StackError("Expecting M2Crypto.X509.X509, ndg.security."
533                                 "common.X509.X509Cert or string type")
534               
535    def pop(self):
536        """Pop a certificate from the stack.
537       
538        @return: X509 object that was popped, or None if there is nothing
539        to pop.
540        @rtype: ndg.security.common.X509.X509Cert
541        """
542        return X509Cert(m2CryptoX509=self.__m2X509Stack.pop())
543
544
545    def asDER(self):
546        """Return the stack as a DER encoded string
547        @return: DER string
548        @rtype: string"""
549        return self.__m2X509Stack.as_der()
550
551
552    def verifyCertChain(self, 
553                        x509Cert2Verify=None, 
554                        caX509Stack=[],
555                        rejectSelfSignedCert=True):
556        """Treat stack as a list of certificates in a chain of
557        trust.  Validate the signatures through to a single root issuer. 
558
559        @param x509Cert2Verify: X.509 certificate to be verified default is
560        last in the stack
561        @type x509Cert2Verify: X509Cert
562       
563        @param caX509Stack: X.509 stack containing CA certificates that are
564        trusted.
565        @type caX509Stack: X509Stack
566       
567        @param rejectSelfSignedCert: Set to True (default) to raise an
568        SelfSignedCert exception if a certificate in self's stack is
569        self-signed. 
570        @type rejectSelfSignedCert: bool"""
571       
572        n2Validate = len(self)
573        if x509Cert2Verify:
574            # One more to validate in addition to stack content
575            n2Validate += 1
576        else:
577            # Validate starting from last on stack - but check first that it's
578            # populated
579            if n2Validate == 0:
580                raise X509StackEmptyError("Empty stack and no x509Cert2Verify "
581                                          "set: no cert.s to verify")
582
583            x509Cert2Verify = self[-1]
584             
585               
586        # Exit loop if all certs have been validated or if find a self
587        # signed cert.
588        nValidated = 0
589        issuerX509Cert = None
590        while nValidated < n2Validate:               
591            issuerX509Cert = None
592            issuerDN = x509Cert2Verify.issuer
593           
594            # Search for issuing certificate in stack
595            for x509Cert in self:
596                if x509Cert.dn == issuerDN:
597                    # Match found - the cert.'s issuer has been found in the
598                    # stack
599                    issuerX509Cert = x509Cert
600                    break
601                   
602            if issuerX509Cert:
603                # An issuing cert. has been found - use it to check the
604                # signature of the cert. to be verified
605                if not x509Cert2Verify.verify(issuerX509Cert.pubKey):
606                    X509CertInvalidSignature('Signature is invalid for cert. '
607                                             '"%s"' % x509Cert2Verify.dn)
608               
609                # In the next iteration the issuer cert. will be checked:
610                # 1) search for a cert. in the stack that issued it
611                # 2) If found use the issuing cert. to verify
612                x509Cert2Verify = issuerX509Cert
613                nValidated += 1
614            else:
615                # All certs in the stack have been searched
616                break
617
618
619        if issuerX509Cert:           
620            # Check for self-signed certificate
621            if nValidated == 1 and rejectSelfSignedCert and \
622               issuerX509Cert.dn == issuerX509Cert.issuer:
623
624                # If only one iteration occured then it must be a self
625                # signed certificate
626                raise SelfSignedCert("Certificate is self signed: [DN=%s]" %
627                                     issuerX509Cert.dn)
628           
629            if not caX509Stack:
630                caX509Stack = [issuerX509Cert]
631                         
632        elif not caX509Stack:
633            raise X509CertIssuerNotFound('No issuer cert. found for cert. '
634                                         '"%s"' % x509Cert2Verify.dn)
635           
636        for caCert in caX509Stack:
637            issuerDN = x509Cert2Verify.issuer
638            if caCert.dn == issuerDN:
639                issuerX509Cert = caCert
640                break
641       
642        if issuerX509Cert:   
643            if not x509Cert2Verify.verify(issuerX509Cert.pubKey):
644                X509CertInvalidSignature('Signature is invalid for cert. "%s"'%
645                                         x509Cert2Verify.dn)
646           
647            # Chain is validated through to CA cert
648            return
649        else:
650            raise X509CertIssuerNotFound('No issuer cert. found for '
651                                         'certificate "%s"'%x509Cert2Verify.dn)
652       
653        # If this point is reached then an issuing cert is missing from the
654        # chain       
655        raise X509CertIssuerNotFound('Can\'t find issuer cert "%s" for '
656                                     'certificate "%s"' %
657                                     (x509Cert2Verify.issuer, 
658                                      x509Cert2Verify.dn))
659
660
661def X509StackParseFromDER(derString):
662    """Make a new stack from a DER string
663   
664    @param derString: DER formatted X.509 stack data
665    @type derString: string
666    @return: new stack object
667    @rtype: X509Stack""" 
668    return X509Stack(m2X509Stack=M2Crypto.X509.new_stack_from_der(derString))
669
670
671class X500DNError(Exception):
672    """Exception handling for NDG X.500 DN class."""
673
674
675# For use with parseSeparator method:
676import re
677
678
679class X500DN(dict):
680    "NDG X500 Distinguished name"
681   
682    # Class attribute - look-up mapping short name attributes to their long
683    # name equivalents
684    # * private *
685    __shortNameLUT = {
686        'commonName':               'CN',
687        'organisationalUnitName':   'OU',
688        'organisation':             'O',
689        'countryName':              'C',
690        'emailAddress':             'EMAILADDRESS',
691        'localityName':             'L',
692        'stateOrProvinceName':      'ST',
693        'streetAddress':            'STREET',
694        'domainComponent':              'DC',
695        'userid':                       'UID'
696    }
697
698   
699    def __init__(self, dn=None, m2CryptoX509Name=None, separator=None):
700
701        """Create a new X500 Distinguished Name
702
703        @type m2CryptoX509Name: M2Crypto.X509.X509_Name
704        @param m2CryptoX509Name:   initialise using using an
705        M2Crypto.X509.X509_Name
706        @type dn: basestring
707        @param dn: initialise using a distinguished name string
708        @type separator: basestring
709        @param: separator: separator used to delimit dn fields - usually '/'
710        or ','.  If dn is input and separator is omitted the separator
711        character will be automatically parsed from the dn string.
712        """
713       
714        # Private key data
715        self.__dat = {}.fromkeys(X500DN.__shortNameLUT.values(), '')
716   
717        dict.__init__(self)
718   
719        self.__separator = None
720       
721        # Check for separator from input
722        if separator is not None:
723            if not isinstance(separator, basestring):
724                raise X500DNError("dn Separator must be a valid string")
725
726            # Check for single character but allow trailing space chars
727            if len(separator.lstrip()) is not 1:
728                raise X500DNError("dn separator must be a single character")
729
730            self.__separator = separator
731           
732        if m2CryptoX509Name is not None:
733            # the argument is an x509 dn in m2crypto format
734            self.deserialise(str(m2CryptoX509Name))
735           
736        elif dn is not None:
737            # Separator can be parsed from the input DN string - only attempt
738            # if no explict separator was input
739            if self.__separator is None:
740                self.__separator = self.parseSeparator(dn)
741               
742            # Split Distinguished name string into constituent fields
743            self.deserialise(dn)
744
745    def __repr__(self):
746        """Override default behaviour to return internal dictionary content"""
747        return self.serialise()
748
749    def __str__(self):
750        """Behaviour for print and string statements - convert DN into
751        serialised format."""
752        return self.serialise()
753       
754    def __eq__(self, x500dn):
755        """Return true if the all the fields of the two DNs are equal"""
756       
757        if not isinstance(x500dn, X500DN):
758            return False
759
760        return self.__dat.items() == x500dn.items()
761   
762    def __ne__(self, x500dn):
763        """Return true if the all the fields of the two DNs are equal"""
764       
765        if not isinstance(x500dn, X500DN):
766            return False
767
768        return self.__dat.items() != x500dn.items()
769 
770    def __delitem__(self, key):
771        """Prevent keys from being deleted."""
772        raise X500DNError('Keys cannot be deleted from the X500DN')
773
774    def __getitem__(self, key):
775
776        # Check input key
777        if self.__dat.has_key(key):
778
779            # key recognised
780            return self.__dat[key]
781       
782        elif X500DN.__shortNameLUT.has_key(key):
783
784            # key not recognised - but a long name version of the key may
785            # have been passed
786            shortName = X500DN.__shortNameLUT[key]
787            return self.__dat[shortName]
788
789        else:
790            # key not recognised as a short or long name version
791            raise KeyError('Key "' + key + '" not recognised for X500DN')
792
793    def __setitem__(self, key, item):
794       
795        # Check input key
796        if self.__dat.has_key(key):
797
798            # key recognised
799            self.__dat[key] = item
800           
801        elif X500DN.__shortNameLUT.has_key(key):
802               
803            # key not recognised - but a long name version of the key may
804            # have been passed
805            shortName = X500DN.__shortNameLUT[key]
806            self.__dat[shortName] = item
807           
808        else:
809            # key not recognised as a short or long name version
810            raise KeyError('Key "' + key + '" not recognised for X500DN')
811
812    def clear(self):
813        raise X500DNError("Data cannot be cleared from X500DN")
814
815    def copy(self):
816
817        import copy
818        return copy.copy(self)
819
820    def keys(self):
821        return self.__dat.keys()
822
823    def items(self):
824        return self.__dat.items()
825
826    def values(self):
827        return self.__dat.values()
828
829    def has_key(self, key):
830        return self.__dat.has_key(key)
831
832    # 'in' operator
833    def __contains__(self, key):
834        return key in self.__tags
835
836    def get(self, *arg):
837        return self.__dat.get(*arg)
838 
839    def serialise(self, separator=None):
840        """Combine fields in Distinguished Name into a single string."""
841       
842        if separator:
843            if not isinstance(separator, basestring):
844                raise X500DNError("Separator must be a valid string")
845               
846            self.__separator = separator
847           
848        else:
849            # Default to / if no separator is set
850            separator = '/'
851
852
853        # If using '/' then prepend DN with an initial '/' char
854        if separator == '/':
855            sDN = separator
856        else:
857            sDN = ''
858     
859        dnList = []
860        for (key, val) in self.__dat.items():
861            if val:
862                if isinstance(val, tuple):
863                    dnList += [separator.join(["%s=%s" % (key, valSub) \
864                                               for valSub in val])]
865                else:
866                    dnList += ["%s=%s" % (key, val)]
867               
868        sDN += separator.join(dnList)
869                               
870        return sDN
871
872    serialize = serialise
873   
874    def deserialise(self, dn, separator=None):
875        """Break up a DN string into it's constituent fields and use to
876        update the object's dictionary"""
877       
878        if separator:
879            if not isinstance(separator, basestring):
880                raise X500DNError("Separator must be a valid string")
881
882            self.__separator = separator
883
884
885        # If no separator has been set, parse if from the DN string           
886        if self.__separator is None:
887            self.__separator = self.parseSeparator(dn)
888
889        try:
890            dnFields = dn.split(self.__separator)
891            if len(dnFields) < 2:
892                raise X500DNError("Error parsing DN string: \"%s\"" % dn)
893
894           
895            # Split fields into key/value and also filter null fields if
896            # found e.g. a leading '/' in the DN would yield a null field
897            # when split
898           
899            items = [field.split('=') for field in dnFields if field]
900
901            # Reset existing dictionary values
902            self.__dat.fromkeys(self.__dat, '')
903           
904            # Strip leading and trailing space chars and convert into a
905            # dictionary
906            parsedDN = {}
907            for (key, val) in items:
908                key = key.strip()
909                if key in parsedDN:
910                    if isinstance(parsedDN[key], tuple):
911                        parsedDN[key] = tuple(list(parsedDN[key]) + [val])                   
912                    else:
913                        parsedDN[key] = (parsedDN[key], val)
914                else:
915                    parsedDN[key] = val
916               
917            # Copy matching DN fields
918            for key, val in parsedDN.items():
919                if key not in self.__dat and key not in self.__shortNameLUT:
920                    raise X500DNError('Invalid field "%s" in input DN string' %
921                                      key)
922
923                self.__dat[key] = val
924
925               
926        except Exception, excep:
927            raise X500DNError("Error de-serialising DN \"%s\": %s" % \
928                              (dn, str(excep)))
929
930    deserialize = deserialise
931   
932    def parseSeparator(self, dn):
933        """Attempt to parse the separator character from a given input
934        DN string.  If not found, return None
935
936        DNs don't use standard separators e.g.
937
938        /C=UK/O=eScience/OU=CLRC/L=DL/CN=AN Other
939        CN=SUM Oneelse,L=Didcot, O=RAL,OU=SSTD
940
941        This function isolates and identifies the character.  - In the above,
942        '/' and ',' respectively"""
943
944
945        # Make a regular expression containing all the possible field
946        # identifiers with equal sign appended and 'or'ed together.  \W should
947        # match the separator which preceeds the field name. \s* allows any
948        # whitespace between field name and field separator to be taken into
949        # account.
950        #
951        # The resulting match should be a list.  The first character in each
952        # element in the list should be the field separator and should be the
953        # same
954        regExpr = '|'.join(['\W\s*'+i+'=' for i in self.__dat.keys()])
955        match = re.findall(regExpr, dn)
956           
957        # In the first example above, the resulting match is:
958        # ['/C=', '/O=', '/OU=', '/L=']
959        # In each element the first character is the separator
960        sepList = [i[0:1] for i in match]
961
962        # All separators should be the same character - return None if they
963        # don't match
964        if not [i for i in sepList if i != sepList[0]]:
965            return sepList[0]
966        else:
967            return None
Note: See TracBrowser for help on using the repository browser.