Ignore:
Timestamp:
20/01/09 16:33:33 (11 years ago)
Author:
cbyrom
Message:

Add new ingest script - to allow ingest of DIF docs from eXist hosted
atom feed. NB, this required restructure of original OAI harvester
to allow re-use of shared code - by abstracting this out into new class,
abstractdocumentingester.

Add new documentation and tidy up codebase removing dependencies where possible to simplify things.

Location:
TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch
Files:
3 added
6 deleted
6 edited

Legend:

Unmodified
Added
Removed
  • TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py

    r4224 r4854  
    55''' 
    66import sys, os, logging 
    7 import db_funcs 
    87from SpatioTemporalData import * 
     8from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc 
    99 
    1010class PostgresDAO: 
    1111     
    12     def __init__(self, record, connection=None): 
     12    def __init__(self, record, pgClient = None): 
    1313        ''' 
    1414        Constructor - to initialise the DAL and do some initial setting up 
    15         @param record: the PostgresRecord object to add or update in the DB  
     15        @param record: the PostgresRecord object to add or update in the DB 
     16        @keyword pgClient: a postgresClient object to use to connect to postgres 
     17        - NB, if not set, will try to set one up by looking for a pg.config file 
     18        locally  
    1619        ''' 
    1720        if record == "": 
     
    2124 
    2225        # setup a connection to the db - if none specified 
    23         if connection is None: 
    24             connection = db_funcs.db_connect() 
    25         self._connection = connection 
     26        if pgClient is None: 
     27            self.pgc = pgc(configFile, pgc.DEFAULT_CONFIG_FILE) 
     28        else: 
     29            self.pgc = pgClient 
     30 
    2631        self._record = record 
    2732         
     
    4045        sql = "SELECT original_document_id FROM ORIGINAL_DOCUMENT where discovery_id = '" + \ 
    4146            self._record.discovery_id + "';" 
    42         dbId = db_funcs.runSQLCommand(self._connection, sql) 
     47        dbId = self.pgc.runSQLCommand(sql) 
    4348        if dbId: 
    4449            self._record.db_id = dbId[0][0] 
     
    113118        sql = "SELECT harvest_count, scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \ 
    114119            str(self._record.db_id) + " AND original_document = '" + self._record.originalFormat + "';" 
    115         results = db_funcs.runSQLCommand(self._connection, sql) 
     120        results = self.pgc.runSQLCommand(sql) 
    116121 
    117122        # NB, if the document is not identical, the sql command will not find anything 
     
    121126            sql = "SELECT scn FROM ORIGINAL_DOCUMENT where original_document_id = " + \ 
    122127                    str(self._record.db_id) + ";" 
    123             results = db_funcs.runSQLCommand(self._connection, sql) 
     128            results = self.pgc.runSQLCommand(sql) 
    124129            self._record.scn = results[0][0] 
    125130            return True 
     
    135140        sql = "UPDATE ORIGINAL_DOCUMENT SET harvest_count = " + str(count) + \ 
    136141            " WHERE original_document_id = " + str(self._record.db_id) 
    137         db_funcs.runSQLCommand(self._connection, sql) 
     142        self.pgc.runSQLCommand(sql) 
    138143        return False 
    139144 
     
    149154            str(self._record.db_id) + ";"       
    150155 
    151         db_funcs.runSQLCommand(self._connection, sqlCmd) 
     156        self.pgc.runSQLCommand(sqlCmd) 
    152157        logging.info("Spatiotemporal data deleted successfully") 
    153158 
     
    164169        sqlCmd = "SELECT add_spatiotemporal_row('" + str(self._record.db_id) + "', '" + \ 
    165170            str(coords.north) + "', '" + str(coords.south) + "', '" + str(coords.west) + "', '" + \ 
    166             str(coords.east) + "', " 
    167              
    168              
    169          
    170         # cope with null dates appropriately 
    171         if timeRange.start == "null": 
    172             sqlCmd += timeRange.start + ", " 
    173         else: 
    174             sqlCmd += "'" + timeRange.start + "', " 
    175              
    176         if timeRange.end == "null": 
    177             sqlCmd += timeRange.end 
    178         else: 
    179             sqlCmd += "'" + timeRange.end + "'" 
    180          
    181         sqlCmd += ");"       
    182  
    183         db_funcs.runSQLCommand(self._connection, sqlCmd) 
     171            str(coords.east) + "', '" + timeRange.start + "', '" + timeRange.end + "');" 
     172             
     173        # fix any null strings 
     174        sqlCmd = sqlCmd.replace("'null'", "null") 
     175 
     176        self.pgc.runSQLCommand(sqlCmd) 
    184177        logging.info("Spatiotemporal row added successfully") 
    185178         
     
    243236            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "');"  
    244237 
    245         id = db_funcs.runSQLCommand(self._connection, sqlCmd) 
     238        id = self.pgc.runSQLCommand(sqlCmd) 
    246239        if len(id) == 0: 
    247240            raise ValueError, 'No PK ID returned from INSERT to DB' 
     
    258251        sqlCmd = "SELECT delete_document('" + str(self._record.db_id) + "');"  
    259252 
    260         db_funcs.runSQLCommand(self._connection, sqlCmd) 
     253        self.pgc.runSQLCommand(sqlCmd) 
    261254        logging.info("Original document deleted from Postgres DB") 
    262255         
     
    272265            self._record.originalFormat + "', '" + self._record.getAuthorsInfo() + "', '" + \ 
    273266            self._record.getParametersInfo() + "', '" + self._record.getScopeInfo() + "', '" + str(self._record.scn) + "');" 
    274         db_funcs.runSQLCommand(self._connection, sqlCmd) 
     267        self.pgc.runSQLCommand(sqlCmd) 
    275268         
    276269        # increment scn - saves doing an additional db call 
     
    295288                docType + "', '" + doc + "', current_timestamp, 1);" 
    296289             
    297             db_funcs.runSQLCommand(self._connection, sqlCmd) 
     290            self.pgc.runSQLCommand(sqlCmd) 
    298291         
    299292        logging.info("Transformed records created in DB") 
     
    314307                str(self._record.db_id) + " AND transformed_format = '" + docType + "';" 
    315308 
    316             db_funcs.runSQLCommand(self._connection, sqlCmd) 
     309            self.pgc.runSQLCommand(sqlCmd) 
    317310     
    318311        logging.info("Transformed records updated in DB") 
  • TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresRecord.py

    r4664 r4854  
    44C Byrom Apr 08 
    55''' 
    6 try: #python 2.5 
    7     from xml.etree import cElementTree 
    8 except ImportError: 
    9     try: 
    10         # if you've installed it yourself it comes this way 
    11         import cElementTree 
    12     except ImportError: 
    13         # if you've egged it this is the way it comes 
    14         from ndgUtils.elementtree import cElementTree 
    15  
     6from xml.etree import cElementTree 
    167import os, sys, logging, re 
    17 import csml.csml2Moles.molesReadWrite as MRW 
    18 from ndgUtils.ndgObject import ndgObject 
    19 import ndgUtils.lib.fileutilities as FileUtilities 
     8#import csml.csml2Moles.molesReadWrite as MRW 
     9from ndg.common.src.models.ndgObject import ndgObject 
     10import ndg.common.src.lib.fileutilities as FileUtilities 
    2011from SpatioTemporalData import SpatioTemporalData 
    2112import keywordAdder 
     
    6455        tmp = filename.split('/') 
    6556        self._dir = '/'.join(tmp[0:len(tmp)-1]) 
    66         self.shortFilename = tmp[len(tmp)-1] 
     57        self.shortFilename = tmp[-1] 
    6758         
    6859        # dir to store a temp copy of the moles file, when produced - for use by other transforms 
     
    153144            raise SystemError, 'Cannot parse the XML moles document %s. Detail:\n%s' %(molesFile, detail) 
    154145 
    155              
    156146 
    157147    def doTransform(self, xQueryType): 
  • TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/README.txt

    r3998 r4854  
    22------------ 
    33 
    4 The oai_document_ingester script is used to ingest documents obtained by the OAI harvester  
    5 into a postgres database.  At the point of ingest, documents are adjusted to avoid namespace issues 
    6 and are renamed to include their discovery ids.  Following the ingest of a document, this is then 
    7 transformed, using XQueries ran using the java saxon library, to the generic Moles format - and  
    8 then transformed from this into all the other available document formats  
    9 (currently DIF, DC, IS19139 - NB currently the MDIP transform does not work - see ToDo).   
    10 All the transformed docs are stored in the DB and spatiotemporal data is also extracted from the  
    11 moles files and stored. 
    12  
    13 If the script processes all files successfully, the harvest directory is backed up locally and then  
    14 cleared.  
    15  
    16 The script can be ran standalone, by specifying a datacentre to ingest files from, or the wrapper script 
     4Two ingester scripts now exist to ingest documents from a data centre into the discovery 
     5postgres DB.  The original script, oai_document_ingester retrieved docs harvested by the 
     6OAI service.  A new script, feeddocumentingester, has been added to harvest documents 
     7direct from an eXist repository running a feed reporting service - to alert the system 
     8to the publication of new documents to harvest. 
     9 
     10The two scripts share some common functionality, so to allow code re-use, this has been 
     11abstracted out into a parent class, AbstractDocumentIngester.  More detail on the two 
     12scripts is provided below - together with their shared functionality - which effectively 
     13is the point from which a newly harvested document has been discovered. 
     14 
     15oai_document_ingester 
     16------------------------------   
     17 
     18The oai_document_ingester script ingests documents obtained by the OAI harvester. 
     19 
     20Usage: python oai_document_ingester.py [OPTION] <datacentre>" 
     21 - where:\n   <datacentre> is the data centre to ingest data from; and options are:" 
     22 -v - verbose mode for output logging" 
     23 -d - debug mode for output logging" 
     24 
     25 
     26This script can be run standalone, by specifying a datacentre to ingest files from, or the wrapper script 
    1727run_all_ingest.py can be used; this will parse the contents of the datacentre_config directory and 
    1828run the oai_document_ingester against every config file located there. 
    1929 
    20 The script can also be ran using two options: 
    21  
    22         -v - 'verbose' mode - prints out logging of level INFO and above 
    23         -d - 'debug' mode - prints out all logging 
    24          
    25 NB, the default level is WARNING and above or, if ran via the run_all_ingest script, INFO and above. 
    26  
    27 Details for the postgis database are stored in the ingest.txt file; this should be set to access 
     30 
     31feeddocumentingester 
     32------------------------------ 
     33 
     34The feed ingester polls a feed set up on an eXist DB which hosts records to harvest.  This 
     35feed should be updated when a new record to harvest is created - and when the ingester 
     36polls this feed again, it will try to retrieve and ingest the new document. 
     37 
     38Usage: python feeddocumentingester.py [OPTION] <feed> [interval=..], [ingestFromDate=..]" 
     39              [eXistDBHostname=..], [eXistPortNo=..], [dataCentrePoll=..]" 
     40 - where:\n <feed> is the atom feed to ingest data from; options are:" 
     41 -v - verbose mode for output logging" 
     42 -d - debug mode for output logging" 
     43 and keywords are:" 
     44 interval - interval, in seconds, at which to retrieve data from the feed" 
     45 ingestFromDate - date, in format, 'YYYY-MM-DD', from which documents should be ingested - if not set, ingest date is taken as the current time" 
     46 eXistDBHostname - name of eXist DB to retrieve data from - NB, this will likely be where the feed is based, too - default is 'chinook.badc.rl.ac.uk'" 
     47 eXistPortNo - port number used by the eXist DB - defaults to '8080'" 
     48 dataCentrePoll - data centre whose documents should be polled for - e.g 'badc', 'neodc' - if not set, all documents on a feed will be ingested" 
     49 
     50NB, the feed URL will typically be pointing at the RESTful interface to an eXist DB which is hosting the feed. 
     51The format for this is: 
     52 
     53http://eXistHostAndPort/exist/atom/content/<db-collection-for-feed> - e.g.: 
     54 
     55http://chinook.badc.rl.ac.uk:8080/exist/atom/content/db/DIF 
     56 
     57The client expects this feed to have entries with a content element with a src element that features 
     58a valid ndgURL - e.g.: 
     59 
     60<feed xmlns="http://www.w3.org/2005/Atom"> 
     61    <id>urn:uuid:28a477f6-df52-4bb4-9c30-4ef5e3d5c615</id> 
     62    <updated>2009-01-20T10:33:02+00:00</updated>          
     63    <title>DIF Data</title> 
     64    <link href="#" rel="edit" type="application/atom+xml"/> 
     65    <link href="#" rel="self" type="application/atom+xml"/> 
     66    <entry> 
     67        <id>urn:uuid:fe496d7f-bdd9-409e-b3b1-2af05a2d0f33</id> 
     68                <updated>2009-01-19T17:22:27+00:00</updated> 
     69                <published>2009-01-19T17:22:27+00:00</published> 
     70        <link href="?id=urn:uuid:fe496d7f-bdd9-409e-b3b1-2af05a2d0f33" rel="edit" type="application/atom+xml"/> 
     71                <title>DIF Record [CRYOspheric STudies of Atmospheric Trends (CRYOSTAT)]</title> 
     72        <summary> 
     73        The CRYOspheric STudies of Atmospheric Trends in stratospherically and radiatively important gases (CRYOSTAT) will undertake the first combined measurements of virtually all significant Greenhouse gases (GHGs). GHGs (other than water vapour), ozone-depleting substances (ODSs), and related trace gases in contiguous firn and ice profiles, spanning as much as 200 years, from both the northern and southern polar ice caps. CRYOSTAT is an evolution of the FIRETRACC/100 project, the data from which is also held at BADC. 
     74                </summary> 
     75        <content src="http://badc.nerc.ac.uk:50001/view/badc.nerc.ac.uk__BROWSE-DIF__dataent_cryostat" type="application/atom+xml"/> 
     76    </entry> 
     77</feed> 
     78 
     79- NB, the ndgUrl in this case is badc.nerc.ac.uk__BROWSE-DIF__dataent_cryostat 
     80 
     81 
     82Shared functionality 
     83------------------------------ 
     84 
     85Once a new document to ingest has been discovered (i.e. either via the OAI harvester or 
     86extracted directly from an eXist repository on prompting from a feed update), the following 
     87workflow is completed: 
     88 
     891. document contents are adjusted to avoid namespace issues 
     902. documents are renamed to include their discovery ids 
     913. documents are then transformed, using XQueries run using the java saxon library, to  
     92the generic Moles (1.3) format - and then transformed from this into all the other  
     93available document formats (currently DIF, DC, IS19139 - NB currently the MDIP  
     94transform does not work - see ToDo (IS THIS STILL TRUE?).   
     954. All transformed docs are stored in the DB and spatiotemporal data is also  
     96extracted from the moles files and stored. 
     975. If the script processes all files successfully, the harvest directory is backed up locally and then  
     98cleared.  
     99 
     100NB, the difference between the oai and feed ingesters here is that, for the oai ingester, 
     101the above workflow is completed on all docs ingested from the oai, whereas for the feed 
     102ingester, it is completed for each document retrieved from the feed, in turn. 
     103 
     104 
     105Details for the postgres database are stored in the ingest.txt file; this should be set to access 
    28106permission 0600 and owned by the user running the script - to ensure maximum security of the file. 
    29107 
     
    41119 
    42120 
    43 Error Handling 
     121Error Handling - OAI harvester 
    44122----------------- 
    45123 
     
    68146-------------------------- 
    69147Docs are transformed using the Saxon java library - running the transform direct from the command line.  The transforms use the 
    70 XQueries and XQuery libs provided by the ndgUtils egg - so this must be installed before the script is used.  NB, the various XQuery 
     148XQueries and XQuery libs provided by the ndg.common.src egg - so this must be installed before the script is used.  NB, the various XQuery 
    71149libs that are referenced by the XQueries and extracted locally and the references in the scripts adjusted to point to these copies 
    72150- to ensure the current libs are used when running the scripts. 
     
    88166To Do 
    89167------------- 
    90  
    91 1. There are a number of outstanding dependencies on the ingest scripts; these should either be 
    92 removed entirely or the script should be properly packaged as an egg, with references to these dependencies 
    93 from other eggs.  The main dependencies stem from the use of the following files: 
    94  
    95 DIF.py 
    96 MDIP.py 
    97 molesReadWrite.py 
    98  
    99 - these require the following files: 
    100  
    101 AccessControl.py 
    102 ETxmlView.py 
    103 geoUtilities.py 
    104 People.py 
    105 renderEntity.py 
    106 renderService.py 
    107 SchemeNameSpace.py 
    108 Utilities.py 
    109168 
    1101692. Whilst testing the scripts, it was noted that the various MDIP transforms do not currently work 
  • TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/Utilities.py

    r4258 r4854  
    1 from collections import deque # python 2.4 
    2 try: #python 2.5 
    3     from xml.etree import ElementTree as ET 
    4 except: 
    5     #ActivePython-2.4                      #SEL 08/01/2007 
    6     import elementtree.ElementTree as ET   #SEL 08/01/2007 
    7          
    8 from ndgUtils.ETxmlView import * 
    9 import ConfigParser 
    10 import os 
    11 import re 
    12 import urllib 
     1from xml.etree import ElementTree as ET 
    132import logging 
    14  
    15 __NOCONFIG='Missing Config File' 
    16  
    17 class myConfig: 
    18      
    19    ''' Handle missing sections and variables in a config file a bit gracefully. Also 
    20    instantiates a logger if necessary ''' 
    21     
    22    def __init__(self,configfile,logName='NDGLOG'): 
    23        self.config=ConfigParser.ConfigParser() 
    24        if not os.path.exists(configfile): raise __NOCONFIG, configfile 
    25        self.config.read(configfile) 
    26        logfile=self.get('logging','debugLog',None) 
    27        self.logfile=None #deprecated 
    28        self.logger=None 
    29        if logfile is not None: 
    30            logger=logging.getLogger(logName) 
    31            handler=logging.FileHandler(logfile) 
    32            formatter=logging.Formatter('%(asctime)s %(levelname)s %(message)s') 
    33            handler.setFormatter(formatter) 
    34            logger.addHandler(handler) 
    35            logger.setLevel(logging.INFO) 
    36            self.logger=logger 
    37          
    38    def get(self,section,key,default=None): 
    39        ''' Return a config file value for key frm section ''' 
    40        try: 
    41            return self.config.get(section,key) 
    42        except: 
    43            return default 
    44          
    45    def log(self,string): 
    46        ''' Log some debug information ''' 
    47        if self.logger is None: return 
    48        if string is not None: 
    49           self.logger.info(string) 
    50        else: 
    51           self.logger.info('empty log entry') 
    52            
    53    def getLog(self): 
    54        return self.logger 
    55  
    56 class RingBuffer(deque): 
    57     #deque is a python 2.4 class! 
    58     #credit http://www.onlamp.com/pub/a/python/excerpt/pythonckbk_chap1/index1.html 
    59     def __init__(self, size_max): 
    60         deque.__init__(self) 
    61         self.size_max = size_max 
    62     def append(self, datum): 
    63         deque.append(self, datum) 
    64         if len(self) > self.size_max: 
    65             self.popleft( ) 
    66     def tolist(self): 
    67         return list(self) 
    68  
    69 def wrapGetText(element,xpathExpression,multiple=0): 
    70         ''' Wraps a call to ET to get a text object in an error handler ''' 
    71         def none2txt(i): 
    72             if i is None: return '' 
    73             return i 
    74         if element is None: 
    75             if multiple: 
    76                  return ['',] 
    77             else: return '' 
    78         if multiple: 
    79                 r=element.findall(xpathExpression) 
    80         else: 
    81                 r=[element.find(xpathExpression),] 
    82         try: 
    83                 rr=[i.text for i in r] 
    84         except: 
    85                 rr=['',] 
    86         rr=map(none2txt,rr)  
    87         if multiple:  
    88                 return rr 
    89         else: return rr[0]  
    90  
    91 def getURLdict(cgiFieldStorage): 
    92     ''' takes a cgi field storage object and converts it to a dictionary ''' 
    93     result={} 
    94     for item in cgiFieldStorage: 
    95             result[item]=cgiFieldStorage[item].value 
    96     return result 
    97 ## 
    98 ### convert the followign two methods into one class that can handle 
    99 ## xml directly too if necessary 
    100 ## 
    101 def DIFid2NDGid(string): 
    102     ''' takes a dif thing parses it and produces an ET ndg element id ... 
    103     and use this in dif ... ''' 
    104     s=string.split(':') 
    105     try: 
    106         r='''<DIFid><schemeIdentifier>%s</schemeIdentifier> 
    107          <repositoryIdentifier>%s</repositoryIdentifier> 
    108          <localIdentifier>%s</localIdentifier></DIFid>'''%(s[1],s[0],s[2]) 
    109         return ET.fromstring(r) 
    110     except: 
    111         r='''<DIFid><schemeIdentifier>DIF</schemeIdentifier> 
    112         <repositoryIdentifier>Unknown</repositoryIdentifier> 
    113         <localIdentifier>%s</localIdentifier></DIFid>'''%string 
    114         return ET.fromstring(r) 
    115  
    116 def EnumerateString(string): 
    117     ''' Takes a string, and if it's got a number on the end, increments it, 
    118     otherwise adds a number on the end, used to differentiate strings which 
    119     would otherwise be identical ''' 
    120     def addNum(matchObj): 
    121         s=matchObj.group() 
    122         return str(int(s)+1) 
    123     r=re.sub('\d+$',addNum,string) 
    124     if r==string: r=r+'1' 
    125     return r 
    126  
    127 def dateParse(string,instruction): 
    128     ''' Simple date manipulations on a string, if it is understood ...  
    129        if instruction = YYYY, return the year ''' 
    130     s=string.split('-') 
    131     if instruction=='YYYY': 
    132         if len(s)==3: # expecting year,mon,day or day,mon,year ...  
    133             if int(s[0])>int(s[2]):  
    134                 return s[0] 
    135             else: 
    136                 return s[2] 
    137         else: 
    138             return string # unknown format as yet ... 
    139     else: 
    140         return 'unknown instruction to dateParse %s'%instruction 
     3from ndg.common.src.lib.ETxmlView import loadET, nsdumb 
    1414 
    1425def idget(xml,dataType='DIF'): 
     
    15821    """ Tests as required """ 
    15922 
    160     configFile='examples/example.config' 
    161     difFile='examples/neodc.eg1.dif' 
    162      
    163     def setUp(self): 
    164         # If pkg_resources is available assume the module is eggified and 
    165         # get a stream to the input data from the egg. 
    166         #try: 
    167         #    import pkg_resources 
    168         #    f = pkg_resources.resource_stream(__name__, self.configFile) 
    169         #except ImportError: 
    170             # Else take the input file from __file__ 
    171             #import os 
    172         self.config=myConfig(self.configFile) 
    173         f=file(self.difFile,'r') 
    174         self.difxml=f.read() 
    175             #f=file(os.path.join(os.path.basepath(__file__), self.configFile)) 
    176  
    177         #self.config=myConfig(f) 
    178  
    179     def testConfig(self): 
    180         print 'Discovery Icon [%s]'%self.config.get('DISCOVERY','icon') 
    181          
    18223    def testidget(self): 
    18324        self.assertEqual(idget(self.difxml),'NOCSDAT192') 
  • TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/keywordAdder.py

    r4257 r4854  
    33adds Structured Keywords to moles documents 
    44''' 
    5 try: #python 2.5 
    6     from xml.etree import ElementTree as etree 
    7 except ImportError: 
    8     try: 
    9         # if you've installed it yourself it comes this way 
    10         import elementtree.ElementTree as etree 
    11 #        import ElementTree as etree 
    12     except ImportError: 
    13         # if you've egged it this is the way it comes 
    14         from ndgUtils.elementtree import ElementTree as etree 
    15  
    16 try: #python 2.5 
    17     from xml.etree import cElementTree 
    18 except ImportError: 
    19     try: 
    20         # if you've installed it yourself it comes this way 
    21         import cElementTree 
    22     except ImportError: 
    23         # if you've egged it this is the way it comes 
    24         from ndgUtils.elementtree import cElementTree 
     5from xml.etree import ElementTree as etree 
     6from xml.etree import cElementTree 
    257import csml.csml2Moles.molesReadWrite as MRW 
    268import sys, os, logging 
  • TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py

    r4714 r4854  
    66 As well as doing the ingest, a backup directory is created to store the created moles files. 
    77''' 
    8 import os, sys, string, getopt, logging 
     8import os, sys, logging 
    99from time import strftime 
    10 from SchemaNameSpace import SchemaNameSpace 
    11 from ndgUtils.ndgXqueries import ndgXqueries 
    12 import ndgUtils.lib.fileutilities as FileUtilities 
    13 from PostgresRecord import PostgresRecord 
    14 from PostgresDAO import PostgresDAO 
    15 from Utilities import idget 
    16 import db_funcs 
     10import ndg.common.src.lib.fileutilities as FileUtilities 
     11from abstractdocumentingester import AbstractDocumentIngester 
    1712 
    18 class oai_document_ingester(object): 
     13class oai_document_ingester(AbstractDocumentIngester): 
    1914        ''' 
    2015        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB 
     
    2318        ''' 
    2419 
    25         def getID(self, filename): 
    26                 ''' 
    27                 Gets the identifier out of an input metadata xml record.  
    28                 @param filename - name of document file being processed 
    29                 @return: ID - id to use to refer to the document 
    30                 ''' 
    31                 logging.info("Retrieving identifier for metadata record " + filename) 
    32                 xml=file(filename).read() 
    33                 ID = idget(xml) 
    34                 return ID 
    35          
    36          
    37         def addFileToPostgresDB(self, filename): 
    38                 ''' 
    39                 Add a file to the postgres DB - extracting and storing all the required 
    40                 data in the process 
    41                 @param filename: full path of file to add to postgres DB  
    42                 ''' 
    43                 if not os.path.isfile(filename): 
    44                         logging.info("Skipping, %s - not a valid file" %filename) 
    45                         return 
     20        def processDataCentre(self, dataCentre): 
    4621                 
    47                 logging.info("Adding file, " + filename + ", to postgres DB") 
    48                  
    49                 # first of all create a PostgresRecord - this object represents all the data required 
    50                 # for a DB entry 
    51                 dao = None 
    52                 try: 
    53                         discoveryID = self.getID(filename) 
    54                         record = PostgresRecord(filename, self._NDG_dataProvider, \ 
    55                                                             self._datacentre_groups, self._datacentre_namespace, \ 
    56                                                             discoveryID, self._xq, self._datacentre_format) 
    57          
    58                         # Now create the data access object to interface to the DB 
    59                         dao = PostgresDAO(record, self._dbConnection) 
    60                  
    61                         # Finally, write the new record 
    62                         if dao.createOrUpdateRecord(): 
    63                                 self._no_files_ingested += 1 
    64                 except: 
    65                         logging.error("Exception thrown - detail: ") 
    66                         logging.error(sys.exc_info()) 
    67                          
    68                         if dao: 
    69                                 logging.info("Removing record and its associated info from DB") 
    70                                 logging.info("- to allow clean ingestion on rerun") 
    71                                 try: 
    72                                         dao.deleteOriginalRecord() 
    73                                 except: 
    74                                         logging.error("Problem encountered when removing record: ") 
    75                                         logging.error(sys.exc_info()) 
    76                                         logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested") 
    77  
    78                         self._no_problem_files += 1 
    79                         logging.info("Continue processing other files") 
    80          
    81          
    def getConfigDetails(self, datacentre):
        '''
        Get the harvested records directory and groups for this datacentre from the
        datacentre specific config file.  The harvested records directory depends on
        the datacentre's OAI base url, the set and format - these have to be known
        up-front.  The groups denote which 'portal groups' the records belong to -
        for limiting searches to, say, NERC-only datacentre records.
        Groups are added to the intermediate MOLES when it is created.
        @param datacentre: datacentre to use when looking up config file
        '''
        self._datacentre_config_filename = self._base_dir + \
            'datacentre_config/' + datacentre + "_config.properties"
        logging.info("Retrieving data from datacentre config file, " + \
            self._datacentre_config_filename)

        # Check this file exists; if not, assume an invalid datacentre has been specified
        if not os.path.isfile(self._datacentre_config_filename):
            sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
                "specified (%s) is invalid\n" %datacentre)

        datacentre_config_file = open(self._datacentre_config_filename, "r")
        try:
            # each config line is of the form '<key> <value> [<value> ...]'
            for line in datacentre_config_file.readlines():
                # NB, str.split() replaces the deprecated string.split(line)
                words = line.split()
                if len(words) == 0:
                    continue
                elif words[0] == 'host_path':
                    self._harvest_home = words[1].rstrip()
                elif words[0] == 'groups':
                    # all remaining tokens on the line are group keywords
                    self._datacentre_groups = words[1:]
                elif words[0] == 'format':
                    self._datacentre_format = words[1]
                elif words[0] == 'namespace':
                    self._datacentre_namespace = words[1]
                elif words[0] == 'NDG_dataProvider':
                    # presence of the key alone marks an NDG data provider;
                    # any value following it is ignored
                    self._NDG_dataProvider = True
        finally:
            # ensure the handle is released even if parsing raises
            datacentre_config_file.close()

        if self._harvest_home == "":
            sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename)

        logging.info("harvested records are in " + self._harvest_home)

        if self._datacentre_groups == "":
            logging.info("No groups/keywords set for datacentre " + datacentre)
        else:
            logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))

        if self._datacentre_format == "":
            sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename)

        logging.info("format being harvested: " + self._datacentre_format)

        if self._datacentre_namespace == "":
            sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename)

        logging.info("datacentre namespace: " + self._datacentre_namespace)

        if self._NDG_dataProvider:
            logging.info("Datacentre classified as an NDG data provider")
        else:
            logging.info("Datacentre is not classified as an NDG data provider")
        logging.info(self.lineSeparator)
    144  
    145                  
    def _getDBConnection(self):
        '''
        Get the default DB connection - by reading in data from the db config file,
        'ingest.config', in the current working directory.
        The file must hold at least 4 whitespace separated values (passed
        positionally to db_funcs.db_connect), with an optional 5th value for
        the port; if the port is absent the db_connect default is used.
        @raise ValueError: if the config file holds fewer than 4 values
        '''
        logging.info("Setting up connection to postgres DB")
        dbinfo_file = open('ingest.config', "r")
        try:
            dbinfo = dbinfo_file.read().split()
        finally:
            # original code leaked this handle; always close it
            dbinfo_file.close()
        if len(dbinfo) < 4:
            # call form (not 'raise E, msg') keeps the statement version-portable
            raise ValueError('Incorrect data in config file')

        # if port specified in file, use this, otherwise use default
        if len(dbinfo) > 4:
            self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3], dbinfo[4])
        else:
            self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3])
        logging.info("Postgres DB connection now set up")
    162  
    163          
    def usage(self):
        '''
        Display input params for the script on stdout, then terminate
        via sys.exit(2) - NB, this never returns to the caller
        '''
        print "Usage: python oai_document_ingester.py [OPTION] <datacentre>"
        print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:"
        print " -v - verbose mode for output logging"
        print " -d - debug mode for output logging"
        sys.exit(2)
    173  
    174                  
    175         def __init__(self, datacentre=None): 
    176                 ''' 
    177                 Main entry point for script 
    178                 ''' 
    179                 self.lineSeparator = "-----------------------------" 
    180                 print self.lineSeparator 
    181                 print "RUNNING: oai_document_ingester.py" 
    182                  
    183                 # check for verbose option 
    184                 try: 
    185                     opts, args = getopt.getopt(sys.argv[1:], "vd") 
    186                 except getopt.GetoptError, err: 
    187                     # print help information and exit: 
    188                     print str(err) # will print something like "option -a not recognized" 
    189                      
    190                 loggingLevel = logging.WARNING 
    191                 for o, a in opts: 
    192                     if o == "-v": 
    193                         print " - Verbose mode ON" 
    194                         loggingLevel = logging.INFO 
    195                     elif o == "-d": 
    196                         print " - Debug mode ON" 
    197                         loggingLevel = logging.DEBUG 
    198                  
    199                 print self.lineSeparator 
    200                 logging.basicConfig(level=loggingLevel, 
    201                                             format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s') 
    202                  
    203                 if datacentre is None: 
    204                         self.usage() 
    205                  
    206                 numfilesproc = 0 
    20722                self._no_files_ingested = 0 
    20823                self._no_problem_files = 0 
     24                self.dataCentre = dataCentre 
     25                 
    20926                self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from 
    210                          
    211                 data_dir = self._base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs 
     27                self._setupDataCentreDirs() 
    21228                 
    21329                #Change os directory to that with the harvested documents in it. 
     
    21733                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin') 
    21834                 
    219                 # set the global variables to retrieve from the config file 
    220                 self._harvest_home = "" 
    221                 self._datacentre_groups = "" 
    222                 self._datacentre_format = "" 
    223                 self._datacentre_namespace = "" 
    224                 self._NDG_dataProvider = False 
    225                 self.getConfigDetails(datacentre) 
     35                self.getConfigDetails(dataCentre) 
    22636                 
    22737                # check harvest dir exists and that there are any records to harvest? 
    22838                if not os.path.exists(self._harvest_home): 
    229                         logging.info("Harvest directory for datacentre %s (%s) could not be found - exiting" \ 
    230                                                  %(datacentre, self._harvest_home)) 
     39                        logging.info("Harvest directory for dataCentre %s (%s) could not be found - exiting" \ 
     40                                                 %(dataCentre, self._harvest_home)) 
    23141                        return 
    23242                elif len(os.listdir(self._harvest_home)) == 0: 
    233                         logging.info("Nothing to harvest this time from %s" %datacentre) 
     43                        logging.info("Nothing to harvest this time from %s" %dataCentre) 
    23444                        return 
    235                  
    236                 # The directory to put things for a tape backup (should already exist) 
    237                 backupdir = '/disks/glue1/oaiBackup/' 
    238                  
    239                 # the following dirs define where the specific documents should go 
    240                 originals_dir = data_dir + "/oai/originals/" 
    241                 discovery_dir = data_dir + "/discovery/" 
    242                  
    243                 # Create/clear the 'in' directory pristine copy of the discovery records 
    244                 FileUtilities.setUpDir(originals_dir) 
    245                 commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir 
     45 
     46                commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + self.originals_dir 
    24647                logging.info("Executing : " + commandline) 
    24748                status = os.system(commandline) 
     
    25051                    sys.exit("Failed at making pristine copy stage") 
    25152                 
    252                 # Create/clear the directory for the 'out' processed copy of the discovery records. 
    253                 FileUtilities.setUpDir(discovery_dir) 
    254                      
    255                 #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file) 
    256                 # - also replace any namespace declarations with a standard one which we know works in NDG 
    257                 # NB, this copies files from the original dir to the discovery dir 
    258                 logging.info(self.lineSeparator) 
    259                 logging.info("Renaming files:") 
    260                 for filename in os.listdir(originals_dir): 
    261                         if filename.endswith('.xml'): 
    262                                 original_filename = originals_dir + filename 
    263                                 try: 
    264                                         ident=self.getID(original_filename) 
    265                                 except Exception, detail: 
    266                                         logging.error("Could not retrieve ID from file, %s" %filename) 
    267                                         logging.error("Detail: %s" %detail) 
    268                                         logging.info("Continue with next file") 
    269                                         continue 
    270                                  
    271                                 if self._NDG_dataProvider: 
    272                                         new_filename = discovery_dir + ident.replace(":","__")+".xml" 
    273                                 else: 
    274                                                 ident = ident.replace(":","-") 
    275                                                 ident = ident.replace("/","-") 
    276                                                 new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml" 
    277                                                 logging.info("original file = " + original_filename) 
    278                                                 logging.info("newfile = " + new_filename) 
    279                                  
    280                                 # now correct any namespace issues 
    281                                 try: 
    282                                     SchemaNameSpace(original_filename, new_filename, self._datacentre_format) 
    283                                 except Exception, detail: 
    284                                         logging.error("SchemaNameSpace failed on file %s" %original_filename) 
    285                                         logging.error("Detail: %s" %detail) 
    286                                         logging.info("Continue with next file") 
    287                                         continue 
    288                                 numfilesproc += 1 
    289                         else: 
    290                                 logging.warning('File %s is not xml format. Not processed'  %(filename)) 
    291                  
    292                 logging.info(self.lineSeparator) 
    293                  
    294                 # now set up the required XQueries 
    295                 # - NB, extract the xquery libraries locally for easy reference 
    296                 self._xq=ndgXqueries() 
    297                 for libFile in self._xq.xqlib: 
    298                         FileUtilities.createFile(libFile, self._xq.xqlib[libFile]) 
    299                  
     53                self._setupXQueries() 
     54                                 
    30055                # Process the resulting files and put the data into the postgres DB 
    30156                # - firstly set up a db connection to use 
    302                 self._dbConnection = None 
    303                 self._getDBConnection() 
    304                  
    305                 filenames = os.listdir(discovery_dir) 
    306                 for filename in filenames: 
    307                         self.addFileToPostgresDB(discovery_dir + filename) 
    308                  
    309                 #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups 
    310                 backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M") 
    311                  
    312                 this_backupdir = backupdir_base + "_originals/" 
    313                 FileUtilities.makeBackUp(originals_dir, this_backupdir) 
    314                  
    315                 #Clear out the original harvest records area and discovery dir 
    316                 FileUtilities.cleanDir(originals_dir) 
    317                 FileUtilities.cleanDir(discovery_dir) 
     57                self._getPostgresDBConnection() 
     58 
     59                self._convertAndIngestFiles(self.originals_dir, self.discovery_dir) 
    31860                 
    31961                logging.info("oai_document_ingest processing complete:") 
     
    33072                logging.info(self.lineSeparator) 
    33173                print "Script finished running." 
     74 
     75         
    def usage(self):
        '''
        Display input params for the script on stdout, then terminate
        via sys.exit(2) - NB, this never returns to the caller
        '''
        print "Usage: python oai_document_ingester.py [OPTION] <datacentre>"
        print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:"
        print " -v - verbose mode for output logging"
        print " -d - debug mode for output logging"
        sys.exit(2)
    33285                 
    33386         
    33487if __name__=="__main__": 
    335         opts, args = getopt.getopt(sys.argv[1:], '-vd') 
    336         if len(args) < 1: 
    337                 oai_document_ingester() 
    338          
    339         oai_document_ingester(args[0]) 
     88        print "=================================" 
     89        print "RUNNING: oai_document_ingester.py" 
     90        ingester = oai_document_ingester() 
     91        args = ingester._setupCmdLineOptions() 
     92        ingester.processDataCentre(args[0]) 
Note: See TracChangeset for help on using the changeset viewer.