source: ndgCommon/trunk/ndg/common/src/clients/xmldb/eXist/existdbclient.py @ 4843

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/ndgCommon/trunk/ndg/common/src/clients/xmldb/eXist/existdbclient.py@4843
Revision 4843, 31.1 KB checked in by cbyrom, 11 years ago (diff)

Include better management of stored reference collections.

Line 
1'''
2 Class supporting set up and communication with eXist DB
3 for the purposes of creating and updating atoms
4 
5 @author: C Byrom - Tessella 08
6'''
7import os, sys, logging, datetime, uuid, re
8from ndg.common.src.models.Atom import Atom
9from ndg.common.src.models import AtomState
10from ndg.common.src.clients.xmldb.eXist.eXistConnector import eXistConnector as ec
11from ndg.common.src.clients.xmldb.eXist.existdbfeedclient import eXistDBFeedClient as feedClient
12from ndg.common.src.dal.DocumentRetrieve import DocumentRetrieve as DR
13from ndg.common.src.lib.atomutilities import *
14from ndg.common.src.lib.existinitialiser import EXistInitialiser
15from xml.etree import ElementTree as ET
16from threading import Thread
17
class DuplicateError(Exception):
    """
    Raised when an atom document whose ID is already present in eXist is
    encountered.  The message is written to the error log as soon as the
    exception object is constructed.
    """
    def __init__(self, msg):
        # log first, so the problem is recorded even if the caller swallows it
        logging.error(msg)
        super(DuplicateError, self).__init__(msg)
25
class backingUpThread(Thread):
    '''
    Worker thread that writes a backup copy of a document into eXist, so that
    the caller does not have to wait for the store operation to finish
    '''

    def __init__(self, existClient, doc, collection, fileName):
        '''
        @param existClient: client object whose createEXistFile method does the write
        @param doc: contents of the document to store
        @param collection: eXist collection to store the backup in
        @param fileName: name to give the backup file
        '''
        logging.info("Setting up thread to run backup for file, '%s'" %fileName)
        Thread.__init__(self)
        self.ec, self.doc = existClient, doc
        self.collection, self.fileName = collection, fileName
        logging.info("- finished setting up thread")

    def run(self):
        '''
        Store the document via the client - invoked by Thread.start()
        '''
        logging.info("Running thread to perform backup of file, '%s'" %self.fileName)
        client = self.ec
        client.createEXistFile(self.doc, self.collection, self.fileName)
        logging.info("- finished backing up file")
41
class publishingThread(Thread):
    '''
    Worker thread that runs the (private) atom publishing step of an
    eXistDBClient - i.e. feed updates and DIF creation - off the calling thread
    '''

    def __init__(self, edc, atom):
        '''
        @param edc: the eXistDBClient whose private __publishAtom method is run
        @param atom: the atom to publish
        '''
        logging.info("Setting up thread to publish atom data for atom, '%s'" %atom.datasetID)
        Thread.__init__(self)
        self.edc, self.atom = edc, atom
        logging.info("- finished setting up thread")

    def run(self):
        '''
        Publish the atom - invoked by Thread.start()
        '''
        target = self.atom
        logging.info("Running thread to publish atom data for atom, '%s'" %target.datasetID)
        # NB, __publishAtom is name-mangled on eXistDBClient, hence the explicit name
        publish = self.edc._eXistDBClient__publishAtom
        publish(target)
        logging.info("- finished publishing atom data")
