source: exist/trunk/python/ndgUtils/lib/existdbclient.py @ 4696

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/exist/trunk/python/ndgUtils/lib/existdbclient.py@4696
Revision 4696, 23.2 KB checked in by cbyrom, 11 years ago (diff)

Adjust Atom and MolesEntity data models to properly use namespaces when
dealing with xpath queries - rather than having these stripped out. This
avoids problems when namespaces are given arbitrary names and is a more
exact, hence robust, approach.
Create new test class to put the xmlhandler2 tests separately in.
Add delete function to granulite - to allow data granules, and their
connections to data entities, to be removed + add 'roll back' functionality
to cope with scenarios when granulite replace/delete fails to complete
properly. Add new methods to the existdbclient to allow the restore/delete/backup
functionality.
Extend test suite to exercise new functionality.

Line 
1'''
2 Class supporting set up and communication with eXist DB
3 for the purposes of creating and updating atoms
4 
5 @author: C Byrom - Tessella 08
6'''
7import os, sys, logging, datetime, uuid, re
8from ndgUtils.models.Atom import Atom
9from ndgUtils.eXistConnector import eXistConnector as ec
10from ndgUtils.ndgXqueries import ndgXqueries
11from ndgUtils import DocumentRetrieve as DR
12
13try:
14    from xml.etree import ElementTree as ET
15except ImportError:
16    try:
17        import ElementTree as ET
18    except ImportError:
19        import elementtree.ElementTree as ET
20
21from threading import Thread
22
23
class DuplicateError(Exception):
    """
    Raised when an atom doc is found to already exist

    NB, constructing the exception also writes the message to the error log.
    """
    def __init__(self, msg):
        Exception.__init__(self, msg)
        logging.error(msg)
31
class backingUpThread(Thread):
    '''
    Thread which stores a document via the supplied client - used so that a
    backup can be written without the caller having to wait for it
    '''

    def __init__(self, existClient, doc, collection, fileName):
        '''
        @param existClient: object providing a createEXistFile method
        @param doc: contents of the document to store
        @param collection: path of the collection to store the document in
        @param fileName: name to store the document under
        '''
        startMessage = "Setting up thread to run backup for file, '%s'" %fileName
        logging.info(startMessage)
        Thread.__init__(self)
        self.ec, self.doc = existClient, doc
        self.collection, self.fileName = collection, fileName
        logging.info("- finished setting up thread")

    def run(self):
        '''
        Store the document using the client, logging before and after
        '''
        startMessage = "Running thread to perform backup of file, '%s'" %self.fileName
        logging.info(startMessage)
        self.ec.createEXistFile(self.doc, self.collection, self.fileName)
        logging.info("- finished backing up file")
