source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresRecord.py @ 3862

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresRecord.py@3862
Revision 3862, 17.4 KB checked in by cbyrom, 12 years ago

Fix workflow so that transforms are only run when actually required

  • i.e. when the data is not already in the DB + fix a few small bugs

+ add extra logging + wrap spatiotemporal data in record via get methods.

#!/usr/bin/env python
'''
Class representing the contents of a row in the metadata_record postgres DB table
C Byrom Apr 08
'''
try: # python 2.5
    from xml.etree import cElementTree
except ImportError:
    try:
        # if you've installed it yourself it comes this way
        import cElementTree
    except ImportError:
        # if you've egged it this is the way it comes
        from ndgUtils.elementtree import cElementTree

import os, sys, logging, re
import molesReadWrite as MRW
import keywordAdder    # used by addKeywords below
from ndgUtils.ndgObject import ndgObject
from FileUtilities import FileUtilities
from SpatioTemporalData import SpatioTemporalData

class PostgresRecord:
    '''
    Class representing the contents of a row in the metadata_record postgres DB table
    @param filename: name of the file to use as the metadata record
    '''
    documentTypes = ['MOLES', 'DIF', 'DC', 'ISO19139']#, 'MDIP']

    def __init__(self, filename, ndg_dataprovider, datacentre_groups, datacentre_namespace, discovery_id, xq, docType):
        logging.info("Setting up Postgres record for file, " + filename)
        self.filename = filename

        # NB, if we're dealing with an NDG data provider, the details are slightly different
        if ndg_dataprovider:
            discObj = ndgObject(discovery_id)
            self._local_id = discObj.localID
            self._repository_local_id = discObj.repository
        else:
            self._local_id = discovery_id
            self._repository_local_id = datacentre_namespace

        self._datacentre_groups = datacentre_groups
        self._repository = datacentre_namespace
        self.discovery_id = discovery_id
        self._xq = xq
        self.docType = docType

        self._molesFormat = None    # initialise this, so we can guarantee a value - to avoid using getattr
        self._allDocs = []  # list to store all the transformed docs - for easy retrieval by the DAO

        self._fileUtils = FileUtilities()

        # get the dir of the file - needed by the xquery to use as the target collection
        self._dir, self._shortFilename = os.path.split(filename)

        # dir to store a temp copy of the moles file, when produced - for use by other transforms
        self._molesDir = None

        # firstly load contents of file
        inputFile = open(filename)
        self.originalFormat = inputFile.read()
        inputFile.close()

        # escape any apostrophes
        self.originalFormat = self.escapeSpecialCharacters(self.originalFormat)

        # initialise the various record fields
        self.db_id = None    # the DB ID of the record, for easy reference when it is created
        self.molesFormat = None
        self.dcFormat = None
        self.mdipFormat = None
        self.iso19139Format = None
        self.scn = 1    # system change number - keeps track of the number of mods to a particular row

        # spatiotemporal data object
        self.stData = None

    def escapeSpecialCharacters(self, inputString):
        '''
        Adjust the input string to escape any characters that would interfere with string or DB
        operations
        @param inputString: string to correct
        @return: corrected string
        '''
        return re.sub(r'\'', '\\\'', inputString)

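    # Illustrative note (not part of the original class): the escaping above simply
    # backslash-escapes apostrophes before text is stored in the DB, e.g. the input
    # O'Neill becomes O\'Neill.
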
    def doRecordTransforms(self):
        '''
        Run various transforms on the original doc, to populate the record with
        the other types of doc used elsewhere
        '''
        logging.info("Running transforms for all document types")
        for docType in self.documentTypes:
            self.getDocumentFormat(docType)

        logging.info("Transforms complete")


    def createMolesFile(self):
        '''
        Check if a moles file exists on the system; if not, assume the moles transform has not
        been run and then produce this file - to allow for use in the various xqueries
        '''
        logging.info("Creating moles file on system - for use with other xquery transforms")
        self._molesDir = self._dir + "/moles/"
        self._fileUtils.setUpDir(self._molesDir)

        if self._molesFormat is None:
            self.doMolesTransform()

        self._fileUtils.createFile(self._molesDir + self._shortFilename, self._molesFormat)
        logging.info("Moles file created - at %s" % self._molesDir)


    def doTransform(self, xQueryType):
        '''
        Transform the record according to the specified XQuery type
        @param xQueryType: XQuery doc to use to do the transform
        @return: the metadata record in the required transformed format
        '''
        logging.info("Running XQuery transform, " + xQueryType + " to create transformed document")

        # firstly, check if this is a moles -> something else query; if so, ensure there is a valid
        # moles file available for the transform - and use the correct dir for the xquery collection
        targetDir = self._dir
        if xQueryType.find('moles2') > -1:
            if self._molesDir is None:
                self.createMolesFile()

            targetDir = self._molesDir

        # get the query and set this up to use properly
        xquery = self._xq.actual(xQueryType, targetDir, self._repository_local_id, self._local_id)

        # sort out the input ID stuff
        xquery = xquery.replace('Input_Entry_ID', self.discovery_id)
        xquery = xquery.replace('repository_localid', self._repository)

        # strip out the eXist reference to the libraries; these files should be available in the
        # running dir - as set up by oai_ingest.py
        xquery = xquery.replace('xmldb:exist:///db/xqueryLib/Vocabs/', '')
        xquery = xquery.replace('xmldb:exist:///db/xqueryLib/Utilities/', '')

        # write the query to file, to make it easier to input
        # NB, running directly at the command line leads to problems with the interpretation of $ characters
        xqFile = "currentQuery.xq"
        self._fileUtils.createFile(xqFile, xquery)

        # Now do the transform
        os.putenv('PATH', ':/usr/java/jdk1.5.0_03/bin:/usr/java/jdk1.5.0_03:/usr/java/jdk1.5.0_03/lib/tools.jar:/usr/local/WSClients/OAIBatch:/usr/local/exist-client/bin:/bin:/usr/bin:.')
        xqCommand = "java -cp ./lib/saxon9.jar net.sf.saxon.Query " + xqFile + " !omit-xml-declaration=yes"
        logging.debug("Running saxon command: " + xqCommand)
        pipe = os.popen(xqCommand + " 2>&1")
        output = pipe.read()
        status = pipe.close()

        if status is not None:
            raise SystemError, 'Failed at running the XQuery'

        # now remove the temp xquery file - NB, os.unlink raises an OSError itself if this fails
        os.unlink(xqFile)

        logging.info("Transform completed successfully")

        return output


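    # A minimal sketch of the same saxon invocation using the subprocess module rather
    # than os.popen (illustrative only; it assumes the same ./lib/saxon9.jar location):
    #
    #   import subprocess
    #   proc = subprocess.Popen(["java", "-cp", "./lib/saxon9.jar", "net.sf.saxon.Query",
    #                            xqFile, "!omit-xml-declaration=yes"],
    #                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    #   output = proc.communicate()[0]
    #   if proc.returncode != 0:
    #       raise SystemError, 'Failed at running the XQuery'
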
    def doMolesTransform(self):
        '''
        Set up the basic moles doc - according to the type of document first ingested
        '''
        logging.info("Creating moles document - for use with other transforms")
        xqName = None
        if self.docType == "DIF":
            xqName = "dif2moles"
        elif self.docType == "MDIP":
            xqName = "mdip2moles"
        else:
            sys.exit("ERROR: No XQuery exists to transform input document type, %s, into moles format" \
                     % self.docType)

        # add keywords, if required
        if self._datacentre_groups != "":
            self.addKeywords()

        # now run the appropriate transform and set the attribute
        self._molesFormat = self.doTransform(xqName)

        # escape any apostrophes
        self._molesFormat = self.escapeSpecialCharacters(self._molesFormat)

        logging.info("moles document created")


    def addKeywords(self):
        '''
        If datacentre groups have been specified, these need to be added as keywords
        - NB, this is a rather clumsy approach but uses old code to achieve the result
        '''
        logging.info("Adding datacentre keywords to moles file")
        # NB, use temporary directories to do the keyword additions
        tmpDir = os.getcwd() + "/"
        tmpKeywordsDir = os.getcwd() + "/keywordsAdded/"
        self._fileUtils.setUpDir(tmpDir)
        self._fileUtils.setUpDir(tmpKeywordsDir)
        tmpFile = 'tmpFile.xml'
        self._fileUtils.createFile(tmpDir + tmpFile, self._molesFormat)

        keywordAdder.main(tmpDir, tmpKeywordsDir, self._datacentre_groups)

        # Now load in the converted file
        f = open(tmpKeywordsDir + tmpFile, 'r')
        self._molesFormat = f.read()
        f.close()

        # Finally, tidy up temp dirs
        self._fileUtils.cleanDir(tmpDir)
        self._fileUtils.clearDir(tmpKeywordsDir)
        logging.info("Completed adding keywords")


    def getDocumentFormat(self, docType):
        '''
        Lookup document format; if it is already defined then return it, else do the required XQuery
        transform.  NB, transforms are run on the molesFormat document - so ensure this is available
        @param docType: format of document to return
        '''
        logging.info("Retrieving document type, " + docType)
        xqName = {'DIF': 'moles2dif', 'MOLES': 'moles', 'DC': 'moles2DC', 'MDIP': 'moles2mdip', 'ISO19139': 'moles2iso19139'}[docType]
        attributeName = {'DIF': '_difFormat', 'MOLES': '_molesFormat', 'DC': '_dcFormat', 'MDIP': '_mdipFormat', 'ISO19139': '_iso19139Format'}[docType]

        # check we have the moles format available; if not create it
        if self._molesFormat is None:
            self.doMolesTransform()
            self.createMolesFile()

        # check the document isn't already defined
        try:
            doc = getattr(self, attributeName)
            if doc is not None:
                logging.info("Found existing document - returning this now")
                return doc
        except AttributeError:
            logging.info("Document not available - creating new transformed document")

        # the doc type doesn't exist - so run the xquery
        transformedDoc = self.doTransform(xqName)
        setattr(self, attributeName, transformedDoc)
        return transformedDoc


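    # Illustrative usage (the record variable is assumed to be a fully constructed
    # PostgresRecord): individual formats can then be pulled out lazily, e.g.
    #
    #   difDoc = record.getDocumentFormat('DIF')
    #   dcDoc = record.getDocumentFormat('DC')
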
    def getAllDocs(self):
        '''
        Return a list of all the available doc types in the record
        '''
        # if the stored docs array is the same size as the array of all doc types
        # assume all transforms have been done - and just return these
        if len(self._allDocs) == len(self.documentTypes):
            return self._allDocs

        for docType in self.documentTypes:
            self._allDocs.append([docType, self.getDocumentFormat(docType)])

        return self._allDocs


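    # The list returned above holds [docType, document] pairs, e.g. (illustrative only):
    #
    #   [['MOLES', '<dgMetadata>...</dgMetadata>'],
    #    ['DIF', '<DIF>...</DIF>'],
    #    ['DC', '...'],
    #    ['ISO19139', '...']]
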
    def getTemporalData(self):
        '''
        Retrieves the temporal data for the record; if this hasn't been discovered yet,
        do the necessary parsing
        @return: TimeRange object array with temporal data
        '''
        if self.stData is None:
            self.getSpatioTemporalData()

        return self.stData.getTemporalData()


    def getSpatialData(self):
        '''
        Retrieves the spatial data for the record; if this hasn't been discovered yet,
        do the necessary parsing
        @return: Coords object array with spatial data
        '''
        if self.stData is None:
            self.getSpatioTemporalData()

        return self.stData.getSpatialData()


    def listify(self, item):
        '''
        listify checks if an item is a list; if it isn't, it puts it
        inside a list and returns it. Always returns a list object.
        @param item: object to check
        @return: item as a list object
        '''
        if isinstance(item, list):
            return item
        else:
            return [item]


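    # e.g. listify('a') -> ['a'], while listify(['a', 'b']) -> ['a', 'b']
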
    def getSpatioTemporalData(self):
        '''
        Extract spatiotemporal data from the original document
        '''
        # initialise the various spatiotemporal arrays used to extract data to
        self.stData = SpatioTemporalData()

        # ensure the moles file is available on disk before trying to parse it
        if self._molesDir is None:
            self.createMolesFile()

        molesFile = self._molesDir + self._shortFilename
        logging.info('Retrieving spatiotemporal info from moles file, %s' % molesFile)

        # load in the moles file and put this into an object for direct access to the xml elements
        dgMeta = MRW.dgMetadata()
        try:
            dgMeta.fromXML(cElementTree.ElementTree(file=molesFile).getroot())
        except Exception, detail:
            raise SystemError, 'Cannot parse the XML moles document %s. Detail:\n%s' % (molesFile, detail)

        # do quick checks to see if the relevant data exists
        if not dgMeta.dgMetadataRecord.dgDataEntity.dgDataSummary:
            logging.info("No data summary elements found - assuming no spatiotemporal data available")
            return

        if not dgMeta.dgMetadataRecord.dgDataEntity.dgDataSummary.dgDataCoverage:
            logging.info("No data coverage elements found - assuming no spatiotemporal data available")
            return

        if not dgMeta.dgMetadataRecord.dgDataEntity.dgDataSummary.dgDataCoverage.dgSpatialCoverage:
            logging.info("No spatial coverage elements found - assuming no spatial data available")
        else:
            self.getCoordData(dgMeta)

        if not dgMeta.dgMetadataRecord.dgDataEntity.dgDataSummary.dgDataCoverage.dgTemporalCoverage:
            logging.info("No temporal coverage elements found - assuming no temporal data available")
        else:
            self.getTimeRangeData(dgMeta)


    def getTimeRangeData(self, dgMeta):
        '''
        Parse an xml tree and add any time range data found
        @param dgMeta: xml fragment for the time range
        '''
        logging.info("Extracting time range info")
        try:
            dates = dgMeta.dgMetadataRecord.dgDataEntity.dgDataSummary.dgDataCoverage.dgTemporalCoverage.DateRange

            if not dates:
                logging.info("No temporal info found for document")
                return

            dates_list = self.listify(dates)
            for date in dates_list:
                startdate = date.DateRangeStart
                enddate = date.DateRangeEnd
                if startdate is None or startdate == 'None':
                    startdate = "null"
                if enddate is None or enddate == 'None':
                    enddate = "null"

                self.stData.addTimeRange(startdate, enddate)
                logging.info("Temporal info: startdate " + \
                             startdate + ", enddate " + enddate)
        except Exception, detail:
            logging.info("Document does not contain temporal info.")
            logging.info(detail)


    def getCoordData(self, dgMeta):
        '''
        Parse an xml tree and add any coord data found
        @param dgMeta: xml fragment for the bounding boxes
        '''
        logging.info("Extracting bounding box info")
        try:
            bboxes = dgMeta.dgMetadataRecord.dgDataEntity.dgDataSummary.dgDataCoverage.dgSpatialCoverage.BoundingBox

            if not bboxes:
                logging.info("No bounding box info found for document")
                return

            bbox_list = self.listify(bboxes)
            # parse the list of coordinates
            for bbox in bbox_list:
                north = self.parseCoord(bbox.LimitNorth, 'S', 'N')
                south = self.parseCoord(bbox.LimitSouth, 'S', 'N')
                east = self.parseCoord(bbox.LimitEast, 'W', 'E')
                west = self.parseCoord(bbox.LimitWest, 'W', 'E')
                self.stData.addCoords(north, south, east, west)
                logging.info("Spatial info: west " + west + ", south " + south + ", east " + \
                             east + ", north " + north)

        except Exception, detail:
            logging.warning("Problem encountered whilst parsing bounding box info - this may lead \n" + \
                            "to an incomplete set of metadata being ingested. \nDetail: %s" % detail)


    def parseCoord(self, coordValue, minField, maxField):
        '''
        Take a coordinate value extracted from a moles file bbox limit - together with
        the appropriate max/min limits - and extract the correct value from it
        @param coordValue: the contents of the bbox limit tag
        @param minField: the expected min field of the coord range - i.e. 'W' or 'S'
        @param maxField: the expected max field of the coord range - i.e. 'E' or 'N'
        @return: coord - the value of the coordinate as a string
        '''
        logging.debug("Parsing document coordinates")
        try:
            coord = coordValue.strip()
            if coord.endswith(maxField):
                coord = coordValue.split(maxField)[0]
            elif coord.endswith(minField):
                if coord.startswith('-'):
                    coord = coordValue.split(minField)[0]
                else:
                    coord = "-" + coordValue.split(minField)[0]

            return '%s' % float(coord)
        except Exception:
            raise SyntaxError, 'Will not process File: contains incorrect bounding box limit: ' + coordValue


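    # Illustrative examples of the coordinate parsing above:
    #
    #   parseCoord('30W', 'W', 'E')    ->  '-30.0'
    #   parseCoord('45.5N', 'S', 'N')  ->  '45.5'
    #   parseCoord('12.5', 'S', 'N')   ->  '12.5'
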
    def hasNullCoords(self):
        '''
        Checks a record to determine whether it has any coordinates set to null
        '''
        if str(self.west) == 'null' or \
            str(self.south) == 'null' or \
            str(self.east) == 'null' or \
            str(self.north) == 'null':
            return True
        else:
            return False

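
# Minimal usage sketch (commented out, illustrative only): the xq argument is the
# xquery helper object supplied by the calling ingest script, and the other values
# below are placeholder examples rather than real identifiers.
#
#   record = PostgresRecord('/some/dir/example.xml', False, '', 'exampledatacentre.ac.uk',
#                           'exampledatacentre.ac.uk__DIF__example_dataset', xq, 'DIF')
#   record.doRecordTransforms()
#   print record.getSpatialData()
#   print record.getTemporalData()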