source: ndgCommon/trunk/ndg/common/src/clients/xmldb/eXist/existdbclient.py @ 4850

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/ndgCommon/trunk/ndg/common/src/clients/xmldb/eXist/existdbclient.py@4850
Revision 4850, 31.2 KB checked in by cbyrom, 11 years ago (diff)

Fix issue with atoms being deleted upon failed content validation +
adjust feed entry content.

Line 
1'''
2 Class supporting set up and communication with eXist DB
3 for the purposes of creating and updating atoms
4 
5 @author: C Byrom - Tessella 08
6'''
7import os, sys, logging, datetime, uuid, re
8from ndg.common.src.models.Atom import Atom
9from ndg.common.src.models import AtomState
10from ndg.common.src.clients.xmldb.eXist.eXistConnector import eXistConnector as ec
11from ndg.common.src.clients.xmldb.eXist.existdbfeedclient import eXistDBFeedClient as feedClient
12from ndg.common.src.dal.DocumentRetrieve import DocumentRetrieve as DR
13from ndg.common.src.lib.atomutilities import *
14from ndg.common.src.lib.existinitialiser import EXistInitialiser
15from xml.etree import ElementTree as ET
16from threading import Thread
17
class DuplicateError(Exception):
    """
    Raised when an atom document is found to already exist where a new
    one was expected.  Constructing the exception also writes the message
    to the error log.
    """
    def __init__(self, msg):
        Exception.__init__(self, msg)
        logging.error(msg)
25
class backingUpThread(Thread):
    """
    Worker thread which stores a document via the supplied client's
    createEXistFile method - used to take backups without blocking the
    caller.
    """

    def __init__(self, existClient, doc, collection, fileName):
        logging.info("Setting up thread to run backup for file, '%s'" %fileName)
        Thread.__init__(self)
        # keep hold of everything run() needs
        self.ec, self.doc = existClient, doc
        self.collection, self.fileName = collection, fileName
        logging.info("- finished setting up thread")

    def run(self):
        target = self.fileName
        logging.info("Running thread to perform backup of file, '%s'" %target)
        self.ec.createEXistFile(self.doc, self.collection, target)
        logging.info("- finished backing up file")
41
class publishingThread(Thread):
    """
    Worker thread which publishes an atom by invoking the private
    __publishAtom method of the supplied eXistDBClient (reached through
    its name-mangled attribute).
    """

    def __init__(self, edc, atom):
        logging.info("Setting up thread to publish atom data for atom, '%s'" %atom.datasetID)
        Thread.__init__(self)
        self.edc, self.atom = edc, atom
        logging.info("- finished setting up thread")

    def run(self):
        atom = self.atom
        logging.info("Running thread to publish atom data for atom, '%s'" %atom.datasetID)
        # name-mangled form of eXistDBClient.__publishAtom
        self.edc._eXistDBClient__publishAtom(atom)
        logging.info("- finished publishing atom data")