47       
48
49class eXistDBClient:
50   
51    def __init__(self, configFile = None, eXistDBHostname = None, \
52                 loadCollectionData=False, setUpDB = False):
53        '''
54        Initialise a connection to the eXistDB
55        @keyword configFile: config file to use in setting up DB
56        @keyword existDBHostname: name of eXist DB to use - if not specified, the first
57        host in the config file is used
58        @keyword loadCollectionData: preload info on all the eXist collections, if True (default False)
59        @keyword setUpDB: if True, create the basic collection structure and ingest the
60        atom schemas.  Default is False.
61        '''
62        logging.info("Initialising connection to eXist DB")
63        self.eXistDBHostname = eXistDBHostname
64        logging.debug("- connecting to DB, '%s', with config file, '%s'" \
65                      %(eXistDBHostname or 'Default', configFile or 'Default'))
66        inputs = {}
67       
68        self.atomSchema = None
69       
70        # this keeps a record of the last file backed up - incase we need
71        # to do a rollback if there's an error encountered
72        self.backupName = None
73       
74        # NB, there are two routes through here: if a config file is specified
75        # without a hostname, the host will be taken to be the first entry in
76        # the config file; if a hostname is specified, it will be used explicitly
77        if configFile:
78            inputs['pwfile'] = configFile
79            if not self.eXistDBHostname:
80                self.__loadDBDetails(configFile)
81           
82        # Now set up the connection
83        logging.debug(inputs)
84        self.xmldb = DR(self.eXistDBHostname, **inputs)
85       
86        if setUpDB:
87            # set up any collections required - NB, if these already exist they won't cause any files to be lost
88            self.__setUpEXistAtomCollections()
89           
90            # add the schema required for atom validation
91            self.__addAtomSchema()
92       
93        self.collections = None
94        if loadCollectionData:
95            self.collections = self.getAllAtomCollections()
96           
97        logging.info("eXist DB connection initialised")
98
99
100    def __getSchema(self):
101        logging.debug("Getting atom schema data")
102        if not self.atomSchema:
103            self.atomSchema = ec.BASE_COLLECTION_PATH + \
104                ndgXqueries.ATOM_MOLES_SCHEMA  + '.xsd'
105
106        return self.atomSchema
107
108    AtomSchema = property(fget=__getSchema, doc="Atom schema path")
109
110
111    def createCollections(self, collections):
112        '''
113        Create the specified collections in eXist
114        @param collections: array of collections to create
115        @return True if successful
116        '''
117        logging.info("Setting up eXist collections")
118        for col in collections:
119            logging.debug("Creating collection, '%s'" %col)
120            self.xmldb.createCollection(col)
121        logging.info("All collections set up")
122
123
124    def getAtom(self, id):
125        '''
126        Lookup the atom with id
127        @param id: id of the atom to retrieve
128        '''
129        logging.info("Looking up atom with id, '%s'" %(id))
130        doc = self.xmldb.get('', DR.ATOM, id, \
131                             targetCollection = ec.BASE_COLLECTION_PATH)
132        logging.info("Atom retrieved")
133        return doc
134       
135
136    def checkAtomSchemaCompliance(self, atomPath, atom = None, isDebug = False):
137        '''
138        Validate the specified atom in eXist with the atom schemae in eXist
139        @param atomPath: path to the atom in eXist
140        @keyword atom: if set to an atom, this will be created temporarily in eXist
141        - since it may not already exist there.  Once validation is completed, the
142        file will be removed from eXist.
143        @keyword isDebug: if True, return full error details, otherwise only return
144        a summary
145        @return array: containing any errors found - NB, if an empty array is returned,
146        this indicates successful validation
147        '''
148        logging.info("Validating atom, '%s' against schemae in eXist" %atomPath)
149       
150        if atom:
151            logging.info("Creating temporary file in eXist to do validation against")
152            fileName = atom.datasetID + str(datetime.datetime.today().microsecond)
153            self.createEXistFile(atom.toPrettyXML(), \
154                                 atom.getDefaultCollectionPath(), fileName)
155            atomPath = atom.getDefaultCollectionPath() + fileName
156           
157        validationQuery = 'validation:validate-report("' + atomPath + \
158            '", xs:anyURI("' + self.AtomSchema + '"))'
159        id, result = self.xmldb.executeQuery(validationQuery)
160        errorMessage = None
161        if result['hits'] == 0: 
162            errorMessage = "Validation did not complete successfully - please retry"
163        elif result['hits'] > 1:
164            errorMessage = "More than one atom was validated - expecting only a single atom validation - please retry"
165
166        if atom:
167            logging.info("Deleting temporary file in eXist")
168            self.deleteEXistFile(atomPath)
169
170        if errorMessage:
171            logging.error(errorMessage)
172            raise SystemError(errorMessage)
173       
174        doc = self.xmldb.retrieve(id, 0)
175        et = ET.fromstring(doc)
176        status = et.findtext('status')
177       
178        # retrieve the error detail if invalid
179        errors = []
180        if status == 'invalid':
181            logging.info("Atom is invalid - details as follows:")
182            for error in et.findall('message'):
183                lineNo = error.attrib.get('line')
184                colNo = error.attrib.get('column')
185                level = error.attrib.get('level')
186                repeat = error.attrib.get('repeat')
187                errorText = error.text
188                # remove the meaningless error type from message
189                if errorText.startswith('cvc-'):
190                    errorText = ':'.join(errorText.split(':')[1:])
191                errorMessage = "%s at line %s, column %s: %s" %(level, lineNo, colNo, errorText)
192                if repeat:
193                    errorMessage += " (%s times)" %repeat
194
195                if isDebug:
196                    errors.append(errorMessage)
197                else:
198                    errors.append(errorText)
199                logging.info(errorMessage)
200        else:
201            logging.info("Atom is valid")
202           
203        logging.info("Validation complete")
204        return errors
205   
206
207    def __setUpEXistAtomCollections(self):
208        '''
209        Set up the required eXist collections needed for running the granulator script
210        '''
211        logging.info("Ensuring required collections are available in eXist")
212        for col in [ec.BASE_COLLECTION_PATH, ec.BACKUP_COLLECTION_PATH]:
213            for type in [ec.OLD_COLLECTION_PATH, ec.PUBLISHED_COLLECTION_PATH, \
214                         ec.SMALL_P_PUBLISHED_COLLECTION_PATH, ec.WORKING_COLLECTION_PATH]:
215                self.xmldb.createCollection(col)
216                self.xmldb.createCollection(col + type)
217                self.xmldb.createCollection(col + type + ec.DE_COLLECTION_PATH)
218                self.xmldb.createCollection(col + type + ec.DEPLOYMENT_COLLECTION_PATH)
219                self.xmldb.createCollection(col + type + ec.DEPLOYMENTS_COLLECTION_PATH)
220                self.xmldb.createCollection(col + type + ec.GRANULE_COLLECTION_PATH)
221        logging.info("Required collections available")
222       
223
224    def __addAtomSchema(self):
225        '''
226        Add the required atom schema to the atoms collection - to allow validation
227        of input atoms
228        '''
229        logging.info("Adding atom schema to eXist")
230        xq = ndgXqueries()
231        schemae = [xq.ATOM_SCHEMA, xq.MOLES_SCHEMA, xq.ATOM_MOLES_SCHEMA]
232        for schema in schemae:
233            xml = xq.getSchema(schema)
234            self.createEXistFile(xml, ec.BASE_COLLECTION_PATH, schema + '.xsd')
235        logging.info("- schema added")
236       
237
238    def __loadDBDetails(self, configFile):
239        '''
240        Retrieve info from the eXist db config file
241        '''
242        logging.info("Loading DB config data")
243        # Check this file exists
244        if not os.path.isfile(configFile):
245            errorMessage = "Could not find the DB config file, %s; please make sure this " \
246                     "is available from the running directory" %configFile
247            logging.error(errorMessage)
248            raise ValueError(errorMessage)
249        dbinfo_file=open(configFile, "r")
250        dbinfo = dbinfo_file.read().split()
251        if len(dbinfo) < 3:
252            errorMessage = 'Incorrect data in DB config file'
253            logging.error(errorMessage)
254            raise ValueError(errorMessage)
255        self.eXistDBHostname = dbinfo[0]
256        self._username = dbinfo[1]
257        self._pw = dbinfo[2]
258        logging.info("DB config data loaded")
259
260
261    def __lookupEXistFile(self, docPath):
262        '''
263        Look up a file in eXist using XPath
264        @param docPath: path to doc to look up
265        @return: id returned from query, with which to retrieve doc; if doc doesn't exist, return None
266        '''
267        logging.info("Retrieving info for file, '%s'" %docPath)
268       
269        id, doc = self.xmldb.executeQuery('doc("' + docPath + '")')
270       
271        if doc['hits'] == 0:
272            logging.info("File does not exist in eXist DB")
273            return None
274        logging.info("Found file - returning result ID")
275        return id
276         
277
278    def getEXistFile(self, docPath):
279        '''
280        Use XQuery to retrieve the specified document from eXist
281        @param docPath: the path of the doc to retrieve
282        @return: contents of document if exists, None otherwise
283        '''
284        id = self.__lookupEXistFile(docPath)
285       
286        if not id and id != 0:
287            logging.info("No file found - nothing to retrieve")
288            return None
289       
290        logging.info("Found file - now retrieving content")
291        doc = self.xmldb.retrieve(id, 0)
292        return doc
293
294
295    def isNewEXistFile(self, docPath):
296        '''
297        Test if a file already exists in eXist
298        @param docPath: path of file in eXist to look up
299        @return: True if a new file, False if otherwise
300        '''
301        logging.info("Checking if file, '%s', exists in eXist DB" %docPath)
302       
303        id = self.__lookupEXistFile(docPath)
304
305        if id:
306            return False
307       
308        return True
309
310
311    def __addTimeStamp(self, fileName):
312        '''
313        Add timestamp to input filename
314        NB, this assumes there is a file type identifier at the end of the filename; if so, the datestamp
315        is included before this; if not it is just added at the end
316        '''
317        bits = fileName.rsplit(".", 1)
318        fileName = bits[0] + "_" + datetime.datetime.today().strftime("%Y-%m-%dT%H_%M_%S")
319       
320        if len(bits) > 1:
321            fileName += "." + bits[1]
322        return fileName
323
324
325    def __removeTimeStamp(self, fileName):
326        '''
327        Remove a timestamp from a file name
328        '''
329        match = re.search('(.*)(_\d{4}-\d{2}-\d{2}T\d{2}_\d{2}_\d{2})(.*)', fileName)
330        if match:
331            return match.group(1) + match.group(3)
332
333        return fileName
334
335    def backupEXistFile(self, collection, fileName, runAsynch = True):
336        '''
337        Backup a file that exists in the eXist DB
338        - NB, this really just creates a new file with the same contents in a
339        backup dir
340        - to improve efficiency, spawn this process as a new thread since we
341        don't need to worry about the outcome
342        @param collection: path of the collection to store the file in
343        @param fileName: name of file to add in eXist
344        @param runAsynch: if True, do the backup asynchronously in a separate thread
345        @return: path to new backup file
346        '''
347        if not collection.endswith('/'):
348            collection += '/'
349           
350        docPath = collection + fileName
351        logging.info("Backing up file, '%s', in eXist DB" %docPath)
352
353        logging.debug("Firstly, retrieve file contents from eXist")
354        doc = self.getEXistFile(docPath)
355        if not doc:
356            errorMessage = "Could not retrieve file contents (%s) to backup - exiting." %docPath
357            logging.error(errorMessage)
358            raise SystemError(errorMessage)
359       
360        # Now adjust the collection to map to the backup dir
361        collection = collection.replace(ec.BASE_COLLECTION_PATH, ec.BACKUP_COLLECTION_PATH)
362        collection = collection.replace(ec.NDG_A_COLLECTION_PATH, ec.NDG_A_COLLECTION_PATH_BACKUP)
363       
364        # add timestamp to filename
365        fileName = self.__addTimeStamp(fileName)
366        docPath = collection + fileName
367       
368        if runAsynch:
369            # run the back up in a separate thread
370            thread = backingUpThread(self, doc, collection, fileName)
371            thread.start()
372        else:
373            self.createEXistFile(doc, collection, fileName)
374           
375        return docPath
376
377
378    def createEXistFile(self, xml, collection, fileName):
379        '''
380        Add the input file to the eXist DB
381        @param xml: contents of xml file to create in eXist
382        @param collection: path of the collection to store the file in
383        @param fileName: name of file to add in eXist
384        @return: True, if file created successfully
385        '''
386        logging.info("Adding file, '%s' to eXist DB collection, '%s'" \
387                     %(fileName, collection))
388        logging.debug("data: %s" %xml)
389
390        # create the collection, in case it doesn't already exist - NB, this won't overwrite anything
391        self.createCollections([collection])
392        status = self.xmldb.storeXML(xml, collection + "/" + fileName, overwrite=1)   
393        if not status:
394            errorMessage = "Command to create file in eXist did not complete successfully - exiting"
395            logging.error(errorMessage)
396            raise SystemError(errorMessage)
397       
398        logging.info("File added to eXist")
399        return True
400
401
402    def deleteEXistFile(self, docPath):
403        '''
404        Delete the input file from eXist DB
405        @param docPath: path of document to delete
406        @return: True, if file deleted successfully
407        '''
408        logging.info("Deleting file, '%s', from eXist DB" %docPath)
409
410        status = self.xmldb.removeDoc(docPath)   
411        if not status:
412            errorMessage = "Command to delete file in eXist did not complete successfully - exiting"
413            logging.error(errorMessage)
414            raise SystemError(errorMessage)
415       
416        logging.info("File deleted from eXist")
417        return True
418
419
420    def createOrUpdateEXistFile(self, xml, collection, fileName):
421        '''
422        Check if a file already exists in eXist; if it does, run an
423        update (which will backup the existing file), otherwise create
424        the file in eXist
425        @param xml: contents of xml file to create/update in eXist
426        @param collection: path of the collection to store the file in
427        @param fileName: name of file to add in eXist
428        '''
429        logging.info("Creating or updating file in eXist...")
430        if not self.isNewEXistFile(collection + fileName):
431            self.backupEXistFile(collection, fileName)
432           
433        self.createEXistFile(xml, collection, fileName)
434
435
436    def getAllAtomIDs(self):
437        '''
438        Retrieve all the atom IDs in the atoms directory - NB, this can
439        be a quick way of producing a cache of data to check - e.g. to avoid
440        multiple calls to getAtomFileCollectionPath
441        @return: ids - array of all atom IDs
442        '''
443        logging.info("Retrieving all atom ids")
444        xq = ndgXqueries().actual('atomList', '/db/atoms', '', '')
445        id, doc = self.xmldb.executeQuery(xq)
446        if doc['hits'] == 0: 
447            return []
448       
449        indices = range(doc['hits'])
450       
451        doc = self.xmldb.retrieve(id, 0)
452        et = ET.fromstring(doc)
453        ids = []
454        for member in et:
455            fn = member.findtext('{http://www.w3.org/2005/Atom}repositoryID')
456            ids.append(fn)
457        logging.debug("Found ids, '%s'" %ids)
458        return ids
459
460
461    def getAllAtomCollections(self):
462        '''
463        Get all atom collection paths and store in a dictionary - for easy
464        reference when doing lots of things at once
465        @return: dict with key/val of atomID/collectionPath
466        '''
467        logging.info("Retrieving all atom collection paths")
468        xq = ndgXqueries().actual('atomList', '/db/atoms', '', '')
469        id, doc = self.xmldb.executeQuery(xq)
470        if doc['hits'] == 0: 
471            return []
472       
473        indices = range(doc['hits'])
474       
475        doc = self.xmldb.retrieve(id, 0)
476        et = ET.fromstring(doc)
477        colData = {}
478        for member in et:
479            collection = member.findtext('{http://www.w3.org/2005/Atom}fileName')
480            fileName = collection.split('/')[-1]
481            fileName = fileName.split('.')[0]
482            dir = '/'.join(collection.split('/')[0:-1])
483            colData[fileName] = dir
484
485        logging.debug("Finished looking up atom paths")
486        return colData
487
488
489    def getAtomFileCollectionPath(self, atomID):
490        '''
491        Given an atom id, determine and return the collection path in eXist
492        of the associated atom file
493        @param atom: atom id to look up
494        @return: collection path, if it exists, None, otherwise
495        '''
496        logging.info("Looking up collection path for atom ID, '%s'" %atomID)
497        xq = ndgXqueries()['atomFullPath']
498        xq = xq.replace('TargetCollection', ec.BASE_COLLECTION_PATH)
499        xq = xq.replace('LocalID', atomID)
500
501        id, doc = self.xmldb.executeQuery(xq)
502        if doc['hits'] == 0:
503            logging.info("No document found with the specified ID")
504            return None
505
506        doc = self.xmldb.retrieve(id,0,{})
507
508        docET = ET.fromstring(doc)
509        collPath = docET.text + '/'
510        logging.debug("Found collection path, '%s'" %collPath)
511        return collPath
512
513
514    def deleteAtomInExist(self, atom):
515        '''
516        Delete the given atom from the eXist DB - using the atom
517        details to work out the required path to delete
518        '''
519        logging.info("Deleting atom from eXist")
520        atomPath = atom.getDefaultCollectionPath() + atom.atomName
521        self.deleteEXistFile(atomPath)
522        logging.info("Atom deleted")
523
524           
525    def createAtomInExist(self, atom, replaceAtom = True, runAsynch = True):
526        '''
527        Create an atom in the eXist DB - using the atom contents to work out
528        the location + data set ID
529        @param atom: atom object to create in the DB
530        @keyword replaceAtom: if False and the atom is already available in eXist
531        @param runAsynch: if True, if a backup of an existing file, do this
532        asynchronously in a separate thread
533        then raise a ValueError.
534       
535        '''
536        logging.info("Creating atom in eXist")
537        if not atom:
538            raise ValueError("Input is not an object - cannot create in eXist")
539        if not isinstance(atom, Atom):
540            raise ValueError("Input object is not an Atom object - cannot create in eXist")
541       
542        # if the atom has no dataset ID, generate and add one
543        # NB, this should only be the case when the atom is being created
544        # via the web interface
545        isNew = False
546        if not atom.datasetID:
547            isNew = True
548            atom.setDatasetID(atom.atomTypeID + '_' + str(uuid.uuid1()))
549
550        eXistCollection = None
551        if self.collections is not None: # cope with empty dict
552            eXistCollection = self.collections.get(atom.datasetID)
553        else:
554            eXistCollection = self.getAtomFileCollectionPath(atom.datasetID)
555       
556        # if collection not found, assume we're dealing with a new atom; get its
557        # default collection
558        if not eXistCollection:
559            eXistCollection = atom.getDefaultCollectionPath()
560        elif isNew:
561            # in this situation we're trying to create an atom with the same
562            # name via the web interface - this can't be allowed - so retry to
563            # generate a new ID
564            atom.datasetID = None
565            self.createAtomInExist(atom)
566            return
567        # create backup of atom if it already exists
568        else:
569            if not replaceAtom:
570                raise DuplicateError('An atom with the specified ID (%s) already exists in eXist' \
571                                     %atom.datasetID)
572            # store name of backup - to allow restore, if subsequent ops fail
573            self.backupName = self.backupEXistFile(eXistCollection, atom.atomName, \
574                                                   runAsynch = runAsynch)
575           
576            # also change updated date to current time
577            atom.updatedDate = datetime.datetime.today().strftime("%Y-%m-%dT%H:%M:%SZ")
578           
579        self.createEXistFile(atom.toPrettyXML(), eXistCollection, atom.atomName)
580        logging.info("Atom created in eXist")
581        return atom
582   
583   
584    def restoreBackup(self, docPath):
585        '''
586        Restore the backed up file - effectively recreating in the non-backup collection
587        @param docPath: path to file to backup
588        '''
589        logging.info("Restoring file, '%s' in eXist" %docPath)
590        doc = self.getEXistFile(docPath)
591       
592        if not doc:
593            errorMessage = "Could not retrieve file contents (%s) to backup - exiting." %docPath
594            logging.error(errorMessage)
595            raise SystemError(errorMessage)
596       
597        bits = docPath.split('/')
598        fileName = bits[-1]
599        collection = '/'.join(bits[0:-1])
600        # remove timestamp from filename
601        fileName = self.__removeTimeStamp(fileName)
602       
603        # Now adjust the collection to map to the backup dir
604        collection = collection.replace(ec.BACKUP_COLLECTION_PATH, ec.BASE_COLLECTION_PATH)
605        collection = collection.replace(ec.NDG_A_COLLECTION_PATH_BACKUP, ec.NDG_A_COLLECTION_PATH)
606       
607        self.createEXistFile(doc, collection, fileName)
608        logging.info("File restored")
609       
Note: See TracBrowser for help on using the repository browser.