Ignore:
Timestamp:
20/01/09 16:33:33 (11 years ago)
Author:
cbyrom
Message:

Add new ingest script - to allow ingest of DIF docs from eXist hosted
atom feed. NB, this required restructure of original OAI harvester
to allow re-use of shared code - by abstracting this out into new class,
absstractdocumentingester.

Add new documentation and tidy up codebase removing dependencies where possible to simplify things.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py

    r4714 r4854  
    66 As well as doing the ingest, a backup directory is created to store the created moles files. 
    77''' 
    8 import os, sys, string, getopt, logging 
     8import os, sys, logging 
    99from time import strftime 
    10 from SchemaNameSpace import SchemaNameSpace 
    11 from ndgUtils.ndgXqueries import ndgXqueries 
    12 import ndgUtils.lib.fileutilities as FileUtilities 
    13 from PostgresRecord import PostgresRecord 
    14 from PostgresDAO import PostgresDAO 
    15 from Utilities import idget 
    16 import db_funcs 
     10import ndg.common.src.lib.fileutilities as FileUtilities 
     11from abstractdocumentingester import AbstractDocumentIngester 
    1712 
    18 class oai_document_ingester(object): 
     13class oai_document_ingester(AbstractDocumentIngester): 
    1914        ''' 
    2015        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB 
     
    2318        ''' 
    2419 
    25         def getID(self, filename): 
    26                 ''' 
    27                 Gets the identifier out of an input metadata xml record.  
    28                 @param filename - name of document file being processed 
    29                 @return: ID - id to use to refer to the document 
    30                 ''' 
    31                 logging.info("Retrieving identifier for metadata record " + filename) 
    32                 xml=file(filename).read() 
    33                 ID = idget(xml) 
    34                 return ID 
    35          
    36          
    37         def addFileToPostgresDB(self, filename): 
    38                 ''' 
    39                 Add a file to the postgres DB - extracting and storing all the required 
    40                 data in the process 
    41                 @param filename: full path of file to add to postgres DB  
    42                 ''' 
    43                 if not os.path.isfile(filename): 
    44                         logging.info("Skipping, %s - not a valid file" %filename) 
    45                         return 
     20        def processDataCentre(self, dataCentre): 
    4621                 
    47                 logging.info("Adding file, " + filename + ", to postgres DB") 
    48                  
    49                 # first of all create a PostgresRecord - this object represents all the data required 
    50                 # for a DB entry 
    51                 dao = None 
    52                 try: 
    53                         discoveryID = self.getID(filename) 
    54                         record = PostgresRecord(filename, self._NDG_dataProvider, \ 
    55                                                             self._datacentre_groups, self._datacentre_namespace, \ 
    56                                                             discoveryID, self._xq, self._datacentre_format) 
    57          
    58                         # Now create the data access object to interface to the DB 
    59                         dao = PostgresDAO(record, self._dbConnection) 
    60                  
    61                         # Finally, write the new record 
    62                         if dao.createOrUpdateRecord(): 
    63                                 self._no_files_ingested += 1 
    64                 except: 
    65                         logging.error("Exception thrown - detail: ") 
    66                         logging.error(sys.exc_info()) 
    67                          
    68                         if dao: 
    69                                 logging.info("Removing record and its associated info from DB") 
    70                                 logging.info("- to allow clean ingestion on rerun") 
    71                                 try: 
    72                                         dao.deleteOriginalRecord() 
    73                                 except: 
    74                                         logging.error("Problem encountered when removing record: ") 
    75                                         logging.error(sys.exc_info()) 
    76                                         logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested") 
    77  
    78                         self._no_problem_files += 1 
    79                         logging.info("Continue processing other files") 
    80          
    81          
    82         def getConfigDetails(self, datacentre): 
    83                 ''' 
    84                 Get the harvested records directory and groups for this datacentre from the  
    85                 datacentre specific config file.  The harvested records directory depends on the  
    86                 datacentres OAI base url, the set and format. These have to be know up-front. 
    87                 The groups denote which 'portal groups' they belong to - for limiting searches to  
    88                 say NERC-only datacentres records. 
    89                 Groups are added to the intermediate MOLES when it is created. 
    90                 @param datacentre: datacentre to use when looking up config file  
    91                 ''' 
    92                 self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties" 
    93                 logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename) 
    94                  
    95                 # Check this file exists; if not, assume an invalid datacentre has been specified 
    96                 if not os.path.isfile(self._datacentre_config_filename): 
    97                     sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \ 
    98                         "specified (%s) is invalid\n" %datacentre) 
    99                      
    100                 datacentre_config_file = open(self._datacentre_config_filename, "r") 
    101                  
    102                 for line in datacentre_config_file.readlines(): 
    103                     words  = string.split(line) 
    104                     if len(words) == 0: 
    105                         continue 
    106                     if words[0] == 'host_path': 
    107                         self._harvest_home = string.rstrip(words[1]) 
    108                     if words[0] == 'groups': 
    109                         self._datacentre_groups = words[1:] 
    110                     if words[0] == 'format': 
    111                         self._datacentre_format = words[1] 
    112                     if words[0] == 'namespace': 
    113                         self._datacentre_namespace = words[1] 
    114                     if words[0] == 'NDG_dataProvider': 
    115                         self._NDG_dataProvider = True 
    116                  
    117                 datacentre_config_file.close() 
    118                  
    119                 if self._harvest_home == "": 
    120                     sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename) 
    121                  
    122                 logging.info("harvested records are in " + self._harvest_home) 
    123                  
    124                 if self._datacentre_groups == "": 
    125                     logging.info("No groups/keywords set for datacentre " + datacentre) 
    126                 else: 
    127                     logging.info("datacentre groups/keywords: " + str(self._datacentre_groups)) 
    128                  
    129                 if self._datacentre_format == "": 
    130                     sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename) 
    131                  
    132                 logging.info("format being harvested: " + self._datacentre_format) 
    133                  
    134                 if self._datacentre_namespace == "": 
    135                     sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename) 
    136                  
    137                 logging.info("datacentre namespace: " + self._datacentre_namespace) 
    138                  
    139                 if self._NDG_dataProvider: 
    140                         logging.info("Datacentre classified as an NDG data provider") 
    141                 else: 
    142                         logging.info("Datacentre is not classified as an NDG data provider") 
    143                 logging.info(self.lineSeparator) 
    144  
    145                  
    146         def _getDBConnection(self): 
    147                 ''' 
    148                 Get the default DB connection - by reading in data from the db config file 
    149                 ''' 
    150                 logging.info("Setting up connection to postgres DB") 
    151                 dbinfo_file=open('ingest.config', "r") 
    152                 dbinfo = dbinfo_file.read().split() 
    153                 if len(dbinfo) < 4: 
    154                         raise ValueError, 'Incorrect data in config file' 
    155                  
    156                 # if port specified in file, use this, otherwise use default 
    157                 if len(dbinfo) > 4: 
    158                         self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3], dbinfo[4]) 
    159                 else: 
    160                         self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3]) 
    161                 logging.info("Postgres DB connection now set up") 
    162  
    163          
    164         def usage(self): 
    165                 ''' 
    166                 Display input params for the script 
    167                 ''' 
    168                 print "Usage: python oai_document_ingester.py [OPTION] <datacentre>" 
    169                 print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:" 
    170                 print " -v - verbose mode for output logging" 
    171                 print " -d - debug mode for output logging" 
    172                 sys.exit(2) 
    173  
    174                  
    175         def __init__(self, datacentre=None): 
    176                 ''' 
    177                 Main entry point for script 
    178                 ''' 
    179                 self.lineSeparator = "-----------------------------" 
    180                 print self.lineSeparator 
    181                 print "RUNNING: oai_document_ingester.py" 
    182                  
    183                 # check for verbose option 
    184                 try: 
    185                     opts, args = getopt.getopt(sys.argv[1:], "vd") 
    186                 except getopt.GetoptError, err: 
    187                     # print help information and exit: 
    188                     print str(err) # will print something like "option -a not recognized" 
    189                      
    190                 loggingLevel = logging.WARNING 
    191                 for o, a in opts: 
    192                     if o == "-v": 
    193                         print " - Verbose mode ON" 
    194                         loggingLevel = logging.INFO 
    195                     elif o == "-d": 
    196                         print " - Debug mode ON" 
    197                         loggingLevel = logging.DEBUG 
    198                  
    199                 print self.lineSeparator 
    200                 logging.basicConfig(level=loggingLevel, 
    201                                             format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s') 
    202                  
    203                 if datacentre is None: 
    204                         self.usage() 
    205                  
    206                 numfilesproc = 0 
    20722                self._no_files_ingested = 0 
    20823                self._no_problem_files = 0 
     24                self.dataCentre = dataCentre 
     25                 
    20926                self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from 
    210                          
    211                 data_dir = self._base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs 
     27                self._setupDataCentreDirs() 
    21228                 
    21329                #Change os directory to that with the harvested documents in it. 
     
    21733                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin') 
    21834                 
    219                 # set the global variables to retrieve from the config file 
    220                 self._harvest_home = "" 
    221                 self._datacentre_groups = "" 
    222                 self._datacentre_format = "" 
    223                 self._datacentre_namespace = "" 
    224                 self._NDG_dataProvider = False 
    225                 self.getConfigDetails(datacentre) 
     35                self.getConfigDetails(dataCentre) 
    22636                 
    22737                # check harvest dir exists and that there are any records to harvest? 
    22838                if not os.path.exists(self._harvest_home): 
    229                         logging.info("Harvest directory for datacentre %s (%s) could not be found - exiting" \ 
    230                                                  %(datacentre, self._harvest_home)) 
     39                        logging.info("Harvest directory for dataCentre %s (%s) could not be found - exiting" \ 
     40                                                 %(dataCentre, self._harvest_home)) 
    23141                        return 
    23242                elif len(os.listdir(self._harvest_home)) == 0: 
    233                         logging.info("Nothing to harvest this time from %s" %datacentre) 
     43                        logging.info("Nothing to harvest this time from %s" %dataCentre) 
    23444                        return 
    235                  
    236                 # The directory to put things for a tape backup (should already exist) 
    237                 backupdir = '/disks/glue1/oaiBackup/' 
    238                  
    239                 # the following dirs define where the specific documents should go 
    240                 originals_dir = data_dir + "/oai/originals/" 
    241                 discovery_dir = data_dir + "/discovery/" 
    242                  
    243                 # Create/clear the 'in' directory pristine copy of the discovery records 
    244                 FileUtilities.setUpDir(originals_dir) 
    245                 commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir 
     45 
     46                commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + self.originals_dir 
    24647                logging.info("Executing : " + commandline) 
    24748                status = os.system(commandline) 
     
    25051                    sys.exit("Failed at making pristine copy stage") 
    25152                 
    252                 # Create/clear the directory for the 'out' processed copy of the discovery records. 
    253                 FileUtilities.setUpDir(discovery_dir) 
    254                      
    255                 #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file) 
    256                 # - also replace any namespace declarations with a standard one which we know works in NDG 
    257                 # NB, this copies files from the original dir to the discovery dir 
    258                 logging.info(self.lineSeparator) 
    259                 logging.info("Renaming files:") 
    260                 for filename in os.listdir(originals_dir): 
    261                         if filename.endswith('.xml'): 
    262                                 original_filename = originals_dir + filename 
    263                                 try: 
    264                                         ident=self.getID(original_filename) 
    265                                 except Exception, detail: 
    266                                         logging.error("Could not retrieve ID from file, %s" %filename) 
    267                                         logging.error("Detail: %s" %detail) 
    268                                         logging.info("Continue with next file") 
    269                                         continue 
    270                                  
    271                                 if self._NDG_dataProvider: 
    272                                         new_filename = discovery_dir + ident.replace(":","__")+".xml" 
    273                                 else: 
    274                                                 ident = ident.replace(":","-") 
    275                                                 ident = ident.replace("/","-") 
    276                                                 new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml" 
    277                                                 logging.info("original file = " + original_filename) 
    278                                                 logging.info("newfile = " + new_filename) 
    279                                  
    280                                 # now correct any namespace issues 
    281                                 try: 
    282                                     SchemaNameSpace(original_filename, new_filename, self._datacentre_format) 
    283                                 except Exception, detail: 
    284                                         logging.error("SchemaNameSpace failed on file %s" %original_filename) 
    285                                         logging.error("Detail: %s" %detail) 
    286                                         logging.info("Continue with next file") 
    287                                         continue 
    288                                 numfilesproc += 1 
    289                         else: 
    290                                 logging.warning('File %s is not xml format. Not processed'  %(filename)) 
    291                  
    292                 logging.info(self.lineSeparator) 
    293                  
    294                 # now set up the required XQueries 
    295                 # - NB, extract the xquery libraries locally for easy reference 
    296                 self._xq=ndgXqueries() 
    297                 for libFile in self._xq.xqlib: 
    298                         FileUtilities.createFile(libFile, self._xq.xqlib[libFile]) 
    299                  
     53                self._setupXQueries() 
     54                                 
    30055                # Process the resulting files and put the data into the postgres DB 
    30156                # - firstly set up a db connection to use 
    302                 self._dbConnection = None 
    303                 self._getDBConnection() 
    304                  
    305                 filenames = os.listdir(discovery_dir) 
    306                 for filename in filenames: 
    307                         self.addFileToPostgresDB(discovery_dir + filename) 
    308                  
    309                 #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups 
    310                 backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M") 
    311                  
    312                 this_backupdir = backupdir_base + "_originals/" 
    313                 FileUtilities.makeBackUp(originals_dir, this_backupdir) 
    314                  
    315                 #Clear out the original harvest records area and discovery dir 
    316                 FileUtilities.cleanDir(originals_dir) 
    317                 FileUtilities.cleanDir(discovery_dir) 
     57                self._getPostgresDBConnection() 
     58 
     59                self._convertAndIngestFiles(self.originals_dir, self.discovery_dir) 
    31860                 
    31961                logging.info("oai_document_ingest processing complete:") 
     
    33072                logging.info(self.lineSeparator) 
    33173                print "Script finished running." 
     74 
     75         
     76        def usage(self): 
     77                ''' 
     78                Display input params for the script 
     79                ''' 
     80                print "Usage: python oai_document_ingester.py [OPTION] <datacentre>" 
     81                print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:" 
     82                print " -v - verbose mode for output logging" 
     83                print " -d - debug mode for output logging" 
     84                sys.exit(2) 
    33285                 
    33386         
    33487if __name__=="__main__": 
    335         opts, args = getopt.getopt(sys.argv[1:], '-vd') 
    336         if len(args) < 1: 
    337                 oai_document_ingester() 
    338          
    339         oai_document_ingester(args[0]) 
     88        print "=================================" 
     89        print "RUNNING: oai_document_ingester.py" 
     90        ingester = oai_document_ingester() 
     91        args = ingester._setupCmdLineOptions() 
     92        ingester.processDataCentre(args[0]) 
Note: See TracChangeset for help on using the changeset viewer.