1 | ''' |
---|
2 | Class supporting set up and communication with eXist DB |
---|
3 | for the purposes of creating and updating atoms |
---|
4 | |
---|
5 | @author: C Byrom - Tessella 08 |
---|
6 | ''' |
---|
7 | import os, sys, logging, datetime, uuid, re |
---|
8 | from ndg.common.src.models.Atom import Atom |
---|
9 | from ndg.common.src.models import AtomState |
---|
10 | from ndg.common.src.clients.xmldb.eXist.eXistConnector import eXistConnector as ec |
---|
11 | from ndg.common.src.clients.xmldb.eXist.existdbfeedclient import eXistDBFeedClient as feedClient |
---|
12 | from ndg.common.src.dal.DocumentRetrieve import DocumentRetrieve as DR |
---|
13 | from ndg.common.src.lib.atomutilities import * |
---|
14 | from ndg.common.src.lib.existinitialiser import EXistInitialiser |
---|
15 | from xml.etree import ElementTree as ET |
---|
16 | from threading import Thread |
---|
17 | |
---|
class DuplicateError(Exception):
    """
    Raised when an atom document that is being created is found to exist
    already. The message is logged at error level as soon as the exception
    is constructed.
    """
    def __init__(self, msg):
        # log first, so the problem is recorded even if a caller swallows it
        logging.error(msg)
        super(DuplicateError, self).__init__(msg)
---|
25 | |
---|
class backingUpThread(Thread):
    '''
    Thread that writes a backup copy of a document into eXist, so that the
    caller does not have to wait for the write to complete.
    '''

    def __init__ (self, existClient, doc, collection, fileName):
        '''
        @param existClient: client object whose createEXistFile(doc, collection,
        fileName) method performs the actual write
        @param doc: contents of the document to store
        @param collection: eXist collection to store the backup copy in
        @param fileName: name to store the backup copy under
        '''
        logging.info("Setting up thread to run backup for file, '%s'" %fileName)
        super(backingUpThread, self).__init__()
        self.ec, self.doc = existClient, doc
        self.collection, self.fileName = collection, fileName
        logging.info("- finished setting up thread")

    def run(self):
        '''
        Write the backup copy through the client's createEXistFile method.
        '''
        name = self.fileName
        logging.info("Running thread to perform backup of file, '%s'" %name)
        self.ec.createEXistFile(self.doc, self.collection, name)
        logging.info("- finished backing up file")
---|
41 | |
---|
class publishingThread(Thread):
    '''
    Thread that runs an eXistDBClient's private atom-publishing step (feed
    updates and, for data entities, DIF creation) in the background.
    '''

    def __init__ (self, edc, atom):
        '''
        @param edc: eXistDBClient instance whose __publishAtom method is run
        @param atom: the Atom to publish - must have a datasetID attribute
        '''
        logging.info("Setting up thread to publish atom data for atom, '%s'" %atom.datasetID)
        super(publishingThread, self).__init__()
        self.edc, self.atom = edc, atom
        logging.info("- finished setting up thread")

    def run(self):
        '''
        Publish the atom via the client's name-mangled private __publishAtom.
        '''
        target = self.atom
        logging.info("Running thread to publish atom data for atom, '%s'" %target.datasetID)
        # __publishAtom is private to eXistDBClient, so use its mangled name
        self.edc._eXistDBClient__publishAtom(target)
        logging.info("- finished publishing atom data")
