Changeset 5414 for TI01-discovery


Ignore:
Timestamp:
25/06/09 11:55:41 (10 years ago)
Author:
sdonegan
Message:

Contains merged code to allow reporting of ingest, use of ndg redirect, accurate deletion and updating of exsiting records

File:
1 edited

Legend:

Unmodified
Added
Removed
  • TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py

    r5297 r5414  
    66 As well as doing the ingest, a backup directory is created to store the created moles files. 
    77''' 
    8 import os, sys, string, getopt, logging, pkg_resources 
     8import os, sys, string, getopt, logging, re, pkg_resources 
    99from time import strftime 
    1010from SchemaNameSpace import SchemaNameSpace 
     
    1515from PostgresDAO import PostgresDAO 
    1616from Utilities import idget 
     17from Utilities import DatasetBasicParameters,ndgRedirectTransform,redirectUrlChanger 
     18from DeleteRecord import DeleteRecord 
     19import datetime,time 
     20from ndg.common.src.models.ndgObject import ndgObject 
     21from ndg.common.src.lib.ndgresources import ndgResources 
    1722 
    1823class AbstractDocumentIngester(object): 
     
    2631        # The directory to put things for a tape backup (should already exist) 
    2732        BACKUP_DIR = '/disks/glue1/oaiBackup/' 
     33         
     34        NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url=' 
    2835 
    2936        def _setupCmdLineOptions(self): 
     
    3239                @return args: any input arguments - excluding options 
    3340                ''' 
     41                 
    3442                # check for verbose option 
     43                 
    3544                try: 
    36                         opts, args = getopt.getopt(sys.argv[1:], "vd") 
     45                        opts, args = getopt.getopt(sys.argv[1:], "vdi") 
    3746                except getopt.GetoptError, err: 
    3847                    # print help information and exit: 
    3948                    print str(err) # will print something like "option -a not recognized" 
    4049                    sys.exit(2) 
    41  
     50                     
    4251                if len(args) < 1: 
    4352                        self.usage() 
     
    5160                        print " - Debug mode ON" 
    5261                        loggingLevel = logging.DEBUG 
     62                    
    5363                 
    5464                # set up any keywords on the object 
    5565                # NB, be careful to keep the instance variables the same name as the keywords! 
    5666                for arg in args: 
     67                         
    5768                        bits = arg.split('=') 
    5869                        if len(bits) == 2: 
     
    6172                                elif bits[0] == 'interval': 
    6273                                        self.setPollInterval(bits[1]) 
     74                                elif bits[0] == 'individualFile': 
     75                                        print " - Running in single file ingestion mode!" 
     76                                        self.setIndFileToIngest(bits[1]) 
    6377                                else: 
    6478                                        setattr(self, bits[0], bits[1]) 
     
    104118                logging.info("Adding file, " + filename + ", to postgres DB") 
    105119                 
     120                numSlash = len(filename.split('/')) 
     121                 
     122                shortFilename = filename.split('/')[numSlash - 1] 
     123                 
     124                self.recordAttemptToIngestDiscoveryID = ""                               
     125                 
    106126                # first of all create a PostgresRecord - this object represents all the data required 
    107127                # for a DB entry 
    108128                dao = None 
     129                 
    109130                try: 
    110                         discoveryID = self.getID(filename) 
    111                          
     131                        #discoveryID = self.getID(filename) 
     132                         
     133                        basicParameters=DatasetBasicParameters(filename,self._datacentre_format) 
     134                                                 
     135                        discoveryID = basicParameters.datasetID                  
     136                        datasetName = basicParameters.datasetName 
     137                        datacentreName = basicParameters.datacentreName 
     138                        datasetLastEditUpdateDate = basicParameters.metadataCreationDate 
     139                         
     140                        #record whats attempting to be ingested 
     141                        self.recordAttemptToIngestDiscoveryID = discoveryID 
     142                         
     143                                                                                                            
    112144                        record = PostgresRecord(filename, self._NDG_dataProvider, \ 
    113145                                                            self._datacentre_groups, self._datacentre_namespace, \ 
    114                                                             discoveryID, self._xq, self._datacentre_format) 
    115                          
    116                         print self._xq 
     146                                                            discoveryID,datasetName,datacentreName,datasetLastEditUpdateDate, \ 
     147                                                            self._xq, self._datacentre_format) 
     148                         
     149                         
    117150                        # Now create the data access object to interface to the DB 
    118151                        dao = PostgresDAO(record, pgClient = self.pgc) 
    119                  
     152                         
    120153                        # Finally, write the new record 
    121                         if dao.createOrUpdateRecord(): 
     154                        # 0 if failed, 1 if update, 2 if create 
     155                        returnCode = dao.createOrUpdateRecord()  
     156                        if returnCode == 2: 
    122157                                self._no_files_ingested += 1 
     158                        elif returnCode == 1: 
     159                                self._no_files_changed += 1 
     160                        #if 0 nothing incremented 
     161                         
    123162                except: 
     163                         
     164                        #if error encountered, add to failure lisr 
     165                        logging.error("Could not update: " + filename)                   
     166                        originalRecordFilename = self.inputFileOrigFinal[shortFilename] 
     167                        self.updateFailList.append(originalRecordFilename) 
     168                         
    124169                        logging.error("Exception thrown - detail: ") 
    125170                        errors = sys.exc_info() 
     
    138183 
    139184                        self._no_problem_files += 1 
     185                         
    140186                        logging.info("Continue processing other files") 
    141          
     187                         
     188                return self.recordAttemptToIngestDiscoveryID 
     189                         
    142190         
    143191        def getConfigDetails(self, datacentre): 
     
    215263                ''' 
    216264                numfilesproc = 0 
     265                 
     266                self.inputFileOrigFinal = {} 
     267                 
    217268                logging.info(self.lineSeparator) 
    218269                logging.info("Renaming files:") 
     270                 
    219271                for filename in os.listdir(originals_dir): 
    220272                        if not filename.endswith('.xml'): 
     
    223275                         
    224276                        original_filename = originals_dir + filename 
     277                         
     278                         
     279                        #convert urls within original xml input file to NDG redirect URLS 
     280                         
     281                        ''' This xquery method superceded by new python class in Utilities.py that uses 
     282                        elementtree to do the same job but with better handling of character sets...''' 
     283                        ''' 
     284                        if self._datacentre_format == 'DIF':             
     285                                xQueryType = 'transform_DIF_URL' 
     286                                 
     287                        elif self._datacentre_format == 'MDIP': 
     288                                xQueryType = 'transform_MDIP_URL' 
     289                                 
     290                        redirect_url = 'http://ndgRedirect.nerc.ac.uk/' 
     291                         
     292                        #filename_rd = filename.replace(".xml","_redirect.xml") 
     293                        #original_filename_urlRedirect = originals_dir + filename_rd 
     294                         
     295                        ndgRedirectTransform(xQueryType,redirect_url,original_filename) 
     296                        ''' 
     297                         
     298                        #call new class in Utilities.py --will replace original file... 
     299                        redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self.NDG_redirect_URL) 
     300                         
     301                 
     302                                                 
     303                        #Use new class to get basic parameters from input xml doc to pass around (supplants getID method) 
     304                        basicParameters=DatasetBasicParameters(original_filename,self._datacentre_format) 
     305                                     
    225306                        try: 
    226                                 ident=self.getID(original_filename) 
     307                                #ident=self.getID(original_filename) 
     308                                ident=basicParameters.datasetID  
     309                                logging.info("Dataset ID is " + ident) 
     310                                         
    227311                        except Exception, detail: 
    228312                                logging.error("Could not retrieve ID from file, %s" %filename) 
     
    233317                        if self._NDG_dataProvider: 
    234318                                new_filename = discovery_dir + ident.replace(":", "__")+".xml" 
     319                                new_filename_short = ident.replace(":", "__")+".xml" 
     320                                 
    235321                        else: 
    236322                                ident = ident.replace(":", "-") 
    237323                                ident = ident.replace("/", "-") 
    238324                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml" 
    239                                 logging.info("original file = " + original_filename) 
    240                                 logging.info("newfile = " + new_filename) 
     325                                new_filename_short = self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml" 
     326                                 
     327                                 
     328                        logging.info("original file = " + original_filename) 
     329                        logging.info("newfile = " + new_filename) 
     330                                 
     331                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename) 
     332                        #this links a derived filename in processing dir with original filename 
     333                        #get basic filename from the path+filename passed to this function to use as key                 
     334                        self.inputFileOrigFinal[new_filename_short]=filename 
    241335                         
    242336                        # now correct any namespace issues 
     
    259353                Get the default postgres DB connection - by reading in data from the db config file 
    260354                ''' 
     355                 
    261356                logging.debug("Setting up connection to postgres DB") 
    262357                self.pgc = pgc(configFile = 'ingest.config') 
    263         logging.info("Postgres DB connection now set up") 
     358                logging.info("Postgres DB connection now set up") 
     359 
    264360 
    265361 
     
    276372                logging.info("Data backed up - now clearing ingest directories") 
    277373                #Clear out the original harvest records area and discovery dir 
    278                 FileUtilities.cleanDir(self.originals_dir) 
     374                #FileUtilities.cleanDir(self.originals_dir) 
    279375                FileUtilities.cleanDir(self.discovery_dir) 
    280376                logging.info("Ingest directories cleared") 
     
    299395 
    300396 
    301         def _convertAndIngestFiles(self, originals_dir, discovery_dir): 
     397        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre): 
    302398                ''' 
    303399                Convert files from originals dir to discovery one, then 
     
    306402                @param discovery_dir: directory to use to process ingested docs 
    307403                ''' 
     404                 
    308405                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir) 
    309406                 
    310407                filenames = os.listdir(self.discovery_dir) 
     408                 
     409                #generate a list of files ALREADY in database so can compare with what has been ingested 
     410                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"             
     411                filePresentListArr = self.pgc.runSQLCommand(sqlCmd) 
     412                filesPresentList=[] 
     413                if filePresentListArr: 
     414                        for fileArr in filePresentListArr: 
     415                                filesPresentList.append(fileArr[0]) 
     416         
     417                #create list to to hold files ingest failed on. 
     418                self.updateFailList = [] 
     419                self.deletedFailList = [] 
     420                 
    311421                for filename in filenames: 
    312422                        fullPath = self.discovery_dir + filename 
     423                         
    313424                        if os.path.isfile(fullPath): 
    314                                 self.addFileToPostgresDB(fullPath) 
    315                  
     425                                thisIngestedID = self.addFileToPostgresDB(fullPath) 
     426                                 
     427                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)  
     428                                if (thisIngestedID != "") and (len(filesPresentList) != 0): 
     429                                        if thisIngestedID in filesPresentList:                                           
     430                                                filesPresentList.remove(thisIngestedID)                                  
     431                                                 
     432                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these 
     433                #will need to be removed. 
     434                for item in filesPresentList: 
     435                        logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS") 
     436                        DeleteRecord(item) 
     437                        self.deletedFailList.append(item) 
     438                        self._no_files_deleted += 1 
     439                         
     440                         
    316441                self._backupAndCleanData() 
     442                 
     443                #at this stage put the reporting code in (will work for oai or atom feed) 
     444                #create a summary file for each data centre ingest 
     445                data_dir = self._base_dir + "data/" 
     446                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"             
     447                recOpFile = open(recOpFileName,'w') 
     448                 
     449                logging.info("oai_document_ingest processing complete:") 
     450                 
     451                recOpFile.write("Ingest report for data centre: " + datacentre + "\n") 
     452                recOpFile.write("Ingest date: " + str(datetime.datetime.now()) + "\n") 
     453                recOpFile.write("Original metadata directory: " + self._harvest_home + "\n\n") 
     454                recOpFile.write("TOTAL PROCESSED " + str(numfilesproc) + "\n") 
     455                recOpFile.write("INGESTED (Created) " + str(self._no_files_changed)  + "\n") 
     456                recOpFile.write("INGESTED (Updated) " + str(self._no_files_ingested)  + "\n") 
     457                recOpFile.write("DELETED " + str(self._no_files_deleted)  + "\n") 
     458                recOpFile.write("PROBLEM FILES " + str(self._no_problem_files)  + "\n") 
     459                 
     460                for badFile in self.updateFailList: 
     461                        recOpFile.write("PROBLEM_FILE " + badFile + "\n") 
     462                 
    317463                return numfilesproc 
    318464 
Note: See TracChangeset for help on using the changeset viewer.