55       
56
57class eXistDBClient:
58   
59    def __init__(self, configFile = None, eXistDBHostname = None, \
60                 eXistPortNo = '8080', \
61                 loadCollectionData=False, setUpDB = False):
62        '''
63        Initialise a connection to the eXistDB
64        @keyword configFile: config file to use in setting up DB
65        @keyword existDBHostname: name of eXist DB to use - if not specified, the first
66        host in the config file is used
67        @keyword eXistPortNo: Port number that the eXist DB is exposed by - defaults
68        to '8080' if not set
69        @keyword loadCollectionData: preload info on all the eXist collections, if True (default False)
70        @keyword setUpDB: if True, create the basic collection structure and ingest the
71        atom schemas.  Default is False.
72        '''
73        logging.info("Initialising connection to eXist DB")
74        self.eXistDBHostname = eXistDBHostname
75        self.eXistPortNo = eXistPortNo
76        logging.debug("- connecting to DB, '%s:%s', with config file, '%s'" \
77                      %(eXistDBHostname or 'Default', self.eXistPortNo, \
78                        configFile or 'Default'))
79       
80        inputs = {}
81       
82        self.atomSchema = None
83       
84        # this keeps a record of the last file backed up - incase we need
85        # to do a rollback if there's an error encountered
86        self.backupName = None
87
88        # if we preload the collections data, it'll be stored here
89        self.collections = None
90       
91        # NB, there are two routes through here: if a config file is specified
92        # without a hostname, the host will be taken to be the first entry in
93        # the config file; if a hostname is specified, it will be used explicitly
94        if configFile:
95            inputs['pwfile'] = configFile
96            if not self.eXistDBHostname:
97                self.__loadDBDetails(configFile)
98           
99        # Now set up the connection
100        logging.debug(inputs)
101        self.xmldb = DR(self.eXistDBHostname, **inputs)
102
103        # set up feed client, too - NB, info should be added to the feed as it is added
104        # to the eXist collections, so the standard dbclient and the feed client
105        # are intrinsicly linked
106        self.feedClient = feedClient(self.xmldb.auth, 
107                                     eXistDBHostname = self.eXistDBHostname, 
108                                     eXistPortNo = self.eXistPortNo)
109       
110        if setUpDB:
111            # set up the eXist DB appropriately for use with ndgCommon
112            initialiser = EXistInitialiser(self)
113            initialiser.initialise()
114       
115        if loadCollectionData:
116            self.collections = self.getAllAtomCollections()
117
118        # dict to hold the data centre element info - used to augment atom info
119        # when doing atom->DIF translations - to avoid having to constantly
120        # look this up for the different provider IDs
121        self.dataCentres = {}
122           
123        logging.info("eXist DB connection initialised")
124
125
126    def __getSchema(self):
127        logging.debug("Getting atom schema data")
128        if not self.atomSchema:
129            self.atomSchema = ec.SCHEMATA_COLLECTION_PATH + '/' + \
130                self.xmldb.xq.ATOM_MOLES_SCHEMA  + '.xsd'
131
132        return self.atomSchema
133
134    AtomSchema = property(fget=__getSchema, doc="Atom schema path")
135
136
137    def createCollections(self, collections):
138        '''
139        Create the specified collections in eXist
140        @param collections: array of collections to create
141        @return True if successful
142        '''
143        logging.info("Setting up eXist collections")
144        for col in collections:
145            logging.debug("Creating collection, '%s'" %col)
146            self.xmldb.createCollection(col)
147        logging.info("All collections set up")
148       
149
150    def getAtom(self, id):
151        '''
152        Lookup the atom with id
153        @param id: id of the atom to retrieve
154        '''
155        logging.info("Looking up atom with id, '%s'" %(id))
156        doc = self.xmldb.get('', DR.ATOM, id, \
157                             targetCollection = ec.BASE_COLLECTION_PATH)
158        logging.info("Atom retrieved")
159        return doc
160       
161
162    def checkAtomSchemaCompliance(self, atomPath, atom = None, isDebug = False):
163        '''
164        Validate the specified atom in eXist with the atom schemata in eXist
165        @param atomPath: path to the atom in eXist
166        @keyword atom: if set to an atom, this will be created temporarily in eXist
167        - since it may not already exist there.  Once validation is completed, the
168        file will be removed from eXist.
169        @keyword isDebug: if True, return full error details, otherwise only return
170        a summary
171        @return array: containing any errors found - NB, if an empty array is returned,
172        this indicates successful validation
173        '''
174        logging.info("Validating atom, '%s' against schemata in eXist" %atomPath)
175        # path to temp file, if we create one
176        tempAtomPath = None
177        try:
178            if atom:
179                logging.info("Creating temporary file in eXist to do validation against")
180                fileName = atom.datasetID + str(datetime.datetime.today().microsecond)
181                self.createEXistFile(atom.toPrettyXML(), \
182                                     atom.getDefaultCollectionPath(), fileName)
183                tempAtomPath = atom.getDefaultCollectionPath() + fileName
184               
185            validationQuery = 'validation:validate-report("' + tempAtomPath + \
186                '", xs:anyURI("' + self.AtomSchema + '"))'
187            logging.debug("Running validation, '%s'" %validationQuery)
188
189            id, result = self.xmldb.executeQuery(validationQuery)
190            errorMessage = None
191            if result['hits'] == 0: 
192                errorMessage = "Validation did not complete successfully - please retry"
193            elif result['hits'] > 1:
194                errorMessage = "More than one atom was validated - expecting only a single atom validation - please retry"
195               
196        except Exception, e:
197            errorMessage = "Error encountered whilst validating atom: '%s'" %e.message
198
199        if tempAtomPath:
200            logging.info("Deleting temporary file in eXist")
201            self.deleteEXistFile(tempAtomPath)
202
203        if errorMessage:
204            logging.error(errorMessage)
205            raise SystemError(errorMessage)
206       
207        doc = self.xmldb.retrieve(id, 0)
208        et = ET.fromstring(doc)
209        status = et.findtext('status')
210       
211        # retrieve the error detail if invalid
212        errors = []
213        if status == 'invalid':
214            logging.info("Atom is invalid - details as follows:")
215            for error in et.findall('message'):
216                lineNo = error.attrib.get('line')
217                colNo = error.attrib.get('column')
218                level = error.attrib.get('level')
219                repeat = error.attrib.get('repeat')
220                errorText = error.text
221                # remove the meaningless error type from message
222                if errorText.startswith('cvc-'):
223                    errorText = ':'.join(errorText.split(':')[1:])
224                errorMessage = "%s at line %s, column %s: %s" %(level, lineNo, colNo, errorText)
225                if repeat:
226                    errorMessage += " (%s times)" %repeat
227
228                if isDebug:
229                    errors.append(errorMessage)
230                else:
231                    errors.append(errorText)
232                logging.info(errorMessage)
233        else:
234            logging.info("Atom is valid")
235           
236        logging.info("Validation complete")
237        return errors
238
239       
240
241    def __loadDBDetails(self, configFile):
242        '''
243        Retrieve info from the eXist db config file
244        '''
245        logging.info("Loading DB config data")
246        # Check this file exists
247        if not os.path.isfile(configFile):
248            errorMessage = "Could not find the DB config file, %s; please make sure this " \
249                     "is available from the running directory" %configFile
250            logging.error(errorMessage)
251            raise ValueError(errorMessage)
252        dbinfo_file=open(configFile, "r")
253        dbinfo = dbinfo_file.read().split()
254        if len(dbinfo) < 3:
255            errorMessage = 'Incorrect data in DB config file'
256            logging.error(errorMessage)
257            raise ValueError(errorMessage)
258        self.eXistDBHostname = dbinfo[0]
259        self._username = dbinfo[1]
260        self._pw = dbinfo[2]
261        logging.info("DB config data loaded")
262
263
264    def __lookupEXistFile(self, docPath):
265        '''
266        Look up a file in eXist using XPath
267        @param docPath: path to doc to look up
268        @return: id returned from query, with which to retrieve doc; if doc doesn't exist, return None
269        '''
270        logging.info("Retrieving info for file, '%s'" %docPath)
271       
272        id, doc = self.xmldb.executeQuery('doc("' + docPath + '")')
273       
274        if doc['hits'] == 0:
275            logging.info("File does not exist in eXist DB")
276            return None
277       
278        # check status of result
279        doc = self.xmldb.retrieve(id, 0)
280        et = ET.fromstring(doc)
281        status = et.findtext('status')
282        if status == 'invalid':
283            raise SystemError("Problem occured whilst looking up file, '%s'" %docPath)
284       
285        logging.info("Found file - returning result ID")
286        return id
287
288
289    def isNewEXistCollection(self, collectionPath):
290        '''
291        Look up a collection in eXist using XPath
292        @param collectionPath: path to collection to look up
293        @return: False if collection exists, True otherwise
294        '''
295        logging.info("Checking if collection, '%s', exists in eXist" %collectionPath)
296       
297        id, doc = self.xmldb.executeQuery('collection("' + collectionPath + '")')
298        if doc['hits'] == 0:
299            logging.info("Collection does not exist in eXist DB")
300            return True
301        logging.info("Found collection")
302        return False
303         
304
305    def getEXistFile(self, docPath):
306        '''
307        Use XQuery to retrieve the specified document from eXist
308        @param docPath: the path of the doc to retrieve
309        @return: contents of document if exists, None otherwise
310        '''
311        id = self.__lookupEXistFile(docPath)
312       
313        if not id and id != 0:
314            logging.info("No file found - nothing to retrieve")
315            return None
316       
317        logging.info("Found file - now retrieving content")
318        doc = self.xmldb.retrieve(id, 0)
319        return doc
320
321
322    def isNewEXistFile(self, docPath):
323        '''
324        Test if a file already exists in eXist
325        @param docPath: path of file in eXist to look up
326        @return: True if a new file, False if otherwise
327        '''
328        logging.info("Checking if file, '%s', exists in eXist DB" %docPath)
329       
330        id = self.__lookupEXistFile(docPath)
331
332        if id:
333            return False
334       
335        return True
336
337
338    def __addTimeStamp(self, fileName):
339        '''
340        Add timestamp to input filename
341        NB, this assumes there is a file type identifier at the end of the filename; if so, the datestamp
342        is included before this; if not it is just added at the end
343        '''
344        bits = fileName.rsplit(".", 1)
345        fileName = bits[0] + "_" + datetime.datetime.today().strftime("%Y-%m-%dT%H_%M_%S")
346       
347        if len(bits) > 1:
348            fileName += "." + bits[1]
349        return fileName
350
351
352    def __removeTimeStamp(self, fileName):
353        '''
354        Remove a timestamp from a file name
355        '''
356        match = re.search('(.*)(_\d{4}-\d{2}-\d{2}T\d{2}_\d{2}_\d{2})(.*)', fileName)
357        if match:
358            return match.group(1) + match.group(3)
359
360        return fileName
361
362    def backupEXistFile(self, collection, fileName, runAsynch = True):
363        '''
364        Backup a file that exists in the eXist DB
365        - NB, this really just creates a new file with the same contents in a
366        backup dir
367        - to improve efficiency, spawn this process as a new thread since we
368        don't need to worry about the outcome
369        @param collection: path of the collection to store the file in
370        @param fileName: name of file to add in eXist
371        @param runAsynch: if True, do the backup asynchronously in a separate thread
372        @return: path to new backup file
373        '''
374        if not collection.endswith('/'):
375            collection += '/'
376           
377        docPath = collection + fileName
378        logging.info("Backing up file, '%s', in eXist DB" %docPath)
379
380        logging.debug("Firstly, retrieve file contents from eXist")
381        doc = self.getEXistFile(docPath)
382        if not doc:
383            errorMessage = "Could not retrieve file contents (%s) to backup - exiting." %docPath
384            logging.error(errorMessage)
385            raise SystemError(errorMessage)
386       
387        # Now adjust the collection to map to the backup dir
388        collection = collection.replace(ec.BASE_COLLECTION_PATH, ec.BACKUP_COLLECTION_PATH)
389        collection = collection.replace(ec.NDG_A_COLLECTION_PATH, ec.NDG_A_COLLECTION_PATH_BACKUP)
390       
391        # add timestamp to filename
392        fileName = self.__addTimeStamp(fileName)
393        docPath = collection + fileName
394       
395        if runAsynch:
396            # run the back up in a separate thread
397            thread = backingUpThread(self, doc, collection, fileName)
398            thread.start()
399        else:
400            self.createEXistFile(doc, collection, fileName)
401           
402        return docPath
403
404
405    def createEXistFile(self, xml, collection, fileName):
406        '''
407        Add the input file to the eXist DB
408        @param xml: contents of xml file to create in eXist
409        @param collection: path of the collection to store the file in
410        @param fileName: name of file to add in eXist
411        @return: True, if file created successfully
412        '''
413        logging.info("Adding file, '%s' to eXist DB collection, '%s'" \
414                     %(fileName, collection))
415        logging.debug("data: %s" %xml)
416
417        # create the collection, if it doesn't already exist - NB, this won't overwrite anything
418        if self.collections is None or not collection in self.collections.values():
419            self.createCollections([collection])
420           
421        status = self.xmldb.storeXML(xml, collection + "/" + fileName, overwrite=1)
422        if not status:
423            errorMessage = "Command to create file in eXist did not complete successfully - exiting"
424            logging.error(errorMessage)
425            raise SystemError(errorMessage)
426       
427        # update the stored collections info, if being used
428        if self.collections:
429            self.collections[fileName] = collection
430       
431        logging.info("File added to eXist")
432        return True
433
434
435    def deleteEXistFile(self, docPath):
436        '''
437        Delete the input file from eXist DB
438        @param docPath: path of document to delete
439        @return: True, if file deleted successfully
440        '''
441        logging.info("Deleting file, '%s', from eXist DB" %docPath)
442
443        status = self.xmldb.removeDoc(docPath)   
444        if not status:
445            errorMessage = "Command to delete file in eXist did not complete successfully - exiting"
446            logging.error(errorMessage)
447            raise SystemError(errorMessage)
448       
449        logging.info("File deleted from eXist")
450        return True
451
452
453    def createOrUpdateEXistFile(self, xml, collection, fileName):
454        '''
455        Check if a file already exists in eXist; if it does, run an
456        update (which will backup the existing file), otherwise create
457        the file in eXist
458        @param xml: contents of xml file to create/update in eXist
459        @param collection: path of the collection to store the file in
460        @param fileName: name of file to add in eXist
461        '''
462        logging.info("Creating or updating file in eXist...")
463        if not self.isNewEXistFile(collection + fileName):
464            self.backupEXistFile(collection, fileName)
465           
466        self.createEXistFile(xml, collection, fileName)
467
468
469    def getAllAtomIDs(self):
470        '''
471        Retrieve all the atom IDs in the atoms directory - NB, this can
472        be a quick way of producing a cache of data to check - e.g. to avoid
473        multiple calls to getAtomFileCollectionPath
474        @return: ids - array of all atom IDs
475        '''
476        logging.info("Retrieving all atom ids")
477        xq = self.xmldb.xq.actual('atomList', '/db/atoms', '', '')
478        id, doc = self.xmldb.executeQuery(xq)
479        if doc['hits'] == 0: 
480            return []
481       
482        indices = range(doc['hits'])
483       
484        doc = self.xmldb.retrieve(id, 0)
485        et = ET.fromstring(doc)
486        ids = []
487        for member in et:
488            fn = member.findtext('{http://www.w3.org/2005/Atom}repositoryID')
489            ids.append(fn)
490        logging.debug("Found ids, '%s'" %ids)
491        return ids
492
493
494    def getAllAtomCollections(self):
495        '''
496        Get all atom collection paths and store in a dictionary - for easy
497        reference when doing lots of things at once
498        @return: dict with key/val of filename/collectionPath
499        '''
500        logging.info("Retrieving all atom collection paths")
501        xq = self.xmldb.xq.actual('atomList', '/db/atoms', '', '')
502        id, doc = self.xmldb.executeQuery(xq)
503        if doc['hits'] == 0: 
504            return []
505       
506        indices = range(doc['hits'])
507       
508        doc = self.xmldb.retrieve(id, 0)
509        et = ET.fromstring(doc)
510        colData = {}
511        for member in et:
512            collection = member.findtext('{http://www.w3.org/2005/Atom}fileName')
513            fileName = collection.split('/')[-1]
514            fileName = fileName.split('.')[0:-1]
515            fileName = '.'.join(fileName)
516            dir = '/'.join(collection.split('/')[0:-1])
517            colData[fileName] = dir
518
519        logging.debug("Finished looking up atom paths")
520        return colData
521
522
523    def getAtomPublicationState(self, atomID):
524        '''
525        Retrieve the publication state of the specified atom - by
526        checking the collection it is in
527        @param atom: atom id to look up
528        @return: AtomState for the atom.  NB, if the ID is not found, assume
529        we're dealing with a new atom and set the state as the working state
530        '''
531        logging.debug("Finding atom publication state")
532        path = self.getAtomFileCollectionPath(atomID)
533        for state in AtomState.allStates.values():
534            if path.find('/%s' %state.collectionPath) > -1:
535                logging.debug("- state found: '%s'" %state.title)
536                return state
537       
538        logging.debug("- state not found - returning WORKING state")
539        return AtomState.WORKING_STATE
540
541
542    def getAtomFileCollectionPath(self, atomID):
543        '''
544        Given an atom id, determine and return the collection path in eXist
545        of the associated atom file
546        @param atom: atom id to look up
547        @return: collection path, if it exists, None, otherwise
548        '''
549        logging.info("Looking up collection path for atom ID, '%s'" %atomID)
550        xq = self.xmldb.xq['atomFullPath']
551        xq = xq.replace('TargetCollection', ec.BASE_COLLECTION_PATH)
552        xq = xq.replace('LocalID', atomID)
553
554        id, doc = self.xmldb.executeQuery(xq)
555        if doc['hits'] == 0:
556            logging.info("No document found with the specified ID")
557            return None
558
559        doc = self.xmldb.retrieve(id,0,{})
560
561        docET = ET.fromstring(doc)
562        collPath = docET.text + '/'
563        logging.debug("Found collection path, '%s'" %collPath)
564        return collPath
565
566
567    def deleteAtomInExist(self, atom):
568        '''
569        Delete the given atom from the eXist DB - using the atom
570        details to work out the required path to delete
571        '''
572        logging.info("Deleting atom from eXist")
573        atomPath = atom.getDefaultCollectionPath() + atom.atomName
574        self.deleteEXistFile(atomPath)
575        logging.info("Atom deleted")
576
577
578    def changeAtomPublicationStateInExist(self, atom, newState):
579        '''
580        Adjust the publication state of an atom in eXist
581        @param atom: the Atom data model of the atom whose publication state
582        needs to change
583        @param newState: an AtomState object representing the new publication
584        state of the atom
585        @return atom: atom data model with updated state
586        '''
587        logging.info("Changing the publication state of atom - from '%s' to '%s'" \
588                     %(atom.state.title, newState.title))
589        oldState = atom.state
590        # firstly, create atom in new publication state collection - so data isn't
591        # lost if this fails
592        atom.state = newState
593        self.createEXistFile(atom.toPrettyXML(), atom.getDefaultCollectionPath(), 
594                             atom.atomName)
595       
596        # now delete atom in the old publication state
597        atom.state = oldState
598        self.deleteAtomInExist(atom)
599        logging.info("- atom created in new publication state and removed from old one")
600        atom.state = newState
601       
602        # update feeds + create DIFs, if needed
603        if atom.isPublished():
604            self.runAsynchAtomPublish(atom)
605       
606        return atom
607       
608
609           
610    def createAtomInExist(self, atom, replaceAtom = True, runAsynch = True):
611        '''
612        Create an atom in the eXist DB - using the atom contents to work out
613        the location + data set ID
614        @param atom: atom object to create in the DB
615        @keyword replaceAtom: if False and the atom is already available in eXist
616        @param runAsynch: if True, if a backup of an existing file, do this
617        asynchronously in a separate thread + do the feed publishing and DIF
618        creating in a separate thread, too
619        then raise a ValueError.
620        '''
621        logging.info("Creating atom in eXist")
622        if not atom:
623            raise ValueError("Input is not an object - cannot create in eXist")
624        if not isinstance(atom, Atom):
625            raise ValueError("Input object is not an Atom object - cannot create in eXist")
626       
627        # if the atom has no dataset ID, generate and add one
628        # NB, this should only be the case when the atom is being created
629        # via the web interface
630        isNew = False
631        if not atom.datasetID:
632            isNew = True
633            atom.setDatasetID(atom.atomTypeID + '_' + str(uuid.uuid1()))
634
635        eXistCollection = None
636        if self.collections is not None: # cope with empty dict
637            eXistCollection = self.collections.get(atom.datasetID)
638        else:
639            eXistCollection = self.getAtomFileCollectionPath(atom.datasetID)
640       
641        # if collection not found, assume we're dealing with a new atom; get its
642        # default collection
643        if not eXistCollection:
644            eXistCollection = atom.getDefaultCollectionPath()
645
646            # check if we need a new provider feed set up
647            providerCollection = ec.PROVIDER_FEED_PATH + atom.ME.providerID + '/'
648            if self.isNewEXistCollection(providerCollection):
649                logging.info("Creating feed for new provider ID")
650                self.createCollections([providerCollection])
651                self.feedClient.createAtomFeed(providerCollection,
652                                               self.feedClient.PROVIDERLEVEL_ATOM_FEED_TITLE %atom.ME.providerID)
653           
654        elif isNew:
655            # in this situation we're trying to create an atom with the same
656            # name via the web interface - this can't be allowed - so retry to
657            # generate a new ID
658            atom.datasetID = None
659            self.createAtomInExist(atom)
660            return
661        # create backup of atom if it already exists
662        else:
663            if not replaceAtom:
664                raise DuplicateError('An atom with the specified ID (%s) already exists in eXist' \
665                                     %atom.datasetID)
666            # store name of backup - to allow restore, if subsequent ops fail
667            self.backupName = self.backupEXistFile(eXistCollection, atom.atomName, \
668                                                   runAsynch = runAsynch)
669           
670            # also change updated date to current time
671            atom.updatedDate = datetime.datetime.today().strftime("%Y-%m-%dT%H:%M:%SZ")
672           
673        self.createEXistFile(atom.toPrettyXML(), eXistCollection, atom.atomName)
674       
675        logging.info("Atom created in eXist")
676       
677        # lastly, if we're dealing with a published atom, update DIF records +
678        # feeds
679        if atom.isPublished():
680            if runAsynch:
681                self.runAsynchAtomPublish(atom)
682            else:
683                self.__publishAtom(atom)
684           
685        return atom
686   
687    def runAsynchAtomPublish(self, atom):
688        thread = publishingThread(self, atom)
689        thread.start()
690           
691   
692    def __publishAtom(self, atom):
693        '''
694        Add atom info to the various feeds - and if it is a data entity, use
695        it to create a DIF document and add this to feeds, also
696        '''
697        if atom.isDE():
698            self.createDIFDocumentFromAtom(atom)
699       
700        self.feedClient.addAtomToFeeds(atom)
701       
702   
703   
704    def restoreBackup(self, docPath):
705        '''
706        Restore the backed up file - effectively recreating in the non-backup collection
707        @param docPath: path to file to backup
708        '''
709        logging.info("Restoring file, '%s' in eXist" %docPath)
710        doc = self.getEXistFile(docPath)
711       
712        if not doc:
713            errorMessage = "Could not retrieve file contents (%s) to backup - exiting." %docPath
714            logging.error(errorMessage)
715            raise SystemError(errorMessage)
716       
717        bits = docPath.split('/')
718        fileName = bits[-1]
719        collection = '/'.join(bits[0:-1])
720        # remove timestamp from filename
721        fileName = self.__removeTimeStamp(fileName)
722       
723        # Now adjust the collection to map to the backup dir
724        collection = collection.replace(ec.BACKUP_COLLECTION_PATH, ec.BASE_COLLECTION_PATH)
725        collection = collection.replace(ec.NDG_A_COLLECTION_PATH_BACKUP, ec.NDG_A_COLLECTION_PATH)
726       
727        self.createEXistFile(doc, collection, fileName)
728        logging.info("File restored")
729
730
731    def transformAtomIntoDIF(self, atom):
732        '''
733        Transform an atom into a DIF document - using an XQuery transform ran
734        in eXist
735        @param atom: the Atom data model to convert
736        @return: the produced DIF document
737        '''
738        logging.info("Creating DIF record from atom - using XQuery transform")
739       
740        # get the query and set this up to use properly
741        targetCollection = atom.getPublicationStatePath()
742        providerID = atom.ME.providerID
743        # remove the trailing forward slash - otherwise the xquery won't work
744        if targetCollection.endswith('/'):
745            targetCollection = targetCollection[0:-1]
746        xquery = self.xmldb.xq.actual('atom2DIF', targetCollection, \
747                                 providerID, atom.datasetID)
748
749        discovery_id,s = self.xmldb.executeQuery(xquery)
750        if s.has_key('hits') and not s['hits']:
751            raise Exception("XQuery produced no results - suggesting a problem with the query")
752        doc = self.xmldb.retrieve(discovery_id,0)
753       
754        # add various missing info
755        # get the organisation data for the repository
756        # - NB, store for re-use to avoid multiple lookups of the same info
757        if self.dataCentres.has_key(providerID):
758            dataCentre = self.dataCentres[providerID]
759        else:
760            dataCentre = getDataCentreDIFElement(providerID, self.xmldb)
761            self.dataCentres[providerID] = dataCentre
762
763        # add various other data to atoms - to make up with the incomplete data spec there
764        doc = addOrgData(doc, targetCollection, dataCentre)
765        doc = expandParametersData(doc)
766        doc = addStandardKeywords(doc)
767
768        logging.info("Transform completed successfully - returning DIF doc")
769        return doc
770
771
772    def createDIFDocumentFromAtom(self, atom):
773        '''
774        Transform an atom into a DIF document and store this in eXist
775        @param atom: the Atom data model to convert
776        '''
777        logging.info("Creating and storing DIF document")
778        doc = self.transformAtomIntoDIF(atom)
779        fileName = atom.atomName.replace('.atom', '.xml')
780       
781        # now add to eXist
782        providerID = atom.ME.providerID
783        collectionPath = ec.DIF_COLLECTION_PATH + providerID
784       
785        # NB, check if a top level provider collection exists; if not, add
786        # this with a feed
787        # NB, the collection will be created along with the DIF file - so set the
788        # feed up after the file has been created
789        setupFeed = False
790        if self.isNewEXistCollection(collectionPath):
791            setupFeed = True
792
793        self.createOrUpdateEXistFile(doc, collectionPath,
794                                     fileName)       
795        if setupFeed:
796            logging.info("Creating feed for new provider ID")
797            self.createCollections([collectionPath])
798            self.feedClient.createAtomFeed(collectionPath,
799                                           self.feedClient.PROVIDERLEVEL_DIF_FEED_TITLE %providerID)
800        logging.info("DIF document created and stored")
801
Note: See TracBrowser for help on using the repository browser.