---|
55 | |
---|
56 | |
---|
class eXistDBClient:
    '''
    Client for creating, updating, backing up, validating and publishing atom
    documents held in an eXist XML database.

    Generic document operations go through a DocumentRetrieve connection
    (self.xmldb). Feed maintenance goes through an eXistDBFeedClient
    (self.feedClient) that is built from the same connection's auth details.
    '''

    def __init__(self, configFile = None, eXistDBHostname = None,
                 eXistPortNo = '8080',
                 loadCollectionData=False, setUpDB = False):
        '''
        Initialise a connection to the eXistDB
        @keyword configFile: config file to use in setting up DB
        @keyword eXistDBHostname: name of eXist DB to use - if not specified, the first
        host in the config file is used
        @keyword eXistPortNo: Port number that the eXist DB is exposed by - defaults
        to '8080' if not set
        @keyword loadCollectionData: preload info on all the eXist collections, if True (default False)
        @keyword setUpDB: if True, create the basic collection structure and ingest the
        atom schemas. Default is False.
        '''
        logging.info("Initialising connection to eXist DB")
        self.eXistDBHostname = eXistDBHostname
        self.eXistPortNo = eXistPortNo
        logging.debug("- connecting to DB, '%s:%s', with config file, '%s'"
                      %(eXistDBHostname or 'Default', self.eXistPortNo,
                        configFile or 'Default'))

        inputs = {}

        # lazily filled in by the AtomSchema property
        self.atomSchema = None

        # this keeps a record of the last file backed up - incase we need
        # to do a rollback if there's an error encountered
        self.backupName = None

        # if we preload the collections data, it'll be stored here, as a dict
        # of atomID -> collection path
        self.collections = None

        # dict to hold the data centre element info - used to augment atom info
        # when doing atom->DIF translations - to avoid having to constantly
        # look this up for the different provider IDs. Set up before any of the
        # calls below, which may run methods that use it.
        self.dataCentres = {}

        # NB, there are two routes through here: if a config file is specified
        # without a hostname, the host will be taken to be the first entry in
        # the config file; if a hostname is specified, it will be used explicitly
        if configFile:
            inputs['pwfile'] = configFile
            if not self.eXistDBHostname:
                self.__loadDBDetails(configFile)

        # Now set up the connection
        logging.debug(inputs)
        self.xmldb = DR(self.eXistDBHostname, **inputs)

        # set up feed client, too - NB, info should be added to the feed as it is added
        # to the eXist collections, so the standard dbclient and the feed client
        # are intrinsically linked
        self.feedClient = feedClient(self.xmldb.auth,
                                     eXistDBHostname = self.eXistDBHostname,
                                     eXistPortNo = self.eXistPortNo)

        if setUpDB:
            # set up the eXist DB appropriately for use with ndgCommon
            initialiser = EXistInitialiser(self)
            initialiser.initialise()

        if loadCollectionData:
            self.collections = self.getAllAtomCollections()

        logging.info("eXist DB connection initialised")


    def __getSchema(self):
        '''
        Return the eXist path of the atom schema - built on first use, then cached
        '''
        logging.debug("Getting atom schema data")
        if not self.atomSchema:
            self.atomSchema = ec.SCHEMATA_COLLECTION_PATH + '/' + \
                self.xmldb.xq.ATOM_MOLES_SCHEMA + '.xsd'

        return self.atomSchema

    AtomSchema = property(fget=__getSchema, doc="Atom schema path")


    def __joinPath(self, collection, fileName):
        '''
        Join a collection path and a file name with exactly one '/' between
        them, whether or not the collection has a trailing slash
        @param collection: eXist collection path
        @param fileName: name of the file within the collection
        @return: the combined document path
        '''
        return collection.rstrip('/') + '/' + fileName


    def __isKnownCollection(self, collection):
        '''
        Check whether a collection appears in the preloaded collection data
        - NB, self.collections maps atom IDs to collection paths (stored without
        trailing slashes), so the check is made against the dict's values
        @param collection: collection path to check
        @return: True if the collection is known to exist, False otherwise
        (including when no collection data has been preloaded)
        '''
        if not self.collections:
            return False
        return collection.rstrip('/') in self.collections.values()


    def createCollections(self, collections):
        '''
        Create the specified collections in eXist
        @param collections: array of collections to create
        @return True if successful
        '''
        logging.info("Setting up eXist collections")
        for col in collections:
            logging.debug("Creating collection, '%s'" %col)
            self.xmldb.createCollection(col)
        logging.info("All collections set up")
        return True


    def getAtom(self, id):
        '''
        Lookup the atom with id
        @param id: id of the atom to retrieve
        @return: the atom document, as returned by DocumentRetrieve.get
        '''
        logging.info("Looking up atom with id, '%s'" %(id))
        doc = self.xmldb.get('', DR.ATOM, id,
                             targetCollection = ec.BASE_COLLECTION_PATH)
        logging.info("Atom retrieved")
        return doc


    def checkAtomSchemaCompliance(self, atomPath, atom = None, isDebug = False):
        '''
        Validate the specified atom in eXist with the atom schemata in eXist
        @param atomPath: path to the atom in eXist
        @keyword atom: if set to an atom, this will be created temporarily in eXist
        - since it may not already exist there. Once validation is completed, the
        file will be removed from eXist.
        @keyword isDebug: if True, return full error details, otherwise only return
        a summary
        @raise SystemError: if the validation query could not be run properly
        @return array: containing any errors found - NB, if an empty array is returned,
        this indicates successful validation
        '''
        logging.info("Validating atom, '%s' against schemata in eXist" %atomPath)
        errorMessage = None
        resultID = None
        # only remove a temporary file if one was actually created - otherwise
        # a failed creation would lead to deleting the caller's atomPath
        tempFileCreated = False
        try:
            if atom:
                logging.info("Creating temporary file in eXist to do validation against")
                fileName = atom.datasetID + str(datetime.datetime.today().microsecond)
                collection = atom.getDefaultCollectionPath()
                self.createEXistFile(atom.toPrettyXML(), collection, fileName)
                tempFileCreated = True
                atomPath = self.__joinPath(collection, fileName)

            validationQuery = 'validation:validate-report("' + atomPath + \
                '", xs:anyURI("' + self.AtomSchema + '"))'
            resultID, result = self.xmldb.executeQuery(validationQuery)
            if result['hits'] == 0:
                errorMessage = "Validation did not complete successfully - please retry"
            elif result['hits'] > 1:
                errorMessage = "More than one atom was validated - expecting only a single atom validation - please retry"
        except Exception as e:
            errorMessage = "Error encountered whilst validating atom: '%s'" %e

        if tempFileCreated:
            logging.info("Deleting temporary file in eXist")
            self.deleteEXistFile(atomPath)

        if errorMessage:
            logging.error(errorMessage)
            raise SystemError(errorMessage)

        doc = self.xmldb.retrieve(resultID, 0)
        et = ET.fromstring(doc)
        status = et.findtext('status')

        # retrieve the error detail if invalid
        errors = []
        if status == 'invalid':
            logging.info("Atom is invalid - details as follows:")
            for error in et.findall('message'):
                lineNo = error.attrib.get('line')
                colNo = error.attrib.get('column')
                level = error.attrib.get('level')
                repeat = error.attrib.get('repeat')
                # a message element may have no text - avoid crashing on None
                errorText = error.text or ''
                # remove the meaningless error type from message
                if errorText.startswith('cvc-'):
                    errorText = ':'.join(errorText.split(':')[1:])
                errorMessage = "%s at line %s, column %s: %s" %(level, lineNo, colNo, errorText)
                if repeat:
                    errorMessage += " (%s times)" %repeat

                if isDebug:
                    errors.append(errorMessage)
                else:
                    errors.append(errorText)
                logging.info(errorMessage)
        else:
            logging.info("Atom is valid")

        logging.info("Validation complete")
        return errors


    def __loadDBDetails(self, configFile):
        '''
        Retrieve info from the eXist db config file
        - expects whitespace separated hostname, username and password
        @param configFile: path to the config file
        @raise ValueError: if the file is missing or has fewer than three entries
        '''
        logging.info("Loading DB config data")
        # Check this file exists
        if not os.path.isfile(configFile):
            errorMessage = "Could not find the DB config file, %s; please make sure this " \
                "is available from the running directory" %configFile
            logging.error(errorMessage)
            raise ValueError(errorMessage)

        # use a context manager so the file handle is always released
        with open(configFile, "r") as dbinfo_file:
            dbinfo = dbinfo_file.read().split()

        if len(dbinfo) < 3:
            errorMessage = 'Incorrect data in DB config file'
            logging.error(errorMessage)
            raise ValueError(errorMessage)
        self.eXistDBHostname = dbinfo[0]
        self._username = dbinfo[1]
        self._pw = dbinfo[2]
        logging.info("DB config data loaded")


    def __lookupEXistFile(self, docPath):
        '''
        Look up a file in eXist using XPath
        @param docPath: path to doc to look up
        @return: id returned from query, with which to retrieve doc; if doc doesn't exist, return None
        - NB, a valid id may be 0, so callers must compare against None
        '''
        logging.info("Retrieving info for file, '%s'" %docPath)

        resultID, summary = self.xmldb.executeQuery('doc("' + docPath + '")')

        if summary['hits'] == 0:
            logging.info("File does not exist in eXist DB")
            return None
        logging.info("Found file - returning result ID")
        return resultID


    def isNewEXistCollection(self, collectionPath):
        '''
        Look up a collection in eXist using XPath
        @param collectionPath: path to collection to look up
        @return: False if collection exists, True otherwise
        '''
        logging.info("Checking if collection, '%s', exists in eXist" %collectionPath)

        resultID, summary = self.xmldb.executeQuery('collection("' + collectionPath + '")')
        if summary['hits'] == 0:
            logging.info("Collection does not exist in eXist DB")
            return True
        logging.info("Found collection")
        return False


    def getEXistFile(self, docPath):
        '''
        Use XQuery to retrieve the specified document from eXist
        @param docPath: the path of the doc to retrieve
        @return: contents of document if exists, None otherwise
        '''
        resultID = self.__lookupEXistFile(docPath)

        if resultID is None:
            logging.info("No file found - nothing to retrieve")
            return None

        logging.info("Found file - now retrieving content")
        doc = self.xmldb.retrieve(resultID, 0)
        return doc


    def isNewEXistFile(self, docPath):
        '''
        Test if a file already exists in eXist
        @param docPath: path of file in eXist to look up
        @return: True if a new file, False if otherwise
        '''
        logging.info("Checking if file, '%s', exists in eXist DB" %docPath)

        # NB, compare with None - a result id of 0 still means the file exists
        return self.__lookupEXistFile(docPath) is None


    def __addTimeStamp(self, fileName):
        '''
        Add timestamp to input filename
        NB, this assumes there is a file type identifier at the end of the filename; if so, the datestamp
        is included before this; if not it is just added at the end
        '''
        bits = fileName.rsplit(".", 1)
        fileName = bits[0] + "_" + datetime.datetime.today().strftime("%Y-%m-%dT%H_%M_%S")

        if len(bits) > 1:
            fileName += "." + bits[1]
        return fileName


    def __removeTimeStamp(self, fileName):
        '''
        Remove a timestamp, as added by __addTimeStamp, from a file name
        - returns the name unchanged if no timestamp is present
        '''
        match = re.search(r'(.*)(_\d{4}-\d{2}-\d{2}T\d{2}_\d{2}_\d{2})(.*)', fileName)
        if match:
            return match.group(1) + match.group(3)

        return fileName


    def backupEXistFile(self, collection, fileName, runAsynch = True):
        '''
        Backup a file that exists in the eXist DB
        - NB, this really just creates a new file with the same contents in a
        backup dir
        - to improve efficiency, spawn this process as a new thread since we
        don't need to worry about the outcome
        @param collection: path of the collection to store the file in
        @param fileName: name of file to add in eXist
        @param runAsynch: if True, do the backup asynchronously in a separate thread
        @raise SystemError: if the file contents cannot be retrieved
        @return: path to new backup file
        '''
        if not collection.endswith('/'):
            collection += '/'

        docPath = collection + fileName
        logging.info("Backing up file, '%s', in eXist DB" %docPath)

        logging.debug("Firstly, retrieve file contents from eXist")
        doc = self.getEXistFile(docPath)
        if not doc:
            errorMessage = "Could not retrieve file contents (%s) to backup - exiting." %docPath
            logging.error(errorMessage)
            raise SystemError(errorMessage)

        # Now adjust the collection to map to the backup dir
        collection = collection.replace(ec.BASE_COLLECTION_PATH, ec.BACKUP_COLLECTION_PATH)
        collection = collection.replace(ec.NDG_A_COLLECTION_PATH, ec.NDG_A_COLLECTION_PATH_BACKUP)

        # add timestamp to filename
        fileName = self.__addTimeStamp(fileName)
        docPath = collection + fileName

        if runAsynch:
            # run the back up in a separate thread
            thread = backingUpThread(self, doc, collection, fileName)
            thread.start()
        else:
            self.createEXistFile(doc, collection, fileName)

        return docPath


    def createEXistFile(self, xml, collection, fileName):
        '''
        Add the input file to the eXist DB
        @param xml: contents of xml file to create in eXist
        @param collection: path of the collection to store the file in
        @param fileName: name of file to add in eXist
        @raise SystemError: if the store command does not succeed
        @return: True, if file created successfully
        '''
        logging.info("Adding file, '%s' to eXist DB collection, '%s'"
                     %(fileName, collection))
        logging.debug("data: %s" %xml)

        # create the collection, if it isn't known to exist - NB, this won't overwrite anything
        if not self.__isKnownCollection(collection):
            self.createCollections([collection])

        status = self.xmldb.storeXML(xml, self.__joinPath(collection, fileName), overwrite=1)
        if not status:
            errorMessage = "Command to create file in eXist did not complete successfully - exiting"
            logging.error(errorMessage)
            raise SystemError(errorMessage)

        logging.info("File added to eXist")
        return True


    def deleteEXistFile(self, docPath):
        '''
        Delete the input file from eXist DB
        @param docPath: path of document to delete
        @raise SystemError: if the delete command does not succeed
        @return: True, if file deleted successfully
        '''
        logging.info("Deleting file, '%s', from eXist DB" %docPath)

        status = self.xmldb.removeDoc(docPath)
        if not status:
            errorMessage = "Command to delete file in eXist did not complete successfully - exiting"
            logging.error(errorMessage)
            raise SystemError(errorMessage)

        logging.info("File deleted from eXist")
        return True


    def createOrUpdateEXistFile(self, xml, collection, fileName):
        '''
        Check if a file already exists in eXist; if it does, run an
        update (which will backup the existing file), otherwise create
        the file in eXist
        @param xml: contents of xml file to create/update in eXist
        @param collection: path of the collection to store the file in - with
        or without a trailing slash
        @param fileName: name of file to add in eXist
        '''
        logging.info("Creating or updating file in eXist...")
        # NB, join properly - callers may pass a collection with no trailing slash
        if not self.isNewEXistFile(self.__joinPath(collection, fileName)):
            self.backupEXistFile(collection, fileName)

        self.createEXistFile(xml, collection, fileName)


    def getAllAtomIDs(self):
        '''
        Retrieve all the atom IDs in the atoms directory - NB, this can
        be a quick way of producing a cache of data to check - e.g. to avoid
        multiple calls to getAtomFileCollectionPath
        @return: ids - array of all atom IDs
        '''
        logging.info("Retrieving all atom ids")
        xq = self.xmldb.xq.actual('atomList', '/db/atoms', '', '')
        resultID, summary = self.xmldb.executeQuery(xq)
        if summary['hits'] == 0:
            return []

        doc = self.xmldb.retrieve(resultID, 0)
        et = ET.fromstring(doc)
        ids = [member.findtext('{http://www.w3.org/2005/Atom}repositoryID')
               for member in et]
        logging.debug("Found ids, '%s'" %ids)
        return ids


    def getAllAtomCollections(self):
        '''
        Get all atom collection paths and store in a dictionary - for easy
        reference when doing lots of things at once
        @return: dict with key/val of atomID/collectionPath - empty if no atoms
        are found
        '''
        logging.info("Retrieving all atom collection paths")
        xq = self.xmldb.xq.actual('atomList', '/db/atoms', '', '')
        resultID, summary = self.xmldb.executeQuery(xq)
        # NB, return an empty dict (not a list) - callers use dict methods on this
        if summary['hits'] == 0:
            return {}

        doc = self.xmldb.retrieve(resultID, 0)
        et = ET.fromstring(doc)
        colData = {}
        for member in et:
            fullPath = member.findtext('{http://www.w3.org/2005/Atom}fileName')
            if not fullPath:
                continue
            pathBits = fullPath.split('/')
            # strip only the file extension, so IDs containing '.' survive intact
            atomID = pathBits[-1].rsplit('.', 1)[0]
            colData[atomID] = '/'.join(pathBits[0:-1])

        logging.debug("Finished looking up atom paths")
        return colData


    def getAtomPublicationState(self, atomID):
        '''
        Retrieve the publication state of the specified atom - by
        checking the collection it is in
        @param atomID: atom id to look up
        @return: AtomState for the atom. NB, if the ID is not found, assume
        we're dealing with a new atom and set the state as the working state
        '''
        logging.debug("Finding atom publication state")
        path = self.getAtomFileCollectionPath(atomID)
        if path:
            for state in AtomState.allStates.values():
                if path.find('/%s' %state.collectionPath) > -1:
                    logging.debug("- state found: '%s'" %state.title)
                    return state

        logging.debug("- state not found - returning WORKING state")
        return AtomState.WORKING_STATE


    def getAtomFileCollectionPath(self, atomID):
        '''
        Given an atom id, determine and return the collection path in eXist
        of the associated atom file
        @param atomID: atom id to look up
        @return: collection path (with trailing slash), if it exists, None, otherwise
        '''
        logging.info("Looking up collection path for atom ID, '%s'" %atomID)
        xq = self.xmldb.xq['atomFullPath']
        xq = xq.replace('TargetCollection', ec.BASE_COLLECTION_PATH)
        xq = xq.replace('LocalID', atomID)

        resultID, summary = self.xmldb.executeQuery(xq)
        if summary['hits'] == 0:
            logging.info("No document found with the specified ID")
            return None

        doc = self.xmldb.retrieve(resultID, 0, {})

        docET = ET.fromstring(doc)
        if docET.text is None:
            logging.info("No document found with the specified ID")
            return None
        collPath = docET.text + '/'
        logging.debug("Found collection path, '%s'" %collPath)
        return collPath


    def deleteAtomInExist(self, atom):
        '''
        Delete the given atom from the eXist DB - using the atom
        details to work out the required path to delete
        @param atom: Atom whose default collection path + atomName is deleted
        '''
        logging.info("Deleting atom from eXist")
        atomPath = self.__joinPath(atom.getDefaultCollectionPath(), atom.atomName)
        self.deleteEXistFile(atomPath)
        logging.info("Atom deleted")


    def changeAtomPublicationStateInExist(self, atom, newState):
        '''
        Adjust the publication state of an atom in eXist
        @param atom: the Atom data model of the atom whose publication state
        needs to change
        @param newState: an AtomState object representing the new publication
        state of the atom
        @return atom: atom data model with updated state
        '''
        logging.info("Changing the publication state of atom - from '%s' to '%s'"
                     %(atom.state.title, newState.title))
        oldState = atom.state
        # firstly, create atom in new publication state collection - so data isn't
        # lost if this fails
        atom.state = newState
        try:
            self.createEXistFile(atom.toPrettyXML(), atom.getDefaultCollectionPath(),
                                 atom.atomName)
        except Exception:
            # nothing moved - keep the model consistent with what is in eXist
            atom.state = oldState
            raise

        # now delete atom in the old publication state
        atom.state = oldState
        self.deleteAtomInExist(atom)
        logging.info("- atom created in new publication state and removed from old one")
        atom.state = newState

        # update feeds + create DIFs, if needed
        if atom.isPublished():
            self.runAsynchAtomPublish(atom)

        return atom


    def createAtomInExist(self, atom, replaceAtom = True, runAsynch = True):
        '''
        Create an atom in the eXist DB - using the atom contents to work out
        the location + data set ID
        @param atom: atom object to create in the DB
        @keyword replaceAtom: if False and the atom is already available in eXist
        then raise a DuplicateError.
        @keyword runAsynch: if True, if a backup of an existing file, do this
        asynchronously in a separate thread + do the feed publishing and DIF
        creating in a separate thread, too
        @raise ValueError: if the input is not an Atom
        @return: the created atom
        '''
        logging.info("Creating atom in eXist")
        if not atom:
            raise ValueError("Input is not an object - cannot create in eXist")
        if not isinstance(atom, Atom):
            raise ValueError("Input object is not an Atom object - cannot create in eXist")

        # if the atom has no dataset ID, generate and add one
        # NB, this should only be the case when the atom is being created
        # via the web interface
        isNew = False
        if not atom.datasetID:
            isNew = True
            atom.setDatasetID(atom.atomTypeID + '_' + str(uuid.uuid1()))

        if self.collections is not None: # cope with empty dict
            eXistCollection = self.collections.get(atom.datasetID)
        else:
            eXistCollection = self.getAtomFileCollectionPath(atom.datasetID)

        # if collection not found, assume we're dealing with a new atom; get its
        # default collection
        if not eXistCollection:
            eXistCollection = atom.getDefaultCollectionPath()

            # check if we need a new provider feed set up
            providerCollection = ec.PROVIDER_FEED_PATH + atom.ME.providerID + '/'
            if self.isNewEXistCollection(providerCollection):
                logging.info("Creating feed for new provider ID")
                self.createCollections([providerCollection])
                self.feedClient.createAtomFeed(providerCollection,
                                               self.feedClient.PROVIDERLEVEL_ATOM_FEED_TITLE %atom.ME.providerID)

        elif isNew:
            # in this situation we're trying to create an atom with the same
            # name via the web interface - this can't be allowed - so retry to
            # generate a new ID, keeping the caller's options and returning
            # the created atom
            atom.datasetID = None
            return self.createAtomInExist(atom, replaceAtom = replaceAtom,
                                          runAsynch = runAsynch)

        # create backup of atom if it already exists
        else:
            if not replaceAtom:
                raise DuplicateError('An atom with the specified ID (%s) already exists in eXist'
                                     %atom.datasetID)
            # store name of backup - to allow restore, if subsequent ops fail
            self.backupName = self.backupEXistFile(eXistCollection, atom.atomName,
                                                   runAsynch = runAsynch)

            # also change updated date to current time
            atom.updatedDate = datetime.datetime.today().strftime("%Y-%m-%dT%H:%M:%SZ")

        self.createEXistFile(atom.toPrettyXML(), eXistCollection, atom.atomName)

        logging.info("Atom created in eXist")

        # lastly, if we're dealing with a published atom, update DIF records +
        # feeds
        if atom.isPublished():
            if runAsynch:
                self.runAsynchAtomPublish(atom)
            else:
                self.__publishAtom(atom)

        return atom


    def runAsynchAtomPublish(self, atom):
        '''
        Start a publishingThread to publish the atom in the background
        @param atom: the Atom to publish
        '''
        thread = publishingThread(self, atom)
        thread.start()


    def __publishAtom(self, atom):
        '''
        Add atom info to the various feeds - and if it is a data entity, use
        it to create a DIF document and add this to feeds, also
        - NB, also invoked by publishingThread via its mangled name
        @param atom: the Atom to publish
        '''
        if atom.isDE():
            self.createDIFDocumentFromAtom(atom)

        self.feedClient.addAtomToFeeds(atom)


    def restoreBackup(self, docPath):
        '''
        Restore the backed up file - effectively recreating in the non-backup collection
        @param docPath: path to the backup file to restore
        @raise SystemError: if the backup file contents cannot be retrieved
        '''
        logging.info("Restoring file, '%s' in eXist" %docPath)
        doc = self.getEXistFile(docPath)

        if not doc:
            errorMessage = "Could not retrieve file contents (%s) to restore - exiting." %docPath
            logging.error(errorMessage)
            raise SystemError(errorMessage)

        bits = docPath.split('/')
        fileName = bits[-1]
        collection = '/'.join(bits[0:-1])
        # remove timestamp from filename
        fileName = self.__removeTimeStamp(fileName)

        # Now adjust the collection to map from the backup dir back to the original
        collection = collection.replace(ec.BACKUP_COLLECTION_PATH, ec.BASE_COLLECTION_PATH)
        collection = collection.replace(ec.NDG_A_COLLECTION_PATH_BACKUP, ec.NDG_A_COLLECTION_PATH)

        self.createEXistFile(doc, collection, fileName)
        logging.info("File restored")


    def transformAtomIntoDIF(self, atom):
        '''
        Transform an atom into a DIF document - using an XQuery transform ran
        in eXist
        @param atom: the Atom data model to convert
        @raise Exception: if the transform query returns no results
        @return: the produced DIF document
        '''
        logging.info("Creating DIF record from atom - using XQuery transform")

        # get the query and set this up to use properly
        targetCollection = atom.getPublicationStatePath()
        providerID = atom.ME.providerID
        # remove the trailing forward slash - otherwise the xquery won't work
        if targetCollection.endswith('/'):
            targetCollection = targetCollection[0:-1]
        xquery = self.xmldb.xq.actual('atom2DIF', targetCollection,
                                      providerID, atom.datasetID)

        discovery_id, s = self.xmldb.executeQuery(xquery)
        if 'hits' in s and not s['hits']:
            raise Exception("XQuery produced no results - suggesting a problem with the query")
        doc = self.xmldb.retrieve(discovery_id, 0)

        # add various missing info
        # get the organisation data for the repository
        # - NB, store for re-use to avoid multiple lookups of the same info
        if providerID in self.dataCentres:
            dataCentre = self.dataCentres[providerID]
        else:
            dataCentre = getDataCentreDIFElement(providerID, self.xmldb)
            self.dataCentres[providerID] = dataCentre

        # add various other data to atoms - to make up with the incomplete data spec there
        doc = addOrgData(doc, targetCollection, dataCentre)
        doc = expandParametersData(doc)
        doc = addStandardKeywords(doc)

        logging.info("Transform completed successfully - returning DIF doc")
        return doc


    def createDIFDocumentFromAtom(self, atom):
        '''
        Transform an atom into a DIF document and store this in eXist
        @param atom: the Atom data model to convert
        '''
        logging.info("Creating and storing DIF document")
        doc = self.transformAtomIntoDIF(atom)
        fileName = atom.atomName.replace('.atom', '.dif')

        # now add to eXist
        providerID = atom.ME.providerID
        collectionPath = ec.DIF_COLLECTION_PATH + providerID
        # NB, check if provider collection exists; if not, add this with a feed
        # NB, the collection will be created along with the DIF file - so set the
        # feed up after the file has been created
        setupFeed = self.isNewEXistCollection(collectionPath)

        self.createOrUpdateEXistFile(doc, collectionPath, fileName)
        if setupFeed:
            logging.info("Creating feed for new provider ID")
            self.createCollections([collectionPath])
            # NB, the trailing slash belongs on the collection path (as with the
            # provider atom feeds), not on the feed title
            self.feedClient.createAtomFeed(collectionPath + '/',
                                           self.feedClient.PROVIDERLEVEL_DIF_FEED_TITLE %providerID)
        logging.info("DIF document created and stored")
---|
781 | |
---|