source: exist/trunk/python/ndgUtils/lib/existdbclient.py @ 4590

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/exist/trunk/python/ndgUtils/lib/existdbclient.py@4590
Revision 4590, 20.0 KB checked in by cbyrom, 11 years ago (diff)

Add new class to allow eXist doc backups to be done in a separate,
asynchronous thread + add debug keyword to allow more detailed output
from schema validation.

Line 
'''
Classes supporting the set up of, and communication with, an eXist DB
for the purposes of creating and updating atoms

@author: C Byrom - Tessella 08
'''
7import os, sys, logging, datetime
8from ndgUtils.eXistConnector import eXistConnector as ec
9from ndgUtils.ndgXqueries import ndgXqueries
10from ndgUtils import DocumentRetrieve as DR
11import uuid
12
13try:
14    from xml.etree import ElementTree as ET
15except ImportError:
16    try:
17        import ElementTree as ET
18    except ImportError:
19        import elementtree.ElementTree as ET
20
21from threading import Thread
22
class backingUpThread(Thread):
    '''
    Worker thread which writes a copy of a document to eXist, so that the
    caller does not have to wait for the write to complete
    '''

    def __init__(self, existClient, doc, collection, fileName):
        '''
        Record everything needed to do the backup once the thread is run
        @param existClient: client object whose createEXistFile method is used
        to write the backup
        @param doc: contents of the document to back up
        @param collection: path of the collection to write the backup to
        @param fileName: name to give the backup file
        '''
        setUpMessage = "Setting up thread to run backup for file, '%s'" %fileName
        logging.info(setUpMessage)
        super(backingUpThread, self).__init__()
        self.fileName = fileName
        self.collection = collection
        self.doc = doc
        self.ec = existClient
        logging.info("- finished setting up thread")

    def run(self):
        '''
        Thread entry point: store the document via the eXist client
        '''
        runMessage = "Running thread to perform backup of file, '%s'" %self.fileName
        logging.info(runMessage)
        target = (self.doc, self.collection, self.fileName)
        self.ec.createEXistFile(*target)
        logging.info("- finished backing up file")