55       
56
class eXistDBClient:
    '''
    Client supporting set up of, and communication with, an eXist DB - for
    creating, updating, validating and publishing atoms and the DIF documents
    and feeds derived from them
    '''

    def __init__(self, configFile = None, eXistDBHostname = None,
                 eXistPortNo = '8080',
                 loadCollectionData=False, setUpDB = False):
        '''
        Initialise a connection to the eXistDB
        @keyword configFile: config file to use in setting up DB
        @keyword eXistDBHostname: name of eXist DB to use - if not specified, the first
        host in the config file is used
        @keyword eXistPortNo: Port number that the eXist DB is exposed by - defaults
        to '8080' if not set
        @keyword loadCollectionData: preload info on all the eXist collections, if True (default False)
        @keyword setUpDB: if True, create the basic collection structure and ingest the
        atom schemas.  Default is False.
        '''
        logging.info("Initialising connection to eXist DB")
        self.eXistDBHostname = eXistDBHostname
        self.eXistPortNo = eXistPortNo
        logging.debug("- connecting to DB, '%s:%s', with config file, '%s'" \
                      %(eXistDBHostname or 'Default', self.eXistPortNo, \
                        configFile or 'Default'))

        inputs = {}

        # eXist path to the atom schema - looked up lazily via AtomSchema
        self.atomSchema = None

        # this keeps a record of the last file backed up - in case we need
        # to do a rollback if there's an error encountered
        self.backupName = None

        # if we preload the collections data, it'll be stored here - as a dict
        # of atom file name (minus extension) -> collection path (no trailing '/')
        self.collections = None

        # dict to hold the data centre element info - used to augment atom info
        # when doing atom->DIF translations - to avoid having to constantly
        # look this up for the different provider IDs
        # NB, set up before 'self' is handed to the DB initialiser below
        self.dataCentres = {}

        # NB, there are two routes through here: if a config file is specified
        # without a hostname, the host will be taken to be the first entry in
        # the config file; if a hostname is specified, it will be used explicitly
        if configFile:
            inputs['pwfile'] = configFile
            if not self.eXistDBHostname:
                self.__loadDBDetails(configFile)

        # Now set up the connection
        logging.debug(inputs)
        self.xmldb = DR(self.eXistDBHostname, **inputs)

        # set up feed client, too - NB, info should be added to the feed as it is added
        # to the eXist collections, so the standard dbclient and the feed client
        # are intrinsically linked
        self.feedClient = feedClient(self.xmldb.auth,
                                     eXistDBHostname = self.eXistDBHostname,
                                     eXistPortNo = self.eXistPortNo)

        if setUpDB:
            # set up the eXist DB appropriately for use with ndgCommon
            initialiser = EXistInitialiser(self)
            initialiser.initialise()

        if loadCollectionData:
            self.collections = self.getAllAtomCollections()

        logging.info("eXist DB connection initialised")


    def __getSchema(self):
        '''
        Return the eXist path of the atom schema - built on first use from the
        schemata collection path and the schema name held by the XQuery store
        '''
        logging.debug("Getting atom schema data")
        if not self.atomSchema:
            self.atomSchema = ec.SCHEMATA_COLLECTION_PATH + '/' + \
                self.xmldb.xq.ATOM_MOLES_SCHEMA + '.xsd'

        return self.atomSchema

    AtomSchema = property(fget=__getSchema, doc="Atom schema path")


    def createCollections(self, collections):
        '''
        Create the specified collections in eXist
        @param collections: array of collections to create
        @return True if successful
        '''
        logging.info("Setting up eXist collections")
        for col in collections:
            logging.debug("Creating collection, '%s'" %col)
            self.xmldb.createCollection(col)
        logging.info("All collections set up")
        return True


    def getAtom(self, id):
        '''
        Lookup the atom with id
        @param id: id of the atom to retrieve
        @return: the document returned by the DocumentRetrieve lookup, searching
        under the base atom collection
        '''
        logging.info("Looking up atom with id, '%s'" %(id))
        doc = self.xmldb.get('', DR.ATOM, id,
                             targetCollection = ec.BASE_COLLECTION_PATH)
        logging.info("Atom retrieved")
        return doc


    def checkAtomSchemaCompliance(self, atomPath, atom = None, isDebug = False):
        '''
        Validate the specified atom in eXist with the atom schemata in eXist
        @param atomPath: path to the atom in eXist
        @keyword atom: if set to an atom, this will be created temporarily in eXist
        - since it may not already exist there.  Once validation is completed, the
        file will be removed from eXist.
        @keyword isDebug: if True, return full error details, otherwise only return
        a summary
        @raise SystemError: if the validation could not be run, or did not
        produce exactly one validation report
        @return array: containing any errors found - NB, if an empty array is returned,
        this indicates successful validation
        '''
        logging.info("Validating atom, '%s' against schemata in eXist" %atomPath)
        errorMessage = None
        resultID = None
        # only set once the temporary file has actually been stored - so we never
        # attempt to delete a file we did not create (in particular, never the
        # caller-supplied atomPath)
        tempFilePath = None
        try:
            if atom:
                logging.info("Creating temporary file in eXist to do validation against")
                fileName = atom.datasetID + str(datetime.datetime.today().microsecond)
                collection = atom.getDefaultCollectionPath()
                self.createEXistFile(atom.toPrettyXML(), collection, fileName)
                atomPath = collection + fileName
                tempFilePath = atomPath

            validationQuery = 'validation:validate-report("' + atomPath + \
                '", xs:anyURI("' + self.AtomSchema + '"))'
            logging.debug("Running validation, '%s'" %validationQuery)

            resultID, result = self.xmldb.executeQuery(validationQuery)
            if result['hits'] == 0:
                errorMessage = "Validation did not complete successfully - please retry"
            elif result['hits'] > 1:
                errorMessage = "More than one atom was validated - expecting only a single atom validation - please retry"

        except Exception as e:
            # NB, str(e) rather than the deprecated e.message
            errorMessage = "Error encountered whilst validating atom: '%s'" %e

        if tempFilePath:
            logging.info("Deleting temporary file in eXist")
            self.deleteEXistFile(tempFilePath)

        if errorMessage:
            logging.error(errorMessage)
            raise SystemError(errorMessage)

        doc = self.xmldb.retrieve(resultID, 0)
        et = ET.fromstring(doc)
        status = et.findtext('status')

        # retrieve the error detail if invalid
        errors = []
        if status == 'invalid':
            logging.info("Atom is invalid - details as follows:")
            for error in et.findall('message'):
                lineNo = error.attrib.get('line')
                colNo = error.attrib.get('column')
                level = error.attrib.get('level')
                repeat = error.attrib.get('repeat')
                # guard against empty message elements
                errorText = error.text or ''
                # remove the meaningless error type from message
                if errorText.startswith('cvc-'):
                    errorText = ':'.join(errorText.split(':')[1:])
                errorMessage = "%s at line %s, column %s: %s" %(level, lineNo, colNo, errorText)
                if repeat:
                    errorMessage += " (%s times)" %repeat

                if isDebug:
                    errors.append(errorMessage)
                else:
                    errors.append(errorText)
                logging.info(errorMessage)
        else:
            logging.info("Atom is valid")

        logging.info("Validation complete")
        return errors


    def __loadDBDetails(self, configFile):
        '''
        Retrieve info from the eXist db config file - NB, expects whitespace
        separated hostname, username and password as its first three entries
        @param configFile: path to the config file
        @raise ValueError: if the file is missing or has fewer than three entries
        '''
        logging.info("Loading DB config data")
        # Check this file exists
        if not os.path.isfile(configFile):
            errorMessage = "Could not find the DB config file, %s; please make sure this " \
                     "is available from the running directory" %configFile
            logging.error(errorMessage)
            raise ValueError(errorMessage)

        # NB, use a context manager so the file handle is always closed
        with open(configFile, "r") as dbinfo_file:
            dbinfo = dbinfo_file.read().split()

        if len(dbinfo) < 3:
            errorMessage = 'Incorrect data in DB config file'
            logging.error(errorMessage)
            raise ValueError(errorMessage)
        self.eXistDBHostname = dbinfo[0]
        self._username = dbinfo[1]
        self._pw = dbinfo[2]
        logging.info("DB config data loaded")


    def __lookupEXistFile(self, docPath):
        '''
        Look up a file in eXist using XPath
        @param docPath: path to doc to look up
        @raise SystemError: if the lookup result reports an 'invalid' status
        @return: id returned from query, with which to retrieve doc; if doc doesn't exist, return None
        '''
        logging.info("Retrieving info for file, '%s'" %docPath)

        resultID, doc = self.xmldb.executeQuery('doc("' + docPath + '")')

        if doc['hits'] == 0:
            logging.info("File does not exist in eXist DB")
            return None

        # check status of result
        doc = self.xmldb.retrieve(resultID, 0)
        et = ET.fromstring(doc)
        status = et.findtext('status')
        if status == 'invalid':
            raise SystemError("Problem occured whilst looking up file, '%s'" %docPath)

        logging.info("Found file - returning result ID")
        return resultID


    def isNewEXistCollection(self, collectionPath):
        '''
        Look up a collection in eXist using XPath
        @param collectionPath: path to collection to look up
        @return: False if collection exists, True otherwise
        '''
        logging.info("Checking if collection, '%s', exists in eXist" %collectionPath)

        resultID, doc = self.xmldb.executeQuery('collection("' + collectionPath + '")')
        if doc['hits'] == 0:
            logging.info("Collection does not exist in eXist DB")
            return True
        logging.info("Found collection")
        return False


    def getEXistFile(self, docPath):
        '''
        Use XQuery to retrieve the specified document from eXist
        @param docPath: the path of the doc to retrieve
        @return: contents of document if exists, None otherwise
        '''
        resultID = self.__lookupEXistFile(docPath)

        # NB, a result id of 0 is valid - only None indicates 'not found'
        if resultID is None:
            logging.info("No file found - nothing to retrieve")
            return None

        logging.info("Found file - now retrieving content")
        doc = self.xmldb.retrieve(resultID, 0)
        return doc


    def isNewEXistFile(self, docPath):
        '''
        Test if a file already exists in eXist
        @param docPath: path of file in eXist to look up
        @return: True if a new file, False if otherwise
        '''
        logging.info("Checking if file, '%s', exists in eXist DB" %docPath)

        # NB, a result id of 0 is valid - only None indicates 'not found'
        return self.__lookupEXistFile(docPath) is None


    def __addTimeStamp(self, fileName):
        '''
        Add timestamp to input filename
        NB, this assumes there is a file type identifier at the end of the filename; if so, the datestamp
        is included before this; if not it is just added at the end
        '''
        bits = fileName.rsplit(".", 1)
        fileName = bits[0] + "_" + datetime.datetime.today().strftime("%Y-%m-%dT%H_%M_%S")

        if len(bits) > 1:
            fileName += "." + bits[1]
        return fileName


    def __removeTimeStamp(self, fileName):
        '''
        Remove a timestamp, as added by __addTimeStamp, from a file name
        @return: the file name without the timestamp; unchanged if none found
        '''
        match = re.search(r'(.*)(_\d{4}-\d{2}-\d{2}T\d{2}_\d{2}_\d{2})(.*)', fileName)
        if match:
            return match.group(1) + match.group(3)

        return fileName


    def backupEXistFile(self, collection, fileName, runAsynch = True):
        '''
        Backup a file that exists in the eXist DB
        - NB, this really just creates a new file with the same contents in a
        backup dir, with a timestamp added to the file name
        - to improve efficiency, spawn this process as a new thread since we
        don't need to worry about the outcome
        @param collection: path of the collection the file is currently stored in
        @param fileName: name of the file in eXist
        @keyword runAsynch: if True, do the backup asynchronously in a separate thread
        @raise SystemError: if the file contents cannot be retrieved
        @return: path to new backup file
        '''
        if not collection.endswith('/'):
            collection += '/'

        docPath = collection + fileName
        logging.info("Backing up file, '%s', in eXist DB" %docPath)

        logging.debug("Firstly, retrieve file contents from eXist")
        doc = self.getEXistFile(docPath)
        if not doc:
            errorMessage = "Could not retrieve file contents (%s) to backup - exiting." %docPath
            logging.error(errorMessage)
            raise SystemError(errorMessage)

        # Now adjust the collection to map to the backup dir
        collection = collection.replace(ec.BASE_COLLECTION_PATH, ec.BACKUP_COLLECTION_PATH)
        collection = collection.replace(ec.NDG_A_COLLECTION_PATH, ec.NDG_A_COLLECTION_PATH_BACKUP)

        # add timestamp to filename
        fileName = self.__addTimeStamp(fileName)
        docPath = collection + fileName

        if runAsynch:
            # run the back up in a separate thread
            thread = backingUpThread(self, doc, collection, fileName)
            thread.start()
        else:
            self.createEXistFile(doc, collection, fileName)

        return docPath


    def createEXistFile(self, xml, collection, fileName):
        '''
        Add the input file to the eXist DB
        @param xml: contents of xml file to create in eXist
        @param collection: path of the collection to store the file in - with or
        without a trailing '/'
        @param fileName: name of file to add in eXist
        @raise SystemError: if the store command fails
        @return: True, if file created successfully
        '''
        logging.info("Adding file, '%s' to eXist DB collection, '%s'" \
                     %(fileName, collection))
        logging.debug("data: %s" %xml)

        # NB, the stored collections info holds paths without a trailing '/' - as
        # produced by getAllAtomCollections - so normalise before comparing
        collectionDir = collection.rstrip('/')

        # create the collection, if it doesn't already exist - NB, this won't overwrite anything
        if self.collections is None or collectionDir not in self.collections.values():
            self.createCollections([collection])

        # avoid building '//' paths when the collection already ends in '/'
        status = self.xmldb.storeXML(xml, collectionDir + "/" + fileName, overwrite=1)
        if not status:
            errorMessage = "Command to create file in eXist did not complete successfully - exiting"
            logging.error(errorMessage)
            raise SystemError(errorMessage)

        # update the stored collections info, if being used - NB, an empty dict
        # is still in use.  Atom files are keyed by name minus the '.atom'
        # extension, to match getAllAtomCollections and the datasetID lookups in
        # createAtomInExist (assumes atom files are named <datasetID>.atom -
        # TODO confirm); other files (e.g. DIFs) keep their full name so they
        # can't clobber the entry for the atom they were derived from
        if self.collections is not None:
            key = fileName
            if key.endswith('.atom'):
                key = key.rsplit('.', 1)[0]
            self.collections[key] = collectionDir

        logging.info("File added to eXist")
        return True


    def deleteEXistFile(self, docPath):
        '''
        Delete the input file from eXist DB
        @param docPath: path of document to delete
        @raise SystemError: if the delete command fails
        @return: True, if file deleted successfully
        '''
        logging.info("Deleting file, '%s', from eXist DB" %docPath)

        status = self.xmldb.removeDoc(docPath)
        if not status:
            errorMessage = "Command to delete file in eXist did not complete successfully - exiting"
            logging.error(errorMessage)
            raise SystemError(errorMessage)

        logging.info("File deleted from eXist")
        return True


    def createOrUpdateEXistFile(self, xml, collection, fileName):
        '''
        Check if a file already exists in eXist; if it does, run an
        update (which will backup the existing file), otherwise create
        the file in eXist
        @param xml: contents of xml file to create/update in eXist
        @param collection: path of the collection to store the file in - with
        or without a trailing '/'
        @param fileName: name of file to add in eXist
        '''
        logging.info("Creating or updating file in eXist...")
        # NB, make sure there's a separator between the collection and file
        # name - otherwise the existence check looks up the wrong path
        docPath = collection
        if not docPath.endswith('/'):
            docPath += '/'
        docPath += fileName

        if not self.isNewEXistFile(docPath):
            self.backupEXistFile(collection, fileName)

        self.createEXistFile(xml, collection, fileName)


    def getAllAtomIDs(self):
        '''
        Retrieve all the atom IDs in the atoms directory - NB, this can
        be a quick way of producing a cache of data to check - e.g. to avoid
        multiple calls to getAtomFileCollectionPath
        @return: ids - array of all atom IDs
        '''
        logging.info("Retrieving all atom ids")
        xq = self.xmldb.xq.actual('atomList', '/db/atoms', '', '')
        resultID, doc = self.xmldb.executeQuery(xq)
        if doc['hits'] == 0:
            return []

        doc = self.xmldb.retrieve(resultID, 0)
        et = ET.fromstring(doc)
        ids = [member.findtext('{http://www.w3.org/2005/Atom}repositoryID')
               for member in et]
        logging.debug("Found ids, '%s'" %ids)
        return ids


    def getAllAtomCollections(self):
        '''
        Get all atom collection paths and store in a dictionary - for easy
        reference when doing lots of things at once
        @return: dict with key/val of filename (minus extension)/collectionPath
        (without trailing '/'); empty if no atoms are found
        '''
        logging.info("Retrieving all atom collection paths")
        xq = self.xmldb.xq.actual('atomList', '/db/atoms', '', '')
        resultID, doc = self.xmldb.executeQuery(xq)
        # NB, always return a dict - callers use dict methods on the result
        if doc['hits'] == 0:
            return {}

        doc = self.xmldb.retrieve(resultID, 0)
        et = ET.fromstring(doc)
        colData = {}
        for member in et:
            filePath = member.findtext('{http://www.w3.org/2005/Atom}fileName')
            if not filePath:
                continue
            dirPath, _, fileName = filePath.rpartition('/')
            # strip the final extension only - names without one are kept whole
            colData[fileName.rsplit('.', 1)[0]] = dirPath

        logging.debug("Finished looking up atom paths")
        return colData


    def getAtomPublicationState(self, atomID):
        '''
        Retrieve the publication state of the specified atom - by
        checking the collection it is in
        @param atomID: atom id to look up
        @return: AtomState for the atom.  NB, if the ID is not found, assume
        we're dealing with a new atom and set the state as the working state
        '''
        logging.debug("Finding atom publication state")
        path = self.getAtomFileCollectionPath(atomID)
        # NB, path is None for unknown atoms - fall through to the working state
        if path:
            for state in AtomState.allStates.values():
                if '/%s' %state.collectionPath in path:
                    logging.debug("- state found: '%s'" %state.title)
                    return state

        logging.debug("- state not found - returning WORKING state")
        return AtomState.WORKING_STATE


    def getAtomFileCollectionPath(self, atomID):
        '''
        Given an atom id, determine and return the collection path in eXist
        of the associated atom file
        @param atomID: atom id to look up
        @return: collection path (with trailing '/'), if it exists, None, otherwise
        '''
        logging.info("Looking up collection path for atom ID, '%s'" %atomID)
        xq = self.xmldb.xq['atomFullPath']
        xq = xq.replace('TargetCollection', ec.BASE_COLLECTION_PATH)
        xq = xq.replace('LocalID', atomID)

        resultID, doc = self.xmldb.executeQuery(xq)
        if doc['hits'] == 0:
            logging.info("No document found with the specified ID")
            return None

        doc = self.xmldb.retrieve(resultID, 0, {})

        docET = ET.fromstring(doc)
        collPath = docET.text + '/'
        logging.debug("Found collection path, '%s'" %collPath)
        return collPath


    def deleteAtomInExist(self, atom):
        '''
        Delete the given atom from the eXist DB - using the atom
        details to work out the required path to delete
        @param atom: the Atom to delete; its default collection path (which
        depends on its current state) and atomName give the location
        '''
        logging.info("Deleting atom from eXist")
        atomPath = atom.getDefaultCollectionPath() + atom.atomName
        self.deleteEXistFile(atomPath)
        logging.info("Atom deleted")


    def changeAtomPublicationStateInExist(self, atom, newState):
        '''
        Adjust the publication state of an atom in eXist
        @param atom: the Atom data model of the atom whose publication state
        needs to change
        @param newState: an AtomState object representing the new publication
        state of the atom
        @return atom: atom data model with updated state
        '''
        logging.info("Changing the publication state of atom - from '%s' to '%s'" \
                     %(atom.state.title, newState.title))
        oldState = atom.state
        # firstly, create atom in new publication state collection - so data isn't
        # lost if this fails
        atom.state = newState
        self.createEXistFile(atom.toPrettyXML(), atom.getDefaultCollectionPath(),
                             atom.atomName)

        # now delete atom in the old publication state - the state is temporarily
        # reverted so the old collection path is used
        atom.state = oldState
        try:
            self.deleteAtomInExist(atom)
        finally:
            # the atom now exists in the new state's collection, even if the
            # deletion of the old copy failed
            atom.state = newState
        logging.info("- atom created in new publication state and removed from old one")

        # update feeds + create DIFs, if needed
        if atom.isPublished():
            self.runAsynchAtomPublish(atom)

        return atom


    def createAtomInExist(self, atom, replaceAtom = True, runAsynch = True):
        '''
        Create an atom in the eXist DB - using the atom contents to work out
        the location + data set ID
        @param atom: atom object to create in the DB
        @keyword replaceAtom: if False and the atom is already available in eXist
        then raise a DuplicateError; if True (default), back up the existing
        atom and overwrite it
        @keyword runAsynch: if True, if a backup of an existing file, do this
        asynchronously in a separate thread + do the feed publishing and DIF
        creating in a separate thread, too
        @raise ValueError: if the input is not an Atom object
        @raise DuplicateError: if replaceAtom is False and the atom already exists
        @return: the created atom
        '''
        logging.info("Creating atom in eXist")
        if not atom:
            raise ValueError("Input is not an object - cannot create in eXist")
        if not isinstance(atom, Atom):
            raise ValueError("Input object is not an Atom object - cannot create in eXist")

        # if the atom has no dataset ID, generate and add one
        # NB, this should only be the case when the atom is being created
        # via the web interface
        isNew = False
        if not atom.datasetID:
            isNew = True
            atom.setDatasetID(atom.atomTypeID + '_' + str(uuid.uuid1()))

        if self.collections is not None: # cope with empty dict
            eXistCollection = self.collections.get(atom.datasetID)
        else:
            eXistCollection = self.getAtomFileCollectionPath(atom.datasetID)

        # if collection not found, assume we're dealing with a new atom; get its
        # default collection
        if not eXistCollection:
            eXistCollection = atom.getDefaultCollectionPath()

            # check if we need a new provider feed set up
            providerCollection = ec.PROVIDER_FEED_PATH + atom.ME.providerID + '/'
            if self.isNewEXistCollection(providerCollection):
                logging.info("Creating feed for new provider ID")
                self.createCollections([providerCollection])
                self.feedClient.createAtomFeed(providerCollection,
                                               self.feedClient.PROVIDERLEVEL_ATOM_FEED_TITLE %atom.ME.providerID)

        elif isNew:
            # in this situation we're trying to create an atom with the same
            # name via the web interface - this can't be allowed - so retry to
            # generate a new ID, keeping the caller's options and returning
            # the created atom
            atom.datasetID = None
            return self.createAtomInExist(atom, replaceAtom = replaceAtom,
                                          runAsynch = runAsynch)
        # create backup of atom if it already exists
        else:
            if not replaceAtom:
                raise DuplicateError('An atom with the specified ID (%s) already exists in eXist' \
                                     %atom.datasetID)
            # store name of backup - to allow restore, if subsequent ops fail
            self.backupName = self.backupEXistFile(eXistCollection, atom.atomName,
                                                   runAsynch = runAsynch)

            # also change updated date to current time
            atom.updatedDate = datetime.datetime.today().strftime("%Y-%m-%dT%H:%M:%SZ")

        self.createEXistFile(atom.toPrettyXML(), eXistCollection, atom.atomName)

        logging.info("Atom created in eXist")

        # lastly, if we're dealing with a published atom, update DIF records +
        # feeds
        if atom.isPublished():
            if runAsynch:
                self.runAsynchAtomPublish(atom)
            else:
                self.__publishAtom(atom)

        return atom


    def runAsynchAtomPublish(self, atom):
        '''
        Publish the atom (feeds + DIF creation) in a separate thread
        @param atom: the atom to publish
        '''
        thread = publishingThread(self, atom)
        thread.start()


    def __publishAtom(self, atom):
        '''
        Add atom info to the various feeds - and if it is a data entity, use
        it to create a DIF document and add this to feeds, also
        NB, also invoked by publishingThread via its mangled name
        '''
        if atom.isDE():
            self.createDIFDocumentFromAtom(atom)

        self.feedClient.addAtomToFeeds(atom)


    def restoreBackup(self, docPath):
        '''
        Restore the backed up file - effectively recreating in the non-backup collection
        @param docPath: path to the backup file to restore
        @raise SystemError: if the backup file contents cannot be retrieved
        '''
        logging.info("Restoring file, '%s' in eXist" %docPath)
        doc = self.getEXistFile(docPath)

        if not doc:
            errorMessage = "Could not retrieve file contents (%s) to restore - exiting." %docPath
            logging.error(errorMessage)
            raise SystemError(errorMessage)

        bits = docPath.split('/')
        fileName = bits[-1]
        collection = '/'.join(bits[0:-1])
        # remove timestamp from filename
        fileName = self.__removeTimeStamp(fileName)

        # Now map the backup collection back to the original collection
        collection = collection.replace(ec.BACKUP_COLLECTION_PATH, ec.BASE_COLLECTION_PATH)
        collection = collection.replace(ec.NDG_A_COLLECTION_PATH_BACKUP, ec.NDG_A_COLLECTION_PATH)

        self.createEXistFile(doc, collection, fileName)
        logging.info("File restored")


    def transformAtomIntoDIF(self, atom):
        '''
        Transform an atom into a DIF document - using an XQuery transform ran
        in eXist
        @param atom: the Atom data model to convert
        @raise Exception: if the XQuery transform produces no results
        @return: the produced DIF document
        '''
        logging.info("Creating DIF record from atom - using XQuery transform")

        # get the query and set this up to use properly
        targetCollection = atom.getPublicationStatePath()
        providerID = atom.ME.providerID
        # remove the trailing forward slash - otherwise the xquery won't work
        if targetCollection.endswith('/'):
            targetCollection = targetCollection[0:-1]
        xquery = self.xmldb.xq.actual('atom2DIF', targetCollection,
                                      providerID, atom.datasetID)

        discovery_id, s = self.xmldb.executeQuery(xquery)
        if 'hits' in s and not s['hits']:
            raise Exception("XQuery produced no results - suggesting a problem with the query")
        doc = self.xmldb.retrieve(discovery_id, 0)

        # add various missing info
        # get the organisation data for the repository
        # - NB, store for re-use to avoid multiple lookups of the same info
        if providerID in self.dataCentres:
            dataCentre = self.dataCentres[providerID]
        else:
            dataCentre = getDataCentreDIFElement(providerID, self.xmldb)
            self.dataCentres[providerID] = dataCentre

        # add various other data to atoms - to make up with the incomplete data spec there
        doc = addOrgData(doc, targetCollection, dataCentre)
        doc = expandParametersData(doc)
        doc = addStandardKeywords(doc)

        logging.info("Transform completed successfully - returning DIF doc")
        return doc


    def createDIFDocumentFromAtom(self, atom):
        '''
        Transform an atom into a DIF document and store this in eXist
        @param atom: the Atom data model to convert
        '''
        logging.info("Creating and storing DIF document")
        doc = self.transformAtomIntoDIF(atom)
        fileName = atom.atomName.replace('.atom', '.xml')

        # now add to eXist
        providerID = atom.ME.providerID
        collectionPath = ec.DIF_COLLECTION_PATH + providerID

        # NB, check if a top level provider collection exists; if not, add
        # this with a feed
        # NB, the collection will be created along with the DIF file - so set the
        # feed up after the file has been created
        setupFeed = self.isNewEXistCollection(collectionPath)

        self.createOrUpdateEXistFile(doc, collectionPath, fileName)
        if setupFeed:
            logging.info("Creating feed for new provider ID")
            self.createCollections([collectionPath])
            # NB, the trailing '/' belongs on the feed collection path (as with
            # the provider atom feeds), not on the feed title
            self.feedClient.createAtomFeed(collectionPath + '/',
                                           self.feedClient.PROVIDERLEVEL_DIF_FEED_TITLE %providerID)
        logging.info("DIF document created and stored")
799
Note: See TracBrowser for help on using the repository browser.