source: TI12-security/trunk/python/ndg_security_common/ndg/security/common/X509.py @ 5924

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI12-security/trunk/python/ndg_security_common/ndg/security/common/X509.py@5924
Revision 5924, 33.7 KB checked in by pjkersha, 10 years ago (diff)

Initial unit tests for MyProxy? callout app

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
Line 
1"""X.509 certificate handling class encapsulates M2Crypto.X509
2
3NERC Data Grid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "05/04/05"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - See LICENSE file in the top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
11import logging
12log = logging.getLogger(__name__)
13from warnings import warn # warn of impending certificate expiry
14
15import types
16import re
17
18# Handle not before and not after strings
19from time import strftime
20from time import strptime
21from datetime import datetime
22
23import M2Crypto
24
25
26class X509CertError(Exception):
27    """Exception handling for NDG X.509 Certificate handling class."""
28
29class X509CertReadError(X509CertError):
30    """Error reading in certificate from file"""
31
32class X509CertParseError(X509CertError):
33    """Error parsing a certificate"""
34 
35class X509CertInvalidNotBeforeTime(X509CertError):
36    """Call from X509Cert.isValidTime if certificates not before time is
37    BEFORE the current system time"""
38   
39class X509CertExpired(X509CertError):
40    """Call from X509Cert.isValidTime if certificate has expired"""
41
42   
43class X509Cert(object):
44    "NDG X509 Certificate Handling"
45
46    formatPEM = M2Crypto.X509.FORMAT_PEM
47    formatDER = M2Crypto.X509.FORMAT_DER
48   
49    def __init__(self, filePath=None, m2CryptoX509=None):
50
51        # Set certificate file path
52        if filePath is not None:
53            if not isinstance(filePath, basestring):
54                raise X509CertError("Certificate File Path input must be a "
55                                    "valid string")
56           
57        self.__filePath = filePath           
58        self.__dn = None
59        self.__dtNotBefore = None
60        self.__dtNotAfter = None
61       
62        if m2CryptoX509:
63            self.__setM2CryptoX509(m2CryptoX509)
64        else:
65            self.__m2CryptoX509 = None
66
67    def read(self, 
68             filePath=None, 
69             format=None, 
70             warningStackLevel=3,
71             **isValidTimeKw):
72        """Read a certificate from PEM encoded DER format file
73       
74        @type filePath: basestring
75        @param filePath: file path of PEM format file to be read
76       
77        @type format: int
78        @param format: format of input file - PEM is the default.  Set to
79        X509Cert.formatDER for DER format
80       
81        @type isValidTimeKw: dict
82        @param isValidTimeKw: keywords to isValidTime() call"""
83
84        if format is None:
85            format = X509Cert.formatPEM
86       
87        # Check for optional input certificate file path
88        if filePath is not None:
89            if not isinstance(filePath, basestring):
90                raise X509CertError("Certificate File Path input must be a "
91                                    "valid string")
92           
93            self.__filePath = filePath
94       
95        try:
96            self.__m2CryptoX509 = M2Crypto.X509.load_cert(self.__filePath,
97                                                          format=format)
98        except Exception, e:
99            raise X509CertReadError("Error loading certificate \"%s\": %s" %
100                                    (self.__filePath, e))
101
102        # Update DN and validity times from M2Crypto X509 object just
103        # created
104        self.__setM2CryptoX509()
105       
106        self.isValidTime(warningStackLevel=warningStackLevel, **isValidTimeKw)
107
108
109    def parse(self, 
110              certTxt, 
111              format=None, 
112              warningStackLevel=3,
113              **isValidTimeKw):
114        """Read a certificate input as a string
115       
116        @type certTxt: basestring
117        @param certTxt: PEM encoded certificate to parse
118       
119        @type format: int
120        @param format: format of input file - PEM is the default.  Set to
121        X509Cert.formatDER for DER format
122       
123        @type isValidTimeKw: dict
124        @param isValidTimeKw: keywords to isValidTime() call"""
125
126        if format is None:
127            format = X509Cert.formatPEM
128           
129        try:
130            # Create M2Crypto memory buffer and pass to load certificate
131            # method
132            #
133            # Nb. input converted to standard string - buffer method won't
134            # accept unicode type strings
135#            certBIO = M2Crypto.BIO.MemoryBuffer(str(certTxt))
136#            self.__m2CryptoX509 = M2Crypto.X509.load_cert_bio(certBIO)
137            self.__m2CryptoX509 = M2Crypto.X509.load_cert_string(str(certTxt),
138                                                                 format=format)
139        except Exception, e:
140            raise X509CertParseError("Error loading certificate: %s" % e)
141
142        # Update DN and validity times from M2Crypto X509 object just
143        # created
144        self.__setM2CryptoX509()
145       
146        self.isValidTime(warningStackLevel=warningStackLevel, **isValidTimeKw)
147
148     
149    def __setM2CryptoX509(self, m2CryptoX509=None):
150        """Private method allows class members to be updated from the
151        current M2Crypto object.  __m2CryptoX509 must have been set."""
152       
153        if m2CryptoX509 is not None:
154            if not isinstance(m2CryptoX509, M2Crypto.X509.X509):
155                raise TypeError("Incorrect type for input M2Crypto.X509.X509 "
156                                "object")
157                   
158            self.__m2CryptoX509 = m2CryptoX509
159           
160           
161        # Get distinguished name
162        m2CryptoX509Name = self.__m2CryptoX509.get_subject()
163
164        # Instantiate X500 Distinguished name
165        self.__dn = X500DN(m2CryptoX509Name=m2CryptoX509Name)
166
167
168        # Get not before and not after validity times
169        #
170        # Only option for M2Crypto seems to be to return the times as
171        # formatted strings and then parse them in order to create a datetime
172        # type
173       
174        try:
175            m2CryptoNotBefore = self.__m2CryptoX509.get_not_before()
176            self.__dtNotBefore=self.__m2CryptoUTC2datetime(m2CryptoNotBefore)
177                                       
178        except Exception, e:
179            raise X509CertError("Not Before time: %s" % e)
180
181       
182        try:
183            m2CryptoNotAfter = self.__m2CryptoX509.get_not_after()
184            self.__dtNotAfter = self.__m2CryptoUTC2datetime(m2CryptoNotAfter)
185                                   
186        except Exception, e:
187            raise X509CertError("Not After time: %s" % e)
188
189
190    def __getM2CryptoX509(self, m2CryptoX509=None):
191        "Return M2Crypto X.509 cert object"
192        return self.__m2CryptoX509
193   
194   
195    m2CryptoX509 = property(fset=__setM2CryptoX509,
196                            fget=__getM2CryptoX509,
197                            doc="M2Crypto.X509.X509 type")
198
199       
200    def toString(self, **kw):
201        """Return certificate file content as a PEM format
202        string"""
203        return self.asPEM(**kw)
204       
205    def asPEM(self, filePath=None):
206        """Return certificate file content as a PEM format
207        string"""
208       
209        # Check M2Crypto.X509 object has been instantiated - if not call
210        # read method
211        if self.__m2CryptoX509 is None:
212            self.read(filePath)
213           
214        return self.__m2CryptoX509.as_pem()
215
216       
217    def asDER(self):
218        """Return certificate file content in DER format"""
219       
220        # Check M2Crypto.X509 object has been instantiated
221        assert(self.__m2CryptoX509)
222        return self.__m2CryptoX509.as_der()
223
224   
225    # Make some attributes accessible as read-only
226    def __getDN(self):
227        """Get X500 Distinguished Name."""
228        return self.__dn
229
230    dn = property(fget=__getDN, doc="X.509 Distinguished Name")
231
232
233    def __getVersion(self):
234        """Get X.509 Certificate version"""
235        if self.__m2CryptoX509 is None:
236            return None
237       
238        return self.__m2CryptoX509.get_version()
239
240    version = property(fget=__getVersion, doc="X.509 Certificate version")
241       
242       
243    def __getSerialNumber(self):
244        """Get Serial Number"""
245        if self.__m2CryptoX509 is None:
246            return None
247       
248        return self.__m2CryptoX509.get_serial_number()
249   
250    serialNumber = property(fget=__getSerialNumber, 
251                            doc="X.509 Certificate Serial Number")
252       
253
254    def __getNotBefore(self):
255        """Get not before validity time as datetime type"""
256        if self.__m2CryptoX509 is None:
257            return None
258       
259        return self.__dtNotBefore
260
261    notBefore = property(fget=__getNotBefore, 
262                         doc="Not before validity time as datetime type")
263       
264       
265    def __getNotAfter(self):
266        """Get not after validity time as datetime type"""
267        if self.__m2CryptoX509 is None:
268            return None
269       
270        return self.__dtNotAfter
271
272    notAfter = property(fget=__getNotAfter, 
273                         doc="Not after validity time as datetime type")
274       
275       
276    def __getPubKey(self):
277        """Get public key
278       
279        @return: RSA public key for certificate
280        @rtype: M2Crypto.RSA.RSA_pub"""
281        if self.__m2CryptoX509 is None:
282            return None
283       
284        return self.__m2CryptoX509.get_pubkey()
285
286    pubKey = property(fget=__getPubKey,  doc="Public Key")
287       
288       
289    def __getIssuer(self):
290        """Get Certificate issuer"""
291        if self.__m2CryptoX509 is None:
292            return None
293       
294        # Return as X500DN type
295        return X500DN(m2CryptoX509Name=self.__m2CryptoX509.get_issuer())
296
297    issuer = property(fget=__getIssuer,  doc="Certificate Issuer")
298       
299   
300    def __getSubject(self):
301        """Get Certificate subject"""
302        if self.__m2CryptoX509 is None:
303            return None
304
305        # Return as X500DN type
306        return X500DN(m2CryptoX509Name=self.__m2CryptoX509.get_subject())
307   
308    subject = property(fget=__getSubject,  doc="Certificate subject")
309
310
311    def isValidTime(self, 
312                    raiseExcep=False, 
313                    expiryWarning=True, 
314                    nDaysBeforeExpiryLimit=30,
315                    warningStackLevel=2):
316        """Check Certificate for expiry
317
318        @type raiseExcep: bool
319        @param raiseExcep: set True to raise an exception if certificate is
320        invalid
321       
322        @type expiryWarning: bool
323        @param expiryWarning: set to True to output a warning message if the
324        certificate is due to expire in less than nDaysBeforeExpiryLimit days.
325        Message is sent using warnings.warn and through logging.warning.  No
326        message is set if the certificate has an otherwise invalid time
327       
328        @type nDaysBeforeExpiryLimit: int
329        @param nDaysBeforeExpiryLimit: used in conjunction with the
330        expiryWarning flag.  Set the number of days in advance of certificate
331        expiry from which to start outputing warnings
332       
333        @type warningStackLevel: int
334        @param warningStackLevel: set where in the stack to flag the warning
335        from.  Level 2 will flag it at the level of the caller of this
336        method.  Level 3 would flag at the level of the caller of the caller
337        and so on.
338       
339        @raise X509CertInvalidNotBeforeTime: current time is before the
340        certificate's notBefore time
341        @raise X509CertExpired: current time is after the certificate's
342        notAfter time"""
343
344        if not isinstance(self.__dtNotBefore, datetime):
345            raise X509CertError("Not Before datetime is not set")
346
347        if not isinstance(self.__dtNotAfter, datetime):
348            raise X509CertError("Not After datetime is not set")
349       
350        dtNow = datetime.utcnow()
351        isValidTime = dtNow > self.__dtNotBefore and dtNow < self.__dtNotAfter
352
353        # Helper string for message output
354        if self.__filePath:
355            fileInfo = ' "%s"' % self.__filePath
356        else:
357            fileInfo = ''
358             
359       
360        # Set a warning message for impending expiry of certificate but only
361        # if the certificate is not any other way invalid - see below
362        if isValidTime and expiryWarning:
363            dtTime2Expiry = self.__dtNotAfter - dtNow
364            if dtTime2Expiry.days < nDaysBeforeExpiryLimit:
365                msg = ('Certificate%s with DN "%s" will expire in %d days on: '
366                       '%s' % (fileInfo, 
367                               self.dn, 
368                               dtTime2Expiry.days, 
369                               self.__dtNotAfter))
370                warn(msg, stacklevel=warningStackLevel)
371                log.warning(msg)
372       
373                     
374        if dtNow < self.__dtNotBefore:
375            msg = ("Current time %s is before the certificate's Not Before "
376                   'Time %s for certificate%s with DN "%s"' % 
377                   (dtNow, self.__dtNotBefore, fileInfo, self.dn))
378            log.error(msg)
379            if raiseExcep:
380                raise X509CertInvalidNotBeforeTime(msg)
381           
382        elif dtNow > self.__dtNotAfter:
383            msg = ('Certificate%s with DN "%s" has expired: the time now is '
384                   '%s and the certificate expiry is %s.' %(fileInfo,
385                                                            self.dn, 
386                                                            dtNow, 
387                                                            self.__dtNotAfter))
388            if raiseExcep:
389                raise X509CertExpired(msg)
390
391        # If exception flag is not set return validity as bool
392        return isValidTime
393
394
395
396
397    def __m2CryptoUTC2datetime(self, m2CryptoUTC):
398        """Convert M2Crypto UTC time string as returned by get_not_before/
399        get_not_after methods into datetime type"""
400       
401        datetimeRE = "([a-zA-Z]{3} {1,2}\d{1,2} \d{2}:\d{2}:\d{2} \d{4}).*"
402        sM2CryptoUTC = None
403       
404        try:
405            # Convert into string
406            sM2CryptoUTC = str(m2CryptoUTC)
407           
408            # Check for expected format - string may have trailing GMT - ignore
409            sTime = re.findall(datetimeRE, sM2CryptoUTC)[0]
410
411            # Convert into a tuple
412            lTime = strptime(sTime, "%b %d %H:%M:%S %Y")[0:6]
413
414            return datetime(lTime[0], lTime[1], lTime[2],
415                            lTime[3], lTime[4], lTime[5])
416                                   
417        except Exception, e:
418            msg = "Error parsing M2Crypto UTC"
419            if sM2CryptoUTC is not None:
420                msg += ": " + sM2CryptoUTC
421               
422            raise X509CertError(msg)
423       
424    def verify(self, pubKey, **kw):
425        """Verify a certificate against the public key of the
426        issuer
427       
428        @param pubKey: public key of cert that issued self
429        @type pubKey: M2Crypto.RSA.RSA_pub
430        @param **kw: keywords to pass to M2Crypto.X509.X509 -
431        'pkey'
432        @type: dict
433        @return: True if verifies OK, False otherwise
434        @rtype: bool
435        """
436        return bool(self.__m2CryptoX509.verify(pubKey, **kw))
437
438    @classmethod
439    def Read(cls, filePath, warningStackLevel=4, **isValidTimeKw):
440        """Create a new X509 certificate read in from a file"""
441   
442        x509Cert = cls(filePath=filePath)
443       
444        x509Cert.read(warningStackLevel=warningStackLevel, **isValidTimeKw)
445       
446        return x509Cert
447   
448    @classmethod
449    def Parse(cls, x509CertTxt, warningStackLevel=4, **isValidTimeKw):
450        """Create a new X509 certificate from string of file content"""
451   
452        x509Cert = cls()
453       
454        x509Cert.parse(x509CertTxt, 
455                       warningStackLevel=warningStackLevel,
456                       **isValidTimeKw)
457       
458        return x509Cert
459       
460# Alternative AttCert constructors
461def X509CertRead(filePath, warningStackLevel=4, **isValidTimeKw):
462    """Create a new X509 certificate read in from a file"""
463
464    x509Cert = X509Cert(filePath=filePath)   
465    x509Cert.read(warningStackLevel=warningStackLevel, **isValidTimeKw)
466   
467    return x509Cert
468
469def X509CertParse(x509CertTxt, warningStackLevel=4, **isValidTimeKw):
470    """Create a new X509 certificate from string of file content"""
471
472    x509Cert = X509Cert()
473    x509Cert.parse(x509CertTxt, 
474                   warningStackLevel=warningStackLevel, 
475                   **isValidTimeKw)
476   
477    return x509Cert
478
479
480class X509StackError(X509CertError):
481    """Error from X509Stack type"""
482
483class X509StackEmptyError(X509CertError):
484    """Expecting non-zero length X509Stack"""
485
486class X509CertIssuerNotFound(X509CertError):
487    """Raise from verifyCertChain if no certificate can be found to verify the
488    input"""
489
490class SelfSignedCert(X509CertError):
491    """Raise from verifyCertChain if cert. is self-signed and
492    rejectSelfSignedCert=True"""
493
494class X509CertInvalidSignature(X509CertError):
495    """X.509 Certificate has an invalid signature"""
496       
497class X509Stack(object):
498    """Wrapper for M2Crypto X509_Stack"""
499   
500    def __init__(self, m2X509Stack=None):
501        """Initialise from an M2Crypto stack object
502       
503        @param m2X509Stack: M2Crypto X.509 stack object
504        @type m2X509Stack: M2Crypto.X509.X509_Stack"""
505       
506        self.__m2X509Stack = m2X509Stack or M2Crypto.X509.X509_Stack()
507       
508    def __len__(self):
509        """@return: length of stack
510        @rtype: int"""
511        return self.__m2X509Stack.__len__()
512
513    def __getitem__(self, idx):
514        """Index stack as an array
515        @param idx: stack index
516        @type idx: int
517        @return: X.509 cert object
518        @rtype: ndg.security.common.X509.X509Cert"""
519       
520        return X509Cert(m2CryptoX509=self.__m2X509Stack.__getitem__(idx))
521   
522    def __iter__(self):
523        """@return: stack iterator
524        @rtype: listiterator"""
525        return iter([X509Cert(m2CryptoX509=i) for i in self.__m2X509Stack])
526
527    def push(self, x509Cert):
528        """Push an X509 certificate onto the stack.
529       
530        @param x509Cert: X509 object.
531        @type x509Cert: M2Crypto.X509.X509,
532        ndg.security.common.X509.X509Cert or basestring
533        @return: The number of X509 objects currently on the stack.
534        @rtype: int"""
535        if isinstance(x509Cert, M2Crypto.X509.X509):
536            return self.__m2X509Stack.push(x509Cert)
537       
538        elif isinstance(x509Cert, X509Cert):
539            return self.__m2X509Stack.push(x509Cert.m2CryptoX509)
540       
541        elif isinstance(x509Cert, basestring):
542            return self.__m2X509Stack.push(\
543                                       X509CertParse(x509Cert).m2CryptoX509)           
544        else:
545            raise X509StackError("Expecting M2Crypto.X509.X509, ndg.security."
546                                 "common.X509.X509Cert or string type")
547               
548    def pop(self):
549        """Pop a certificate from the stack.
550       
551        @return: X509 object that was popped, or None if there is nothing
552        to pop.
553        @rtype: ndg.security.common.X509.X509Cert
554        """
555        return X509Cert(m2CryptoX509=self.__m2X509Stack.pop())
556
557
558    def asDER(self):
559        """Return the stack as a DER encoded string
560        @return: DER string
561        @rtype: string"""
562        return self.__m2X509Stack.as_der()
563
564
565    def verifyCertChain(self, 
566                        x509Cert2Verify=None, 
567                        caX509Stack=[],
568                        rejectSelfSignedCert=True):
569        """Treat stack as a list of certificates in a chain of
570        trust.  Validate the signatures through to a single root issuer. 
571
572        @param x509Cert2Verify: X.509 certificate to be verified default is
573        last in the stack
574        @type x509Cert2Verify: X509Cert
575       
576        @param caX509Stack: X.509 stack containing CA certificates that are
577        trusted.
578        @type caX509Stack: X509Stack
579       
580        @param rejectSelfSignedCert: Set to True (default) to raise an
581        SelfSignedCert exception if a certificate in self's stack is
582        self-signed. 
583        @type rejectSelfSignedCert: bool"""
584       
585        n2Validate = len(self)
586        if x509Cert2Verify:
587            # One more to validate in addition to stack content
588            n2Validate += 1
589        else:
590            # Validate starting from last on stack - but check first that it's
591            # populated
592            if n2Validate == 0:
593                raise X509StackEmptyError("Empty stack and no x509Cert2Verify "
594                                          "set: no cert.s to verify")
595
596            x509Cert2Verify = self[-1]
597             
598               
599        # Exit loop if all certs have been validated or if find a self
600        # signed cert.
601        nValidated = 0
602        issuerX509Cert = None
603        while nValidated < n2Validate:               
604            issuerX509Cert = None
605            issuerDN = x509Cert2Verify.issuer
606           
607            # Search for issuing certificate in stack
608            for x509Cert in self:
609                if x509Cert.dn == issuerDN:
610                    # Match found - the cert.'s issuer has been found in the
611                    # stack
612                    issuerX509Cert = x509Cert
613                    break
614                   
615            if issuerX509Cert:
616                # An issuing cert. has been found - use it to check the
617                # signature of the cert. to be verified
618                if not x509Cert2Verify.verify(issuerX509Cert.pubKey):
619                    X509CertInvalidSignature('Signature is invalid for cert. '
620                                             '"%s"' % x509Cert2Verify.dn)
621               
622                # In the next iteration the issuer cert. will be checked:
623                # 1) search for a cert. in the stack that issued it
624                # 2) If found use the issuing cert. to verify
625                x509Cert2Verify = issuerX509Cert
626                nValidated += 1
627            else:
628                # All certs in the stack have been searched
629                break
630
631
632        if issuerX509Cert:           
633            # Check for self-signed certificate
634            if nValidated == 1 and rejectSelfSignedCert and \
635               issuerX509Cert.dn == issuerX509Cert.issuer:
636
637                # If only one iteration occurred then it must be a self
638                # signed certificate
639                raise SelfSignedCert("Certificate is self signed: [DN=%s]" %
640                                     issuerX509Cert.dn)
641           
642            if not caX509Stack:
643                caX509Stack = [issuerX509Cert]
644                         
645        elif not caX509Stack:
646            raise X509CertIssuerNotFound('No issuer cert. found for cert. '
647                                         '"%s"' % x509Cert2Verify.dn)
648           
649        for caCert in caX509Stack:
650            issuerDN = x509Cert2Verify.issuer
651            if caCert.dn == issuerDN:
652                issuerX509Cert = caCert
653                break
654       
655        if issuerX509Cert:   
656            if not x509Cert2Verify.verify(issuerX509Cert.pubKey):
657                X509CertInvalidSignature('Signature is invalid for cert. "%s"'%
658                                         x509Cert2Verify.dn)
659           
660            # Chain is validated through to CA cert
661            return
662        else:
663            raise X509CertIssuerNotFound('No issuer cert. found for '
664                                         'certificate "%s"'%x509Cert2Verify.dn)
665       
666        # If this point is reached then an issuing cert is missing from the
667        # chain       
668        raise X509CertIssuerNotFound('Can\'t find issuer cert "%s" for '
669                                     'certificate "%s"' %
670                                     (x509Cert2Verify.issuer, 
671                                      x509Cert2Verify.dn))
672
673
674def X509StackParseFromDER(derString):
675    """Make a new stack from a DER string
676   
677    @param derString: DER formatted X.509 stack data
678    @type derString: string
679    @return: new stack object
680    @rtype: X509Stack""" 
681    return X509Stack(m2X509Stack=M2Crypto.X509.new_stack_from_der(derString))
682
683
684class X500DNError(Exception):
685    """Exception handling for NDG X.500 DN class."""
686
687
688# For use with parseSeparator method:
689import re
690
691
692class X500DN(dict):
693    "NDG X500 Distinguished name"
694   
695    # Class attribute - look-up mapping short name attributes to their long
696    # name equivalents
697    # * private *
698    __shortNameLUT = {
699        'commonName':               'CN',
700        'organisationalUnitName':   'OU',
701        'organisation':             'O',
702        'countryName':              'C',
703        'emailAddress':             'EMAILADDRESS',
704        'localityName':             'L',
705        'stateOrProvinceName':      'ST',
706        'streetAddress':            'STREET',
707        'domainComponent':              'DC',
708        'userid':                       'UID'
709    }
710
711   
712    def __init__(self, dn=None, m2CryptoX509Name=None, separator=None):
713
714        """Create a new X500 Distinguished Name
715
716        @type m2CryptoX509Name: M2Crypto.X509.X509_Name
717        @param m2CryptoX509Name:   initialise using using an
718        M2Crypto.X509.X509_Name
719        @type dn: basestring
720        @param dn: initialise using a distinguished name string
721        @type separator: basestring
722        @param: separator: separator used to delimit dn fields - usually '/'
723        or ','.  If dn is input and separator is omitted the separator
724        character will be automatically parsed from the dn string.
725        """
726       
727        # Private key data
728        self.__dat = {}.fromkeys(X500DN.__shortNameLUT.values(), '')
729   
730        dict.__init__(self)
731   
732        self.__separator = None
733       
734        # Check for separator from input
735        if separator is not None:
736            if not isinstance(separator, basestring):
737                raise X500DNError("dn Separator must be a valid string")
738
739            # Check for single character but allow trailing space chars
740            if len(separator.lstrip()) is not 1:
741                raise X500DNError("dn separator must be a single character")
742
743            self.__separator = separator
744           
745        if m2CryptoX509Name is not None:
746            # the argument is an x509 dn in m2crypto format
747            self.deserialise(str(m2CryptoX509Name))
748           
749        elif dn is not None:
750            # Separator can be parsed from the input DN string - only attempt
751            # if no explict separator was input
752            if self.__separator is None:
753                self.__separator = self.parseSeparator(dn)
754               
755            # Split Distinguished name string into constituent fields
756            self.deserialise(dn)
757
758    @classmethod
759    def fromString(cls, dn):
760        """Convenience method for parsing DN string into a new instance
761        """
762        return cls(dn=dn)
763   
764    def __repr__(self):
765        """Override default behaviour to return internal dictionary content"""
766        return self.serialise()
767
768    def __str__(self):
769        """Behaviour for print and string statements - convert DN into
770        serialised format."""
771        return self.serialise()
772       
773    def __eq__(self, x500dn):
774        """Return true if the all the fields of the two DNs are equal"""
775       
776        if not isinstance(x500dn, X500DN):
777            return False
778
779        return self.__dat.items() == x500dn.items()
780   
781    def __ne__(self, x500dn):
782        """Return true if the all the fields of the two DNs are equal"""
783       
784        if not isinstance(x500dn, X500DN):
785            return False
786
787        return self.__dat.items() != x500dn.items()
788 
789    def __delitem__(self, key):
790        """Prevent keys from being deleted."""
791        raise X500DNError('Keys cannot be deleted from the X500DN')
792
793    def __getitem__(self, key):
794
795        # Check input key
796        if self.__dat.has_key(key):
797
798            # key recognised
799            return self.__dat[key]
800       
801        elif X500DN.__shortNameLUT.has_key(key):
802
803            # key not recognised - but a long name version of the key may
804            # have been passed
805            shortName = X500DN.__shortNameLUT[key]
806            return self.__dat[shortName]
807
808        else:
809            # key not recognised as a short or long name version
810            raise KeyError('Key "' + key + '" not recognised for X500DN')
811
812    def __setitem__(self, key, item):
813       
814        # Check input key
815        if self.__dat.has_key(key):
816
817            # key recognised
818            self.__dat[key] = item
819           
820        elif X500DN.__shortNameLUT.has_key(key):
821               
822            # key not recognised - but a long name version of the key may
823            # have been passed
824            shortName = X500DN.__shortNameLUT[key]
825            self.__dat[shortName] = item
826           
827        else:
828            # key not recognised as a short or long name version
829            raise KeyError('Key "' + key + '" not recognised for X500DN')
830
831    def clear(self):
832        raise X500DNError("Data cannot be cleared from X500DN")
833
834    def copy(self):
835
836        import copy
837        return copy.copy(self)
838
839    def keys(self):
840        return self.__dat.keys()
841
842    def items(self):
843        return self.__dat.items()
844
845    def values(self):
846        return self.__dat.values()
847
848    def has_key(self, key):
849        return self.__dat.has_key(key)
850
851    # 'in' operator
852    def __contains__(self, key):
853        return key in self.__tags
854
855    def get(self, *arg):
856        return self.__dat.get(*arg)
857 
858    def serialise(self, separator=None):
859        """Combine fields in Distinguished Name into a single string."""
860       
861        if separator:
862            if not isinstance(separator, basestring):
863                raise X500DNError("Separator must be a valid string")
864               
865            self.__separator = separator
866           
867        else:
868            # Default to / if no separator is set
869            separator = '/'
870
871
872        # If using '/' then prepend DN with an initial '/' char
873        if separator == '/':
874            sDN = separator
875        else:
876            sDN = ''
877     
878        dnList = []
879        for (key, val) in self.__dat.items():
880            if val:
881                if isinstance(val, tuple):
882                    dnList += [separator.join(["%s=%s" % (key, valSub) \
883                                               for valSub in val])]
884                else:
885                    dnList += ["%s=%s" % (key, val)]
886               
887        sDN += separator.join(dnList)
888                               
889        return sDN
890
891    serialize = serialise
892   
893    def deserialise(self, dn, separator=None):
894        """Break up a DN string into it's constituent fields and use to
895        update the object's dictionary"""
896       
897        if separator:
898            if not isinstance(separator, basestring):
899                raise X500DNError("Separator must be a valid string")
900
901            self.__separator = separator
902
903
904        # If no separator has been set, parse if from the DN string           
905        if self.__separator is None:
906            self.__separator = self.parseSeparator(dn)
907
908        try:
909            dnFields = dn.split(self.__separator)
910            if len(dnFields) < 2:
911                raise X500DNError("Error parsing DN string: \"%s\"" % dn)
912
913           
914            # Split fields into key/value and also filter null fields if
915            # found e.g. a leading '/' in the DN would yield a null field
916            # when split
917           
918            items = [field.split('=') for field in dnFields if field]
919
920            # Reset existing dictionary values
921            self.__dat.fromkeys(self.__dat, '')
922           
923            # Strip leading and trailing space chars and convert into a
924            # dictionary
925            parsedDN = {}
926            for (key, val) in items:
927                key = key.strip()
928                if key in parsedDN:
929                    if isinstance(parsedDN[key], tuple):
930                        parsedDN[key] = tuple(list(parsedDN[key]) + [val])                   
931                    else:
932                        parsedDN[key] = (parsedDN[key], val)
933                else:
934                    parsedDN[key] = val
935               
936            # Copy matching DN fields
937            for key, val in parsedDN.items():
938                if key not in self.__dat and key not in self.__shortNameLUT:
939                    raise X500DNError('Invalid field "%s" in input DN string' %
940                                      key)
941
942                self.__dat[key] = val
943
944               
945        except Exception, excep:
946            raise X500DNError("Error de-serialising DN \"%s\": %s" % \
947                              (dn, str(excep)))
948
949    deserialize = deserialise
950   
951    def parseSeparator(self, dn):
952        """Attempt to parse the separator character from a given input
953        DN string.  If not found, return None
954
955        DNs don't use standard separators e.g.
956
957        /C=UK/O=eScience/OU=CLRC/L=DL/CN=AN Other
958        CN=SUM Oneelse,L=Didcot, O=RAL,OU=SSTD
959
960        This function isolates and identifies the character.  - In the above,
961        '/' and ',' respectively"""
962
963
964        # Make a regular expression containing all the possible field
965        # identifiers with equal sign appended and 'or'ed together.  \W should
966        # match the separator which preceeds the field name. \s* allows any
967        # whitespace between field name and field separator to be taken into
968        # account.
969        #
970        # The resulting match should be a list.  The first character in each
971        # element in the list should be the field separator and should be the
972        # same
973        regExpr = '|'.join(['\W\s*'+i+'=' for i in self.__dat.keys()])
974        match = re.findall(regExpr, dn)
975           
976        # In the first example above, the resulting match is:
977        # ['/C=', '/O=', '/OU=', '/L=']
978        # In each element the first character is the separator
979        sepList = [i[0:1] for i in match]
980
981        # All separators should be the same character - return None if they
982        # don't match
983        if not [i for i in sepList if i != sepList[0]]:
984            return sepList[0]
985        else:
986            return None
987
988    @classmethod
989    def Parse(cls, dn):
990        """Convenience method to create an X500DN object from a DN string
991        @type dn: basestring
992        @param dn: Distinguished Name
993        """
994        return cls(dn=dn)
995   
996    Deserialise = Deserialize = Parse
Note: See TracBrowser for help on using the repository browser.