Ignore:
Timestamp:
01/05/08 14:15:41 (11 years ago)
Author:
cbyrom
Message:

Add script to run ingest for all avaiable config files.
Make oai_ingest_new2 a proper object.
Adjust db_funcs - now pass in details to set up database connection

  • although defaults available, if not done.

Simplify coord parsing in PostgresRecord? by using a reusable function.
+ various tidy ups and fixes.

Location:
TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch
Files:
1 added
4 edited

Legend:

Unmodified
Added
Removed
  • TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresDAO.py

    r3814 r3839  
    1010class PostgresDAO: 
    1111     
    12     def __init__(self, record): 
     12    def __init__(self, record, connection=None): 
    1313        ''' 
    1414        Constructor - to initialise the DAL and do some initial setting up 
     
    1818            sys.exit("USAGE: argument 1 = PostgresRecord object to process") 
    1919        else: 
    20             print "INFO: Creating/updating DB entry for record, %s" %record.id 
    21  
    22         # setup a connection to the db 
    23         self._connection = db_connect() 
     20            print "INFO: Creating/updating DB entry for record, %s" %record.discovery_id 
     21 
     22        # setup a connection to the db - if none specified 
     23        if connection is None: 
     24            connection = db_funcs.db_connect() 
     25        self._connection = connection 
    2426        self._record = record 
    2527        self.id = None 
    2628         
     29 
     30    def commavaluesfromsublist(self, list): 
     31        ''' 
     32        Creates an comma-separated string of values from a list of sublists, taking  
     33        the first element of each sublist 
     34        ''' 
     35        csvString = "" 
     36        for item in list: 
     37            if item: 
     38                if len(csvString) > 0: 
     39                    csvString += ","     
     40                csvString += "%s" % item[0] 
     41             
     42        return csvString 
     43 
    2744             
    2845    def getRecordID(self): 
     
    3350        ''' 
    3451        logging.info("Looking up record, " + self._record.discovery_id + " in DB") 
    35         if self.id is not None & self.id > 0: 
     52        if self.id is not None and self.id > 0: 
    3653            logging.info("Already looked up record - ID is " + self.id) 
    3754            return self.id 
     
    4663        Looks up a record in the DB; if it finds it, update it, otherwise create it 
    4764        ''' 
    48         if getRecordID() > 0: 
    49             createRecord() 
    50         else: 
    51             updateRecord() 
     65        if self.getRecordID() > 0: 
     66            self.createRecord() 
     67        else: 
     68            self.updateRecord() 
    5269 
    5370             
     
    5976        # firstly store the actual documents contained by the record - i.e. the original doc + 
    6077        # the various transforms required 
    61         _insertOriginalRecord() 
    62         _insertMetadataRecords() 
     78        self._insertOriginalRecord() 
     79        self._insertMetadataRecords() 
    6380         
    6481        # Now add the spatiotemporal data 
    65         _insertSpatioTemporalData() 
     82        self._insertSpatioTemporalData() 
    6683        logging.info("New record created") 
    6784             
     
    7794            # firstly update the actual documents contained by the record - i.e. the original doc + 
    7895            # the various transforms required 
    79             _updateMetadataRecords(record) 
     96            self._updateMetadataRecords(record) 
    8097             
    8198            # Now update the spatiotemporal data 
    82             _updateSpatioTemporalData(record) 
     99            self._updateSpatioTemporalData(record) 
    83100         
    84101         
     
    106123 
    107124     
    108     def _insertSpatioTemporalData(): 
     125    def _insertSpatioTemporalData(self): 
    109126        ''' 
    110127        Create a record in the postgres DB based on the spatiotemporal data  
     
    114131        # NB, build the query up in two parts so that we only include the correct data 
    115132        sqlStart = "" 
     133        insert_sql="INSERT INTO "+location_table+" (geom) VALUES ( setsrid('BOX3D("+west+" "+south+","+east+" "+north+")'::box3d,4326) );" 
    116134        sqlEnd = " VALUES (DEFAULT, '" + self._record.id  
    117135        if (not self._record.hasNullCoords()): 
     
    143161             
    144162     
    145     def _updateSpatioTemporalData(): 
     163    def _updateSpatioTemporalData(self): 
    146164        ''' 
    147165        Updates a record in the postgres DB based on the spatiotemporal data  
     
    175193     
    176194             
    177     def _insertOriginalRecord(): 
     195    def _insertOriginalRecord(self): 
    178196        ''' 
    179197        Insert the original metadata doc into the postgres DB 
     
    191209         
    192210     
    193     def _updateOriginalRecord(): 
     211    def _updateOriginalRecord(self): 
    194212        ''' 
    195213        Update the original doc in the postgres DB 
     
    207225     
    208226             
    209     def _insertMetadataRecords(): 
     227    def _insertMetadataRecords(self): 
    210228        ''' 
    211229        Insert the metadata docs into the postgres DB 
     
    228246     
    229247     
    230     def _updateMetadataRecords(): 
     248    def _updateMetadataRecords(self): 
    231249        ''' 
    232250        Update the metadata docs into the postgres DB 
     
    253271        ''' 
    254272        self._record = record 
     273 
     274 
     275     
     276    def link_spatial_and_temporal(Mid,west,south,east,north,startdate,enddate): 
     277        ''' 
     278        Creates an entry in the item/time/location table linking an item, time and location. 
     279        ''' 
     280        locationid = insert_spatial_coverage(Mid,west,south,east,north) 
     281        timeid = insert_temporal_coverage(Mid,startdate,enddate) 
     282        if (locationid > -1): 
     283            locationidstr = "%s" % locationid 
     284        else: 
     285            locationidstr = "NULL" 
     286     
     287        if (timeid > -1): 
     288            timeidstr = "%s" % timeid 
     289        else: 
     290            timeidstr = "NULL" 
     291     
     292        itemtimelocation_sql = "INSERT INTO "+item_table+" (itemid, locationid, timeid) values ('"+Mid+"', "+locationidstr+", "+timeidstr+" );" 
     293     
     294        print "ItemTimeLocation:\t"+itemtimelocation_sql 
     295        cursor = connection.cursor() 
     296        try: 
     297            cursor.execute(itemtimelocation_sql) 
     298        except: 
     299            print "Error: database error %s %s" %(sys.exc_type, sys.exc_value) 
     300        connection.commit() 
     301     
     302    def link_spatial(Mid,west,south,east,north): 
     303            '''Creates an entry in the item/time/location table linking an item and location only''' 
     304            locationid = insert_spatial_coverage(Mid,west,south,east,north) 
     305            if (locationid > -1): 
     306                    locationidstr = "%s" % locationid 
     307            else: 
     308                    locationidstr = "NULL" 
     309     
     310            itemtimelocation_sql = "INSERT INTO "+item_table+" (itemid, locationid) values ('"+Mid+"', "+locationidstr+" );" 
     311     
     312            print "ItemTimeLocation:\t"+itemtimelocation_sql 
     313            cursor = connection.cursor() 
     314            try: 
     315                cursor.execute(itemtimelocation_sql) 
     316            except: 
     317                print "Error: database error %s %s" %(sys.exc_type, sys.exc_value) 
     318            connection.commit() 
     319     
     320    def link_temporal(Mid,startdate,enddate): 
     321            '''Creates an entry in the item/time/location table linking an item and time only''' 
     322            timeid = insert_temporal_coverage(Mid,startdate,enddate) 
     323            if (timeid > -1): 
     324                    timeidstr = "%s" % timeid 
     325            else: 
     326                    timeidstr = "NULL" 
     327     
     328            itemtimelocation_sql = "INSERT INTO "+item_table+" (itemid, timeid) values ('"+Mid+"', "+timeidstr+" );" 
     329     
     330            print "ItemTimeLocation:\t"+itemtimelocation_sql 
     331            cursor = connection.cursor() 
     332            try: 
     333                cursor.execute(itemtimelocation_sql) 
     334            except: 
     335                print "Error: database error %s %s" %(sys.exc_type, sys.exc_value) 
     336            connection.commit() 
     337     
     338     
     339    def insert_spatial_coverage(Mid,west,south,east,north): 
     340        ''' 
     341        Attempts to insert spatial coverage into spatial table.  
     342        If successful, returns newly created location id, -1 otherwise 
     343        ''' 
     344        returnid = -1 
     345        if str(west)!='null' and str(south)!='null' and str(east)!='null' and str(north)!='null': 
     346            insert_sql="INSERT INTO "+location_table+" (geom) VALUES ( setsrid('BOX3D("+west+" "+south+","+east+" "+north+")'::box3d,4326) );" 
     347            print insert_sql 
     348        cursor = connection.cursor() 
     349        try: 
     350            cursor.execute(insert_sql) 
     351            if (cursor.rowcount > 0): 
     352                getid_sql="select currval('"+location_id_seq+"');" 
     353                cursor.execute(getid_sql) 
     354                id = commavaluesfromsublist(cursor.fetchall()) 
     355                return int(id) 
     356            else: 
     357                return -1 
     358        except: 
     359            print "Error: database error %s %s" %(sys.exc_type, sys.exc_value) 
     360        connection.commit() 
     361     
     362    def insert_temporal_coverage(Mid,startdate,enddate): 
     363        '''Attempts to insert temporal coverage (only) into time table. If successful, returns timeid to be used in item table, -1 otherwise''' 
     364        returnid = -1 
     365        if (startdate=='nostartdate' and enddate=='noenddate'): 
     366            # Skip if both are missing, although it's OK if only 1 is given 
     367            return -1 
     368        else: 
     369            if (startdate=='nostartdate'): startdate='NULL' 
     370            if (enddate=='noenddate'): enddate='NULL' 
     371            insert_sql =  "INSERT INTO "+time_table+" (start_time, end_time) VALUES('"+startdate+"', '"+enddate+"');" 
     372            print insert_sql 
     373            cursor = connection.cursor() 
     374            try: 
     375                    cursor.execute(insert_sql) 
     376                    if (cursor.rowcount > 0): 
     377                            getid_sql="select currval('"+time_id_seq+"');" 
     378                            cursor.execute(getid_sql) 
     379                            id = commavaluesfromsublist(cursor.fetchall()) 
     380                            return int(id) 
     381                    else: 
     382                            return -1 
     383            except: 
     384                    print "Error: database error %s %s" %(sys.exc_type, sys.exc_value) 
     385            connection.commit() 
  • TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/PostgresRecord.py

    r3821 r3839  
    55''' 
    66try: #python 2.5 
    7     from xml.etree import ElementTree as ET 
     7    from xml.etree import cElementTree 
    88except ImportError: 
    99    try: 
    1010        # if you've installed it yourself it comes this way 
    11         import ElementTree as ET 
     11        import cElementTree 
    1212    except ImportError: 
    1313        # if you've egged it this is the way it comes 
    14         from elementtree import ElementTree as ET 
    15 #this is a fix to the  ElementTree namespace problem that namespaces are usually represented as ns0, ns1, ns2 etc. 
    16 #ET._namespace_map.update({'http://www.oceannet.org/mdip/xml': 'mdip', 'http://www.w3.org/1999/xlink':'xlink'}) 
     14        from ndgUtils.elementtree import cElementTree 
    1715 
    1816import os, sys, logging 
    19 from ETxmlView import loadET, nsdumb 
     17#from ETxmlView import loadET, nsdumb 
    2018import molesReadWrite as MRW 
    2119from ndgUtils.ndgObject import ndgObject 
     
    2826    @param  
    2927    ''' 
    30     documentTypes = ['MOLES', 'DIF', 'DC', 'MDIP', 'ISO19139'] 
     28    documentTypes = ['MOLES', 'DIF', 'DC', 'ISO19139']#, 'MDIP'] 
    3129         
    3230    def __init__(self, filename, ndg_dataprovider, datacentre_groups, datacentre_namespace, discovery_id, xq, docType): 
     
    6563        self.originalFormat = file(filename).read() 
    6664         
    67         # we use loadET to protect ourselves from scummy characters and unicode problems 
    68         # DO WE NEED TO DO THIS?? 
    69         self.correctedFormat = loadET(self.originalFormat) 
    70  
    71          
    7265        # initialise the various record fields 
    7366        self.db_id = None    # the DB ID of the record, for easy reference when it is created 
     
    146139        # Now do the transform 
    147140        os.putenv ('PATH', ':/usr/java/jdk1.5.0_03/bin:/usr/java/jdk1.5.0_03:/usr/java/jdk1.5.0_03/lib/tools.jar:/usr/local/WSClients/OAIBatch:/usr/local/exist-client/bin:/bin:/usr/bin:.') 
    148         xqCommand = "java -cp saxon9.jar net.sf.saxon.Query " + xqFile 
     141        xqCommand = "java -cp ./lib/saxon9.jar net.sf.saxon.Query " + xqFile + " !omit-xml-declaration=yes" 
    149142        logging.debug("Running saxon command: " + xqCommand) 
    150143        pipe = os.popen(xqCommand + " 2>&1") 
     
    152145        status = pipe.close() 
    153146 
    154         print output 
    155147        if status is not None: 
    156148            sys.exit("Failed at running the XQuery") 
     
    162154         
    163155        logging.info("Transform completed successfully") 
     156         
     157#        f=open(xQueryType + "_doc.xml", 'w') 
     158#        f.write(output) 
     159#        f.close() 
    164160             
    165161        return output 
     
    256252         
    257253 
    258     def listify(item): 
     254    def listify(self, item): 
    259255        '''  
    260256        listify checks if an item is a list, if it isn't it puts it  
     
    273269        Extract spatio temporal data from the original document 
    274270        ''' 
    275         ET._namespace_map.update({'http://ndg.nerc.ac.uk/moles': 'moles', 'http://www.w3.org/1999/xlink':'xlink'}) 
     271        #this is a fix to the  ElementTree namespace problem that namespaces are usually  
     272        # represented as ns0, ns1, ns2 etc. 
     273        #cElementTree._namespace_map.update({'http://ndg.nerc.ac.uk/moles': 'moles', 'http://www.w3.org/1999/xlink':'xlink'}) 
    276274        no_bbox = False 
    277275        no_dates = False 
     
    283281        self.enddate='noenddate' 
    284282         
     283        molesFile = self._molesDir + self._shortFilename 
     284         
    285285        dgMeta=MRW.dgMetadata() 
    286286        try: 
    287             dgMeta.fromXML(cElementTree.ElementTree(file=self.filename).getroot()) 
    288         except: 
    289             logging.warning("WARNING: Cannot parse the XML moles document %s. Will not process" %self.filename) 
     287            dgMeta.fromXML(cElementTree.ElementTree(file=molesFile).getroot()) 
     288            print dgMeta.dgMetadataRecord.dgDataEntity.dgDataSummary.dgDataCoverage.dgSpatialCoverage.BoundingBox 
     289        except Exception, detail: 
     290            logging.warning("Cannot parse the XML moles document %s. Will not process" %molesFile) 
     291            logging.debug(detail) 
    290292            return 
     293         
    291294        try: 
    292             bbox_list=listify(dgMeta.dgMetadataRecord.dgDataEntity.dgDataSummary.dgDataCoverage.dgSpatialCoverage.BoundingBox) 
    293         except: 
    294             logging.info("XML moles document " + self.filename + \ 
    295                 " does not contain a bounding box.") 
     295            bbox_list=self.listify(dgMeta.dgMetadataRecord.dgDataEntity.dgDataSummary.dgDataCoverage.dgSpatialCoverage.BoundingBox) 
     296        except Exception, detail: 
     297            logging.info("XML moles document " + molesFile + " does not contain a bounding box.") 
     298            logging.debug(detail) 
    296299            no_bbox=True 
    297300 
     
    301304            print "enddate = %s" %dates.DateRangeEnd 
    302305        except: 
    303             logging.info("XML moles document " + self.filename + " does not contain temporal info.") 
     306            logging.info("XML moles document " + molesFile + " does not contain temporal info.") 
    304307            no_dates=True 
    305308 
    306309        if no_bbox and no_dates: 
    307             logging.info("XML moles document " + self.filename + " does not contain any spatiotemporal info.") 
     310            logging.info("XML moles document " + molesFile + " does not contain any spatiotemporal info.") 
    308311            return 
    309312 
     
    322325            bbox=bbox_list[0] 
    323326            try: 
    324                 west = bbox.LimitWest.strip() 
     327                self.west = self.parseCoord(bbox.LimitWest, 'W', 'E') 
    325328            except: 
    326                 print "ERROR:  Will not process File %s. Contains incorrect West bounding box limit." %self.filename 
     329                logging.error("Will not process File %s. Contains incorrect East bounding box limit." %molesFile) 
    327330                return 
    328             if west.endswith('E'): 
    329                 west=bbox.LimitWest.split('E')[0] 
    330             elif west.endswith('W'): 
    331                 if west.startswith('-'): 
    332                     west = bbox.LimitWest.split('W')[0] 
    333                 else: 
    334                     west = "-" +bbox.LimitWest.split('W')[0] 
     331             
    335332            try: 
    336                 float(west) 
     333                self.east = self.parseCoord(bbox.LimitEast, 'W', 'E') 
    337334            except: 
    338                 print "ERROR:  Will not process File %s. Contains incorrect West bounding box limit." %self.filename 
     335                logging.error("Will not process File %s. Contains incorrect East bounding box limit." %molesFile) 
    339336                return 
    340             self.west = west 
    341337             
    342338            try: 
    343                 east = bbox.LimitEast.strip() 
     339                self.north = self.parseCoord(bbox.LimitNorth, 'S', 'N') 
    344340            except: 
    345                 print "ERROR:  Will not process File %s. Contains incorrect East bounding box limit." %self.filename 
     341                logging.error("Will not process File %s. Contains incorrect North bounding box limit." %molesFile) 
    346342                return 
    347             if east.endswith('E'): 
    348                 east=bbox.LimitEast.split('E')[0] 
    349             elif east.endswith('W'): 
    350                 if east.startswith('-'): 
    351                     east = bbox.LimitEast.split('W')[0] 
    352                 else: 
    353                     east = "-" +bbox.LimitEast.split('W')[0] 
     343             
    354344            try: 
    355                 float(east) 
     345                self.south = self.parseCoord(bbox.LimitSouth, 'S', 'N') 
    356346            except: 
    357                 print "ERROR:  Will not process File %s. Contains incorrect East bounding box limit." %self.filename 
     347                logging.error("Will not process File %s. Contains incorrect South bounding box limit." %molesFile) 
    358348                return 
    359             self.east = east 
    360              
    361             try: 
    362                 north = bbox.LimitNorth.strip() 
    363             except: 
    364                 print "ERROR:  Will not process File %s. Contains incorrect North bounding box limit." %self.filename 
    365                 return 
    366             if north.endswith('N'): 
    367                 north=bbox.LimitNorth.split('N')[0] 
    368             elif north.endswith('S'): 
    369                 if north.startswith('-'): 
    370                     north = bbox.LimitNorth.split('S')[0] 
    371                 else: 
    372                     north = "-" +bbox.LimitNorth.split('S')[0] 
    373             try: 
    374                 float(north) 
    375             except: 
    376                 print "ERROR: Will not process File %s. Contains incorrect North bounding box limit." %self.filename 
    377                 return 
    378             self.north = north 
    379              
    380             try: 
    381                 south = bbox.LimitSouth.strip() 
    382             except: 
    383                 print "ERROR:  Will not process File %s. Contains incorrect South bounding box limit." %self.filename 
    384                 return 
    385             if south.endswith('N'): 
    386                 south=bbox.LimitSouth.split('N')[0] 
    387             elif south.endswith('S'): 
    388                 if south.startswith('-'): 
    389                     south = bbox.LimitSouth.split('S')[0] 
    390                 else: 
    391                     south = "-" +bbox.LimitSouth.split('S')[0] 
    392             try: 
    393                 float(south) 
    394             except: 
    395                 print "ERROR: Will not process File %s. Contains incorrect North bounding box limit." %self.filename 
    396                 return 
    397             self.south = south 
    398349 
    399350        logging.info("Spatial info: west= " + self.west + ",south " + self.south + ", east " + \ 
     
    401352        logging.info("Temporal info: startdate " + self.startdate + ", enddate " + self.enddate)  
    402353 
     354 
     355 
     356    def parseCoord(self, coordValue, minField, maxField): 
     357        ''' 
     358        Take a coordinate value extracted from a molefile bbox limit - together with  
     359        the appropriate max/min limits and extract the correct value from it 
     360        @param coordValue: the contents of the bbox limit tage 
     361        @param minField: the expected min field of the coord range - i.e. 'W' or 'S' 
     362        @param maxField: the expected max field of the coord range - i.e. 'E' or 'N' 
     363        @return: coord - the value of the coordinate as a string    
     364        ''' 
     365 
     366        coord = coordValue.strip() 
     367        if coord.endswith(maxField): 
     368            coord=coordValue.split(maxField)[0] 
     369        elif coord.endswith(minField): 
     370            if coord.startswith('-'): 
     371                coord = coordValue.split(minField)[0] 
     372            else: 
     373                coord = "-" + coordValue.split(minField)[0] 
     374 
     375        return '%s' % float(coord) 
    403376             
    404377    def hasNullCoords(): 
     378        ''' 
     379        Checks a record to determine whether it has any coordinates set to null 
     380        ''' 
    405381        if str(self.west)=='null' or \ 
    406382            str(self.south)=='null' or \ 
  • TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/db_funcs.py

    r3817 r3839  
    33# functions for use with NDG discovery postgres db 
    44 
    5 import pgdb, logging 
     5import pgdb, logging, sys 
    66 
    7 def db_connect(): 
     7def db_connect(databaseName='discoverydb', hostName='localhost', userName='postgres', password='pass01word'): 
    88        ''' 
    99        Open a Postgres database connection 
     10        @param databaseName: Name of DB to connect to 
     11        @param hostName: Name of machine where DB is located 
     12        @param userName: Name of user to connect to DB as 
     13        @param password: Password for user  
    1014        ''' 
    11         DATABASE = 'test' 
    12         HOST     = 'localhost' 
    13         USER     = 'postgres' 
    14         PW       = 'pass01word' 
    15         SCHEMA   = 'public' 
    16         #       DATABASE = 'xxxx' 
    17         #       HOST     = 'xxx.xxx.uk' 
    18         #       USER     = 'xxx' 
    19         #       PW       = 'xxxxxxx' 
    20         #       SCHEMA   = 'xxx' 
    21         logging.info("Setting up connection to DB: " + DATABASE + " on " + HOST) 
    22         connection_string = HOST + ':' + DATABASE + ':' + USER + ':' + PW 
     15        logging.info("Setting up connection to DB: " + databaseName + " on " + hostName) 
     16        connection_string = hostName + ':' + databaseName + ':' + userName + ':' + password 
    2317        connection = pgdb.connect(connection_string) 
    2418        logging.info("DB connection established") 
  • TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_ingest_new2.py

    r3821 r3839  
    3737from PostgresRecord import PostgresRecord 
    3838from PostgresDAO import PostgresDAO 
    39  
    40 def getID(filename): 
    41         '''  
    42         Gets the identifier out of an input metadata xml record.  
    43         Copes with DIF and MDIP currently. 
    44         @param filename - name of document file being processed 
    45         @return: ID - id to use to refer to the document 
    46         ''' 
    47         logging.info("Retrieving identifier for metadata record " + filename) 
    48         xml=file(filename).read() 
    49         if datacentre_format == "DIF": 
    50             d=DIF(xml) 
    51             ID=d.entryID 
    52         elif datacentre_format == "MDIP": 
    53             d=MDIP(xml) 
    54             ID=d.id 
    55         else: 
    56             sys.exit("Only handles DIF or MDIP here.") 
    57  
    58         logging.info("Found identifier: " + ID) 
    59         return ID 
    60  
    61  
    62 def addFileToPostgresDB(filename): 
     39import db_funcs 
     40 
     41class oai_ingest: 
    6342        ''' 
    64         Add a file to the postgres DB - extracting and storing all the required 
    65         data in the process 
    66         @param filename: full path of file to add to postgres DB  
     43        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB 
     44        - including running the various transforms and parsings to get all doc types and spatiotemporal data 
     45        in the correct form in the DB 
    6746        ''' 
    68         logging.info("Adding file, " + filename + ", to postgres DB") 
    69         discoveryID = getID(filename) 
    70          
    71         # first of all create a PostgresRecord - this object represents all the data required 
    72         # for a DB entry 
    73         record = PostgresRecord(filename, NDG_dataProvider, datacentre_groups, datacentre_namespace, discoveryID, xq, datacentre_format) 
    74  
    75         # Now create the data access object to interface to the DB 
    76         dao = PostgresDAO(record) 
    77          
    78         # Finally, write the new record 
    79         dao.createOrUpdateRecord() 
    80  
    81  
    82 def getConfigDetails(datacentre): 
    83         ''' 
    84         Get the harvested records directory and groups for this datacentre from the  
    85         datacentre specific config file.  The harvested records directory depends on the  
    86         datacentres OAI base url, the set and format. These have to be know up-front. 
    87         The groups denote which 'portal groups' they belong to - for limiting searches to  
    88         say NERC-only datacentres records. 
    89         Groups are added to the intermediate MOLES when it is created. 
    90         @param datacentre: datacentre to use when looking up config file  
    91         ''' 
    92         # set the variables to use the global copies, not the local ones 
    93         global harvest_home, datacentre_groups, datacentre_format, datacentre_namespace 
    94         global datacentre_config_filename, NDG_dataProvider 
    95         datacentre_config_filename = base_dir + datacentre + "_config.properties" 
    96         logging.info("Retrieving data from datacentre config file, " + datacentre_config_filename) 
    97          
    98         # Check this file exists; if not, assume an invalid datacentre has been specified 
    99         if not os.path.isfile(datacentre_config_filename): 
    100             sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \ 
    101                 "specified (%s) is invalid\n" %datacentre) 
    102              
    103         datacentre_config_file = open(datacentre_config_filename, "r") 
    104          
    105         for line in datacentre_config_file.readlines(): 
    106             words  = string.split(line) 
    107             if len(words) == 0: 
    108                 continue 
    109             if words[0] == 'host_path': 
    110                 harvest_home = string.rstrip(words[1]) 
    111             if words[0] == 'groups': 
    112                 datacentre_groups = words[1:] 
    113             if words[0] == 'format': 
    114                 datacentre_format = words[1] 
    115             if words[0] == 'namespace': 
    116                 datacentre_namespace = words[1] 
    117             if words[0] == 'NDG_dataProvider': 
    118                 NDG_dataProvider = True 
    119          
    120         datacentre_config_file.close() 
    121          
    122         if harvest_home == "": 
    123             sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename) 
    124          
    125         logging.info("harvested records are in " + harvest_home) 
    126          
    127         if datacentre_groups == "": 
    128             logging.info("No groups/keywords set for datacentre " + datacentre) 
    129         else: 
    130             logging.info("datacentre groups/keywords: " + datacentre_groups) 
    131          
    132         if datacentre_format == "": 
    133             sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename) 
    134          
    135         logging.info("format being harvested: " + datacentre_format) 
    136          
    137         if datacentre_namespace == "": 
    138             sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename) 
    139          
    140         logging.info("datacentre namespace: " + datacentre_namespace) 
    141          
    142         if NDG_dataProvider: 
    143                 logging.info("Datacentre classified as an NDG data provider") 
    144         else: 
    145                 logging.info("Datacentre is not classificied as an NDG data provider") 
    146         print lineSeparator 
    147          
    148  
    149 def usage(): 
    150         ''' 
    151         Display input params for the script 
    152         ''' 
    153         print "Usage: python -v oai_ingest.py <datacentre>" 
    154         print " - where:\n   <datacentre> is the data centre to ingest data from; and" 
    155         print " -v - verbose mode for output logging" 
    156         sys.exit(2) 
    157          
    158 lineSeparator = "-----------------------------" 
    159 print lineSeparator 
    160 print "RUNNING: oai_ingest.py"           
    161  
    162 # check for verbose option 
    163 try: 
    164     opts, args = getopt.getopt(sys.argv[1:], "v") 
    165 except getopt.GetoptError, err: 
    166     # print help information and exit: 
    167     print str(err) # will print something like "option -a not recognized" 
    168     usage() 
    169      
    170 loggingLevel = logging.WARNING 
    171 for o, a in opts: 
    172     if o == "-v": 
    173         print " - Verbose mode ON" 
    174         loggingLevel = logging.DEBUG 
    175  
    176 logging.basicConfig(level=loggingLevel, 
    177                                     format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s') 
    178  
    179 print lineSeparator 
    180  
    181 if (len(args) != 1): 
    182         usage() 
    183 else: 
    184     datacentre = args[0] 
    185  
    186 # set up the file utils to use this logger 
    187 fileUtils = FileUtilities() 
    188  
    189 status = 0 
    190 numfilesproc = 0 
    191 #base_dir = "/usr/local/WSClients/OAIBatch/" # this is the base dir that the script is ran from 
    192 base_dir = os.getcwd() + "/"# this is the base dir that the script is ran from 
    193          
    194 data_dir = base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs 
    195  
    196 #Change os directory to that with the harvested documents in it. 
    197 os.chdir(base_dir) 
    198  
    199 # - to run on Windows under cygwin, use the following 
    200 #os.putenv('PATH', 'C:\\opt\\cygwin\\bin') 
    201  
    202 # set the global variables to retrieve from the config file 
    203 harvest_home = "" 
    204 datacentre_groups = "" 
    205 datacentre_format = "" 
    206 datacentre_namespace = "" 
    207 NDG_dataProvider = False 
    208 getConfigDetails(datacentre) 
    209  
    210 #any records to harvest? 
    211 if len( os.listdir(harvest_home)) == 0: 
    212     logging.info("Nothing to harvest this time from " + datacentre) 
    213     sys.exit() 
    214  
    215 # The directory to put things for a tape backup (should already exist) 
    216 backupdir = '/disks/glue1/oaiBackup/' 
    217  
    218 # the following dirs define where the specific documents should go 
    219 originals_dir = data_dir +"/oai/originals/" 
    220 discovery_dir = data_dir +"/discovery/" 
    221  
    222 # Create/clear the 'in' directory pristine copy of the discovery records 
    223 fileUtils.setUpDir(originals_dir) 
    224 commandline = "ls -1 " + harvest_home + "/ | xargs -i cp " + harvest_home + "/{\} " + originals_dir 
    225 #commandline = "find " + harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir 
    226 logging.info("Executing : " + commandline) 
    227 status = os.system(commandline) 
    228  
    229 #if status !=0: 
    230 #    sys.exit("Failed at making pristine copy stage") 
    231  
    232 # Create/clear the directory for the 'out' processed copy of the discovery records. 
    233 fileUtils.setUpDir(discovery_dir) 
    234      
    235 # The file config.properties contains the location of the particular datacentres harvested records. 
    236 # Copy the datacentre specific version of config to config.properties file. 
    237 #commandline = "cp " + datacentre_config_filename + " " + base_dir + "config.properties" 
    238 #logging.info("Executing : " + commandline) 
    239 #status = os.system(commandline) 
    240 #if status !=0: 
    241 #    sys.exit("Failed at copying config file stage") 
    242  
    243 #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file) 
    244 # - also replace any namespace declarations with a standard one which we know works in NDG 
    245 # NB, this copies files from the original dir to the discovery dir 
    246 logging.info(lineSeparator) 
    247 logging.info("Renaming files:") 
    248 for filename in os.listdir(originals_dir): 
    249         if filename.find('.xml') != -1: 
    250                 original_filename = originals_dir + filename 
    251                 ident=getID(original_filename) 
    252                  
    253                 if NDG_dataProvider: 
    254                         new_filename = discovery_dir + ident.replace(":","__")+".xml" 
    255                 else: 
    256                                 ident = ident.replace(":","-") 
    257                                 ident = ident.replace("/","-") 
    258                                 new_filename = discovery_dir + "/" +datacentre_namespace+ "__"+datacentre_format+ "__"+ ident +".xml" 
    259                                 logging.info("original file = " + original_filename) 
    260                                 logging.info("newfile = " + new_filename) 
    261                  
    262                 # now correct any namespace issues 
    263                 try: 
    264                     SchemaNameSpace(original_filename, new_filename, datacentre_format) 
    265                 except: 
    266                         sys.exit("ERROR: SchemaNameSpace failed on file %s" %original_filename) 
    267                 numfilesproc += 1 
    268         else: 
    269                 logging.warning('File %s is not xml format. Not processed'  %(filename)) 
    270  
    271 logging.info(lineSeparator) 
    272  
    273 # now set up the required XQueries 
    274 # - NB, extract the xquery libraries locally for easy reference 
    275 xq=ndgXqueries() 
    276 for libFile in xq.xqlib: 
    277         fileUtils.createFile(libFile, xq.xqlib[libFile]) 
    278  
    279 # Process the resulting files and put the data into the postgres DB 
    280 filenames = os.listdir(discovery_dir) 
    281 for filename in filenames: 
    282         addFileToPostgresDB(discovery_dir + filename) 
    283  
    284 #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups 
    285 backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M") 
    286  
    287 this_backupdir = backupdir_base + "_originals" 
    288 fileUtils.makeBackUp(originals_dir, this_backupdir) 
    289  
    290 #Clear out the original harvest records area and FINALMOLES 
    291 fileUtils.cleanDir(originals_dir) 
    292 fileUtils.cleanDir(discovery_dir) 
    293 fileUtils.cleanDir(harvest_home) 
    294  
    295 print lineSeparator 
    296 print "INFO: No. of files pre-processed = %s" %numfilesproc 
    297 if status == 0: 
    298     print "INFO: Procedure oai_ingest.py completed" 
    299 else: 
    300     print "ERROR: Procedure oai_ingest.py FAILED with status %s" %status 
    301 print lineSeparator 
     47 
     48        def getID(self, filename): 
     49                '''  
     50                Gets the identifier out of an input metadata xml record.  
     51                Copes with DIF and MDIP currently. 
     52                @param filename - name of document file being processed 
     53                @return: ID - id to use to refer to the document 
     54                ''' 
     55                logging.info("Retrieving identifier for metadata record " + filename) 
     56                xml=file(filename).read() 
     57                if self._datacentre_format == "DIF": 
     58                    d=DIF(xml) 
     59                    ID=d.entryID 
     60                elif self._datacentre_format == "MDIP": 
     61                    d=MDIP(xml) 
     62                    ID=d.id 
     63                else: 
     64                    sys.exit("Only handles DIF or MDIP here.") 
     65         
     66                logging.info("Found identifier: " + ID) 
     67                return ID 
     68         
     69         
     70        def addFileToPostgresDB(self, filename): 
     71                ''' 
     72                Add a file to the postgres DB - extracting and storing all the required 
     73                data in the process 
     74                @param filename: full path of file to add to postgres DB  
     75                ''' 
     76                if not os.path.isfile(filename): 
     77                        logging.info("Skipping, %s - not a valid file" %filename) 
     78                        return 
     79                 
     80                logging.info("Adding file, " + filename + ", to postgres DB") 
     81                discoveryID = self.getID(filename) 
     82                 
     83                # first of all create a PostgresRecord - this object represents all the data required 
     84                # for a DB entry 
     85                record = PostgresRecord(filename, self._NDG_dataProvider, \ 
     86                                                            self._datacentre_groups, self._datacentre_namespace, \ 
     87                                                            discoveryID, self._xq, self._datacentre_format) 
     88         
     89                # Now create the data access object to interface to the DB 
     90                dao = PostgresDAO(record, self._dbConnection) 
     91                 
     92                # Finally, write the new record 
     93                dao.createOrUpdateRecord() 
     94         
     95         
     96        def getConfigDetails(self, datacentre): 
     97                ''' 
     98                Get the harvested records directory and groups for this datacentre from the  
     99                datacentre specific config file.  The harvested records directory depends on the  
     100                datacentres OAI base url, the set and format. These have to be know up-front. 
     101                The groups denote which 'portal groups' they belong to - for limiting searches to  
     102                say NERC-only datacentres records. 
     103                Groups are added to the intermediate MOLES when it is created. 
     104                @param datacentre: datacentre to use when looking up config file  
     105                ''' 
     106                self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties" 
     107                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename) 
     108                 
     109                # Check this file exists; if not, assume an invalid datacentre has been specified 
     110                if not os.path.isfile(self._datacentre_config_filename): 
     111                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \ 
     112                        "specified (%s) is invalid\n" %datacentre) 
     113                     
     114                datacentre_config_file = open(self._datacentre_config_filename, "r") 
     115                 
     116                for line in datacentre_config_file.readlines(): 
     117                    words  = string.split(line) 
     118                    if len(words) == 0: 
     119                        continue 
     120                    if words[0] == 'host_path': 
     121                        self._harvest_home = string.rstrip(words[1]) 
     122                    if words[0] == 'groups': 
     123                        self._datacentre_groups = words[1:] 
     124                    if words[0] == 'format': 
     125                        self._datacentre_format = words[1] 
     126                    if words[0] == 'namespace': 
     127                        self._datacentre_namespace = words[1] 
     128                    if words[0] == 'self._NDG_dataProvider': 
     129                        self._NDG_dataProvider = True 
     130                 
     131                datacentre_config_file.close() 
     132                 
     133                if self._harvest_home == "": 
     134                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename) 
     135                 
     136                logging.info("harvested records are in " + self._harvest_home) 
     137                 
     138                if self._datacentre_groups == "": 
     139                    logging.info("No groups/keywords set for datacentre " + datacentre) 
     140                else: 
     141                    logging.info("datacentre groups/keywords: " + self._datacentre_groups) 
     142                 
     143                if self._datacentre_format == "": 
     144                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename) 
     145                 
     146                logging.info("format being harvested: " + self._datacentre_format) 
     147                 
     148                if self._datacentre_namespace == "": 
     149                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename) 
     150                 
     151                logging.info("datacentre namespace: " + self._datacentre_namespace) 
     152                 
     153                if self._NDG_dataProvider: 
     154                        logging.info("Datacentre classified as an NDG data provider") 
     155                else: 
     156                        logging.info("Datacentre is not classificied as an NDG data provider") 
     157                print self.lineSeparator 
     158 
     159                 
     160        def _getDBConnection(self): 
     161                ''' 
     162                Get the default DB connection - by reading in data from the db config file 
     163                ''' 
     164                logging.info("Setting up connection to postgres DB") 
     165                dbinfo_file=open('ingest.config', "r") 
     166                dbinfo = dbinfo_file.read().split() 
     167                if len(dbinfo) != 4: 
     168                        raise ValueError, 'Incorrect data in config file' 
     169                 
     170                self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3]) 
     171                logging.info("Postgres DB connection now set up") 
     172 
     173         
     174        def usage(self): 
     175                ''' 
     176                Display input params for the script 
     177                ''' 
     178                print "Usage: python -v oai_ingest.py <datacentre>" 
     179                print " - where:\n   <datacentre> is the data centre to ingest data from; and" 
     180                print " -v - verbose mode for output logging" 
     181                sys.exit(2) 
     182 
     183                 
     184        def __init__(self, datacentre=None): 
     185                ''' 
     186                Main entry point for script 
     187                ''' 
     188                self.lineSeparator = "-----------------------------" 
     189                print self.lineSeparator 
     190                print "RUNNING: oai_ingest.py"           
     191                 
     192                # check for verbose option 
     193                try: 
     194                    opts, args = getopt.getopt(sys.argv[1:], "v") 
     195                except getopt.GetoptError, err: 
     196                    # print help information and exit: 
     197                    print str(err) # will print something like "option -a not recognized" 
     198                    self.usage() 
     199                     
     200                loggingLevel = logging.WARNING 
     201                for o, a in opts: 
     202                    if o == "-v": 
     203                        print " - Verbose mode ON" 
     204                        loggingLevel = logging.DEBUG 
     205                 
     206                logging.basicConfig(level=loggingLevel, 
     207                                                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s') 
     208                 
     209                print self.lineSeparator 
     210                 
     211                if datacentre is None: 
     212                        self.usage() 
     213                 
     214                # create file utils object to do various file related stuff 
     215                fileUtils = FileUtilities() 
     216                 
     217                status = 0 
     218                numfilesproc = 0 
     219                self._base_dir = os.getcwd() + "/"# this is the base dir that the script is ran from 
     220                         
     221                data_dir = self._base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs 
     222                 
     223                #Change os directory to that with the harvested documents in it. 
     224                os.chdir(self._base_dir) 
     225                 
     226                # - to run on Windows under cygwin, use the following 
     227                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin') 
     228                 
     229                # set the global variables to retrieve from the config file 
     230                self._harvest_home = "" 
     231                self._datacentre_groups = "" 
     232                self._datacentre_format = "" 
     233                self._datacentre_namespace = "" 
     234                self._NDG_dataProvider = False 
     235                self.getConfigDetails(datacentre) 
     236                 
     237                #any records to harvest? 
     238                if len( os.listdir(self._harvest_home)) == 0: 
     239                    logging.info("Nothing to harvest this time from " + datacentre) 
     240                    sys.exit() 
     241                 
     242                # The directory to put things for a tape backup (should already exist) 
     243                backupdir = '/disks/glue1/oaiBackup/' 
     244                 
     245                # the following dirs define where the specific documents should go 
     246                originals_dir = data_dir +"/oai/originals/" 
     247                discovery_dir = data_dir +"/discovery/" 
     248                 
     249                # Create/clear the 'in' directory pristine copy of the discovery records 
     250                fileUtils.setUpDir(originals_dir) 
     251                commandline = "ls -1 " + self._harvest_home + "/ | xargs -i cp " + self._harvest_home + "/{\} " + originals_dir 
     252                #commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir 
     253                logging.info("Executing : " + commandline) 
     254                status = os.system(commandline) 
     255                 
     256                if status !=0: 
     257                    sys.exit("Failed at making pristine copy stage") 
     258                 
     259                # Create/clear the directory for the 'out' processed copy of the discovery records. 
     260                fileUtils.setUpDir(discovery_dir) 
     261                     
     262                #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file) 
     263                # - also replace any namespace declarations with a standard one which we know works in NDG 
     264                # NB, this copies files from the original dir to the discovery dir 
     265                logging.info(self.lineSeparator) 
     266                logging.info("Renaming files:") 
     267                for filename in os.listdir(originals_dir): 
     268                        if filename.endswith('.xml'): 
     269                                original_filename = originals_dir + filename 
     270                                ident=self.getID(original_filename) 
     271                                 
     272                                if self._NDG_dataProvider: 
     273                                        new_filename = discovery_dir + ident.replace(":","__")+".xml" 
     274                                else: 
     275                                                ident = ident.replace(":","-") 
     276                                                ident = ident.replace("/","-") 
     277                                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml" 
     278                                                logging.info("original file = " + original_filename) 
     279                                                logging.info("newfile = " + new_filename) 
     280                                 
     281                                # now correct any namespace issues 
     282                                try: 
     283                                    SchemaNameSpace(original_filename, new_filename, self._datacentre_format) 
     284                                except: 
     285                                        sys.exit("ERROR: SchemaNameSpace failed on file %s" %original_filename) 
     286                                numfilesproc += 1 
     287                        else: 
     288                                logging.warning('File %s is not xml format. Not processed'  %(filename)) 
     289                 
     290                logging.info(self.lineSeparator) 
     291                 
     292                # now set up the required XQueries 
     293                # - NB, extract the xquery libraries locally for easy reference 
     294                self._xq=ndgXqueries() 
     295                for libFile in self._xq.xqlib: 
     296                        fileUtils.createFile(libFile, self._xq.xqlib[libFile]) 
     297                 
     298                # Process the resulting files and put the data into the postgres DB 
     299                # - firstly set up a db connection to use 
     300                self._dbConnection = None 
     301                self._getDBConnection() 
     302                 
     303                filenames = os.listdir(discovery_dir) 
     304                for filename in filenames: 
     305                        self.addFileToPostgresDB(discovery_dir + filename) 
     306                 
     307                #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups 
     308                backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M") 
     309                 
     310                this_backupdir = backupdir_base + "_originals" 
     311                fileUtils.makeBackUp(originals_dir, this_backupdir) 
     312                 
     313                #Clear out the original harvest records area and FINALMOLES 
     314                fileUtils.cleanDir(originals_dir) 
     315                fileUtils.cleanDir(discovery_dir) 
     316                fileUtils.cleanDir(self._harvest_home) 
     317                 
     318                print self.lineSeparator 
     319                print "INFO: No. of files pre-processed = %s" %numfilesproc 
     320                if status == 0: 
     321                    print "INFO: Procedure oai_ingest.py completed" 
     322                else: 
     323                    print "ERROR: Procedure oai_ingest.py FAILED with status %s" %status 
     324                print self.lineSeparator 
     325         
     326if __name__=="__main__": 
     327        opts, args = getopt.getopt(sys.argv[1:], "v") 
     328        if len(args) < 1: 
     329                oai_ingest() 
     330         
     331        oai_ingest(args[0]) 
Note: See TracChangeset for help on using the changeset viewer.