38       
39
40class eXistDBClient:
41   
42    def __init__(self, configFile = None, eXistDBHostname = None, \
43                 loadCollectionData=False, setUpDB = False):
44        '''
45        Initialise a connection to the eXistDB
46        @keyword configFile: config file to use in setting up DB
47        @keyword existDBHostname: name of eXist DB to use - if not specified, the first
48        host in the config file is used
49        @keyword loadCollectionData: preload info on all the eXist collections, if True (default False)
50        @keyword setUpDB: if True, create the basic collection structure and ingest the
51        atom schemas.  Default is False.
52        '''
53        logging.info("Initialising connection to eXist DB")
54        self.eXistDBHostname = eXistDBHostname
55        logging.debug("- connecting to DB, '%s', with config file, '%s'" \
56                      %(eXistDBHostname or 'Default', configFile or 'Default'))
57        inputs = {}
58       
59        self.atomSchema = None
60        # NB, there are two routes through here: if a config file is specified
61        # without a hostname, the host will be taken to be the first entry in
62        # the config file; if a hostname is specified, it will be used explicitly
63        if configFile:
64            inputs['pwfile'] = configFile
65            if not self.eXistDBHostname:
66                self.__loadDBDetails(configFile)
67           
68           
69        # Now set up the connection
70        logging.debug(inputs)
71        self.xmldb = DR(self.eXistDBHostname, **inputs)
72       
73        if setUpDB:
74            # set up any collections required - NB, if these already exist they won't cause any files to be lost
75            self.__setUpEXistAtomCollections()
76           
77            # add the schema required for atom validation
78            self.__addAtomSchema()
79       
80        self.collections = None
81        if loadCollectionData:
82            self.collections = self.getAllAtomCollections()
83           
84        logging.info("eXist DB connection initialised")
85
86
87    def __getSchema(self):
88        logging.debug("Getting atom schema data")
89        if not self.atomSchema:
90            self.atomSchema = ec.BASE_COLLECTION_PATH + \
91                ndgXqueries.ATOM_MOLES_SCHEMA  + '.xsd'
92
93        return self.atomSchema
94
95    AtomSchema = property(fget=__getSchema, doc="Atom schema path")
96
97
98    def createCollections(self, collections):
99        '''
100        Create the specified collections in eXist
101        @param collections: array of collections to create
102        @return True if successful
103        '''
104        logging.info("Setting up eXist collections")
105        for col in collections:
106            logging.debug("Creating collection, '%s'" %col)
107            self.xmldb.createCollection(col)
108        logging.info("All collections set up")
109
110
111    def getAtom(self, id):
112        '''
113        Lookup the atom with id
114        @param id: id of the atom to retrieve
115        '''
116        logging.info("Looking up atom with id, '%s'" %(id))
117        doc = self.xmldb.get('', DR.ATOM, id, \
118                             targetCollection = ec.BASE_COLLECTION_PATH)
119        logging.info("Atom retrieved")
120        return doc
121       
122
123    def checkAtomSchemaCompliance(self, atomPath, atom = None, isDebug = False):
124        '''
125        Validate the specified atom in eXist with the atom schemae in eXist
126        @param atomPath: path to the atom in eXist
127        @keyword atom: if set to an atom, this will be created temporarily in eXist
128        - since it may not already exist there.  Once validation is completed, the
129        file will be removed from eXist.
130        @keyword isDebug: if True, return full error details, otherwise only return
131        a summary
132        @return array: containing any errors found - NB, if an empty array is returned,
133        this indicates successful validation
134        '''
135        logging.info("Validating atom, '%s' against schemae in eXist" %atomPath)
136       
137        if atom:
138            logging.info("Creating temporary file in eXist to do validation against")
139            fileName = atom.datasetID + str(datetime.datetime.today().microsecond)
140            self.createEXistFile(atom.toPrettyXML(), \
141                                 atom.getDefaultCollectionPath(), fileName)
142            atomPath = atom.getDefaultCollectionPath() + fileName
143           
144        validationQuery = 'validation:validate-report("' + atomPath + \
145            '", xs:anyURI("' + self.AtomSchema + '"))'
146        id, result = self.xmldb.executeQuery(validationQuery)
147        errorMessage = None
148        if result['hits'] == 0: 
149            errorMessage = "Validation did not complete successfully - please retry"
150        elif result['hits'] > 1:
151            errorMessage = "More than one atom was validated - expecting only a single atom validation - please retry"
152
153        if atom:
154            logging.info("Deleting temporary file in eXist")
155            self.deleteEXistFile(atomPath)
156
157        if errorMessage:
158            logging.error(errorMessage)
159            raise SystemError(errorMessage)
160       
161        doc = self.xmldb.retrieve(id, 0)
162        et = ET.fromstring(doc)
163        status = et.findtext('status')
164       
165        # retrieve the error detail if invalid
166        errors = []
167        if status == 'invalid':
168            logging.info("Atom is invalid - details as follows:")
169            for error in et.findall('message'):
170                lineNo = error.attrib.get('line')
171                colNo = error.attrib.get('column')
172                level = error.attrib.get('level')
173                repeat = error.attrib.get('repeat')
174                errorText = error.text
175                # remove the meaningless error type from message
176                if errorText.startswith('cvc-'):
177                    errorText = ':'.join(errorText.split(':')[1:])
178                errorMessage = "%s at line %s, column %s: %s" %(level, lineNo, colNo, errorText)
179                if repeat:
180                    errorMessage += " (%s times)" %repeat
181
182                if isDebug:
183                    errors.append(errorMessage)
184                else:
185                    errors.append(errorText)
186                logging.info(errorMessage)
187        else:
188            logging.info("Atom is valid")
189           
190        logging.info("Validation complete")
191        return errors
192   
193
194    def __setUpEXistAtomCollections(self):
195        '''
196        Set up the required eXist collections needed for running the granulator script
197        '''
198        logging.info("Ensuring required collections are available in eXist")
199        for col in [ec.BASE_COLLECTION_PATH, ec.BACKUP_COLLECTION_PATH]:
200            for type in [ec.OLD_COLLECTION_PATH, ec.PUBLISHED_COLLECTION_PATH, \
201                         ec.SMALL_P_PUBLISHED_COLLECTION_PATH, ec.WORKING_COLLECTION_PATH]:
202                self.xmldb.createCollection(col)
203                self.xmldb.createCollection(col + type)
204                self.xmldb.createCollection(col + type + ec.DE_COLLECTION_PATH)
205                self.xmldb.createCollection(col + type + ec.DEPLOYMENT_COLLECTION_PATH)
206                self.xmldb.createCollection(col + type + ec.DEPLOYMENTS_COLLECTION_PATH)
207                self.xmldb.createCollection(col + type + ec.GRANULE_COLLECTION_PATH)
208        logging.info("Required collections available")
209       
210
211    def __addAtomSchema(self):
212        '''
213        Add the required atom schema to the atoms collection - to allow validation
214        of input atoms
215        '''
216        logging.info("Adding atom schema to eXist")
217        xq = ndgXqueries()
218        schemae = [xq.ATOM_SCHEMA, xq.MOLES_SCHEMA, xq.ATOM_MOLES_SCHEMA]
219        for schema in schemae:
220            xml = xq.getSchema(schema)
221            self.createEXistFile(xml, ec.BASE_COLLECTION_PATH, schema + '.xsd')
222        logging.info("- schema added")
223       
224
225    def __loadDBDetails(self, configFile):
226        '''
227        Retrieve info from the eXist db config file
228        '''
229        logging.info("Loading DB config data")
230        # Check this file exists
231        if not os.path.isfile(configFile):
232            errorMessage = "Could not find the DB config file, %s; please make sure this " \
233                     "is available from the running directory" %configFile
234            logging.error(errorMessage)
235            raise ValueError(errorMessage)
236        dbinfo_file=open(configFile, "r")
237        dbinfo = dbinfo_file.read().split()
238        if len(dbinfo) < 3:
239            errorMessage = 'Incorrect data in DB config file'
240            logging.error(errorMessage)
241            raise ValueError(errorMessage)
242        self.eXistDBHostname = dbinfo[0]
243        self._username = dbinfo[1]
244        self._pw = dbinfo[2]
245        logging.info("DB config data loaded")
246
247
248    def __lookupEXistFile(self, docPath):
249        '''
250        Look up a file in eXist using XPath
251        @param docPath: path to doc to look up
252        @return: id returned from query, with which to retrieve doc; if doc doesn't exist, return None
253        '''
254        logging.info("Retrieving info for file, '%s'" %docPath)
255       
256        id, doc = self.xmldb.executeQuery('doc("' + docPath + '")')
257       
258        if doc['hits'] == 0:
259            logging.info("File does not exist in eXist DB")
260            return None
261        logging.info("Found file - returning result ID")
262        return id
263         
264
265    def getEXistFile(self, docPath):
266        '''
267        Use XQuery to retrieve the specified document from eXist
268        @param docPath: the path of the doc to retrieve
269        @return: contents of document if exists, None otherwise
270        '''
271        id = self.__lookupEXistFile(docPath)
272       
273        if not id and id != 0:
274            logging.info("No file found - nothing to retrieve")
275            return None
276       
277        logging.info("Found file - now retrieving content")
278        doc = self.xmldb.retrieve(id, 0)
279        return doc
280
281
282    def isNewEXistFile(self, docPath):
283        '''
284        Test if a file already exists in eXist
285        @param docPath: path of file in eXist to look up
286        @return: True if a new file, False if otherwise
287        '''
288        logging.info("Checking if file, '%s', exists in eXist DB" %docPath)
289       
290        id = self.__lookupEXistFile(docPath)
291
292        if id:
293            return False
294       
295        return True
296
297
298    def __addTimeStamp(self, fileName):
299        '''
300        Add timestamp to input filename
301        NB, this assumes there is a file type identifier at the end of the filename; if so, the datestamp
302        is included before this; if not it is just added at the end
303        '''
304        bits = fileName.rsplit(".", 1)
305        fileName = bits[0] + "_" + datetime.datetime.today().strftime("%Y-%m-%dT%H_%M_%S")
306       
307        if len(bits) > 1:
308            fileName += "." + bits[1]
309        return fileName
310
311
312    def backupEXistFile(self, collection, fileName):
313        '''
314        Backup a file that exists in the eXist DB
315        - NB, this really just creates a new file with the same contents in a
316        backup dir
317        - to improve efficiency, spawn this process as a new thread since we
318        don't need to worry about the outcome
319        @param collection: path of the collection to store the file in
320        @param fileName: name of file to add in eXist
321        @return: path to new backup file
322        '''
323        if not collection.endswith('/'):
324            collection += '/'
325           
326        docPath = collection + fileName
327        logging.info("Backing up file, '%s', in eXist DB" %docPath)
328
329        logging.debug("Firstly, retrieve file contents from eXist")
330        doc = self.getEXistFile(docPath)
331        if not doc:
332            errorMessage = "Could not retrieve file contents (%s) to backup - exiting." %docPath
333            logging.error(errorMessage)
334            raise SystemError(errorMessage)
335       
336        # Now adjust the collection to map to the backup dir
337        collection = collection.replace(ec.BASE_COLLECTION_PATH, ec.BACKUP_COLLECTION_PATH)
338        collection = collection.replace(ec.NDG_A_COLLECTION_PATH, ec.NDG_A_COLLECTION_PATH_BACKUP)
339       
340        # add timestamp to filename
341        fileName = self.__addTimeStamp(fileName)
342        docPath = collection + fileName
343       
344        # run the back up in a separate thread
345        thread = backingUpThread(self, doc, collection, fileName)
346        thread.start()
347
348        return docPath
349
350
351    def createEXistFile(self, xml, collection, fileName):
352        '''
353        Add the input file to the eXist DB
354        @param xml: contents of xml file to create in eXist
355        @param collection: path of the collection to store the file in
356        @param fileName: name of file to add in eXist
357        @return: True, if file created successfully
358        '''
359        logging.info("Adding file, '%s' to eXist DB collection, '%s'" \
360                     %(fileName, collection))
361        logging.debug("data: %s" %xml)
362
363        # create the collection, in case it doesn't already exist - NB, this won't overwrite anything
364        self.createCollections([collection])
365        status = self.xmldb.storeXML(xml, collection + "/" + fileName, overwrite=1)   
366        if not status:
367            errorMessage = "Command to create file in eXist did not complete successfully - exiting"
368            logging.error(errorMessage)
369            raise SystemError(errorMessage)
370       
371        logging.info("File added to eXist")
372        return True
373
374
375    def deleteEXistFile(self, docPath):
376        '''
377        Delete the input file from eXist DB
378        @param docPath: path of document to delete
379        @return: True, if file deleted successfully
380        '''
381        logging.info("Deleting file, '%s', from eXist DB" %docPath)
382
383        status = self.xmldb.removeDoc(docPath)   
384        if not status:
385            errorMessage = "Command to delete file in eXist did not complete successfully - exiting"
386            logging.error(errorMessage)
387            raise SystemError(errorMessage)
388       
389        logging.info("File deleted from eXist")
390        return True
391
392
393    def createOrUpdateEXistFile(self, xml, collection, fileName):
394        '''
395        Check if a file already exists in eXist; if it does, run an
396        update (which will backup the existing file), otherwise create
397        the file in eXist
398        @param xml: contents of xml file to create/update in eXist
399        @param collection: path of the collection to store the file in
400        @param fileName: name of file to add in eXist
401        '''
402        logging.info("Creating or updating file in eXist...")
403        if not self.isNewEXistFile(collection + fileName):
404            self.backupEXistFile(collection, fileName)
405           
406        self.createEXistFile(xml, collection, fileName)
407
408
409    def getAllAtomIDs(self):
410        '''
411        Retrieve all the atom IDs in the atoms directory - NB, this can
412        be a quick way of producing a cache of data to check - e.g. to avoid
413        multiple calls to getAtomFileCollectionPath
414        @return: ids - array of all atom IDs
415        '''
416        logging.info("Retrieving all atom ids")
417        xq = ndgXqueries().actual('atomList', '/db/atoms', '', '')
418        id, doc = self.xmldb.executeQuery(xq)
419        if doc['hits'] == 0: 
420            return []
421       
422        indices = range(doc['hits'])
423       
424        doc = self.xmldb.retrieve(id, 0)
425        et = ET.fromstring(doc)
426        ids = []
427        for member in et:
428            fn = member.findtext('{http://www.w3.org/2005/Atom}repositoryID')
429            ids.append(fn)
430        logging.debug("Found ids, '%s'" %ids)
431        return ids
432
433
434    def getAllAtomCollections(self):
435        '''
436        Get all atom collection paths and store in a dictionary - for easy
437        reference when doing lots of things at once
438        @return: dict with key/val of atomID/collectionPath
439        '''
440        logging.info("Retrieving all atom collection paths")
441        xq = ndgXqueries().actual('atomList', '/db/atoms', '', '')
442        id, doc = self.xmldb.executeQuery(xq)
443        if doc['hits'] == 0: 
444            return []
445       
446        indices = range(doc['hits'])
447       
448        doc = self.xmldb.retrieve(id, 0)
449        et = ET.fromstring(doc)
450        colData = {}
451        for member in et:
452            collection = member.findtext('{http://www.w3.org/2005/Atom}fileName')
453            fileName = collection.split('/')[-1]
454            fileName = fileName.split('.')[0]
455            dir = '/'.join(collection.split('/')[0:-1])
456            colData[fileName] = dir
457
458        logging.debug("Finished looking up atom paths")
459        return colData
460
461
462    def getAtomFileCollectionPath(self, atomID):
463        '''
464        Given an atom id, determine and return the collection path in eXist
465        of the associated atom file
466        @param atom: atom id to look up
467        @return: collection path, if it exists, None, otherwise
468        '''
469        logging.info("Looking up collection path for atom ID, '%s'" %atomID)
470        xq = ndgXqueries()['atomFullPath']
471        xq = xq.replace('TargetCollection', ec.BASE_COLLECTION_PATH)
472        xq = xq.replace('LocalID', atomID)
473
474        id, doc = self.xmldb.executeQuery(xq)
475        if doc['hits'] == 0:
476            logging.info("No document found with the specified ID")
477            return None
478
479        doc = self.xmldb.retrieve(id,0,{})
480
481        docET = ET.fromstring(doc)
482        collPath = docET.text + '/'
483        logging.debug("Found collection path, '%s'" %collPath)
484        return collPath
485       
486           
487    def createAtomInExist(self, atom):
488        '''
489        Create an atom in the eXist DB
490        @param atom: atom object to create in the DB
491        '''
492        logging.info("Creating atom in eXist")
493       
494        # if the atom has no dataset ID, generate and add one
495        # NB, this should only be the case when the atom is being created
496        # via the web interface
497        isNew = False
498        if not atom.datasetID:
499            isNew = True
500            atom.setDatasetID(atom.atomTypeID + '_' + str(uuid.uuid1()))
501
502        eXistCollection = None
503        if self.collections is not None: # cope with empty dict
504            eXistCollection = self.collections.get(atom.datasetID)
505        else:
506            eXistCollection = self.getAtomFileCollectionPath(atom.datasetID)
507       
508        # if collection not found, assume we're dealing with a new atom; get its
509        # default collection
510        if not eXistCollection:
511            eXistCollection = atom.getDefaultCollectionPath()
512        elif isNew:
513            # in this situation we're trying to create an atom with the same
514            # name via the web interface - this can't be allowed - so retry to
515            # generate a new ID
516            atom.datasetID = None
517            self.createAtomInExist(atom)
518            return
519        # create backup of atom if it already exists
520        else:
521            self.backupEXistFile(eXistCollection, atom.atomName)
522           
523            # also change updated date to current time
524            atom.updatedDate = datetime.datetime.today().strftime("%Y-%m-%dT%H:%M:%SZ")
525           
526        self.createEXistFile(atom.toPrettyXML(), eXistCollection, atom.atomName)
527        logging.info("Atom created in eXist")
528        return atom
Note: See TracBrowser for help on using the repository browser.