Ignore:
Timestamp:
21/01/10 16:58:54 (10 years ago)
Author:
sdonegan
Message:

completion of synchronising...

Location:
TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py

    r5976 r6367  
    1212import ndg.common.src.lib.fileutilities as FileUtilities 
    1313from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc 
    14 from PostgresRecord import PostgresRecord 
    15 from PostgresDAO import PostgresDAO 
    16 from Utilities import idget 
    17 from Utilities import DatasetBasicParameters,ndgRedirectTransform,redirectUrlChanger 
     14from Utilities import idget, IngestTracking 
     15from Utilities import DatasetBasicParameters_MEDIN_v01,DatasetBasicParameters_Original 
     16from Utilities import ndgRedirectTransform,redirectUrlChanger 
    1817from DeleteRecord import DeleteRecord 
    1918import datetime,time 
    2019from ndg.common.src.models.ndgObject import ndgObject 
    2120from ndg.common.src.lib.ndgresources import ndgResources 
     21from AbstractFormatIngester_original import AbstractFormatIngesterOriginal 
     22from ndg.common.src.models.ndgObject import ndgObject 
     23 
     24SAXON_JAR_FILE = '/disks/glue1/sdonegan/NDG3_workspace/buildouts/oai_document_ingester_MEDIN/ingestAutomation-upgrade/OAIBatch/lib/saxon9.jar' 
     25         
    2226 
    2327class AbstractDocumentIngester(object): 
     
    3135        # The directory to put things for a tape backup (should already exist) 
    3236        BACKUP_DIR = '/disks/glue1/oaiBackup/' 
    33          
     37                 
     38        #keep ndg3beta service for testing MEDIN upgrades 
    3439        NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url=' 
     40         
     41         
     42        #NDG_redirect_URL = 'http://triton.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url=' 
     43         
     44         
    3545 
    3646        def _setupCmdLineOptions(self): 
     
    118128         
    119129         
    120         def addFileToPostgresDB(self, filename): 
    121                 ''' 
    122                 Add a file to the postgres DB - extracting and storing all the required 
    123                 data in the process 
    124                 @param filename: full path of file to add to postgres DB 
    125                 ''' 
    126                 logging.info("Adding file, " + filename + ", to postgres DB") 
    127                  
    128                 numSlash = len(filename.split('/')) 
    129                  
    130                 shortFilename = filename.split('/')[numSlash - 1] 
    131                  
    132                 self.recordAttemptToIngestDiscoveryID = ""                               
    133                  
    134                 # first of all create a PostgresRecord - this object represents all the data required 
    135                 # for a DB entry 
    136                 dao = None 
    137                  
    138                 try: 
    139                         #discoveryID = self.getID(filename) 
    140                          
    141                         basicParameters=DatasetBasicParameters(filename,self._datacentre_format) 
    142                                                  
    143                         discoveryID = basicParameters.datasetID                  
    144                         datasetName = basicParameters.datasetName 
    145                         datacentreName = basicParameters.datacentreName 
    146                         datasetLastEditUpdateDate = basicParameters.metadataCreationDate 
    147                         datasetStartDateNom = basicParameters.datasetStartDateNom 
    148                         datasetEndDateNom = basicParameters.datasetEndDateNom 
    149                          
    150                         #record whats attempting to be ingested 
    151                         self.recordAttemptToIngestDiscoveryID = discoveryID 
    152                          
    153                                                                                                             
    154                         record = PostgresRecord(filename, self._NDG_dataProvider, \ 
    155                                                             self._datacentre_groups, self._datacentre_namespace, \ 
    156                                                             discoveryID,datasetName,datacentreName,datasetLastEditUpdateDate,datasetStartDateNom,datasetEndDateNom, \ 
    157                                                             self._xq, self._datacentre_format) 
    158                         #import pdb 
    159                         #pdb.set_trace() 
    160                          
    161                         # Now create the data access object to interface to the DB 
    162                         dao = PostgresDAO(record, pgClient = self.pgc) 
    163                          
    164                         # Finally, write the new record 
    165                         # 0 if failed, 1 if update, 2 if create 
    166                         returnCode = dao.createOrUpdateRecord()  
    167                         if returnCode == 2: 
    168                                 self._no_files_ingested += 1 
    169                         elif returnCode == 1: 
    170                                 self._no_files_changed += 1 
    171                         #if 0 nothing incremented 
    172                          
    173                 except: 
    174                          
    175                         #if error encountered, add to failure lisr 
    176                         logging.error("Could not update: " + filename)                   
    177                         originalRecordFilename = self.inputFileOrigFinal[shortFilename] 
    178                         self.updateFailList.append(originalRecordFilename) 
    179                          
    180                         logging.error("Exception thrown - detail: ") 
    181                         errors = sys.exc_info() 
    182                         logging.error(errors) 
    183                         self._error_messages += "%s\n" %str(errors[1]) 
    184                          
    185                         if dao: 
    186                                 logging.info("Removing record and its associated info from DB") 
    187                                 logging.info("- to allow clean ingestion on rerun") 
    188                                 try: 
    189                                         dao.deleteOriginalRecord() 
    190                                 except: 
    191                                         logging.error("Problem encountered when removing record: ") 
    192                                         logging.error(sys.exc_info()) 
    193                                         logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested") 
    194  
    195                         self._no_problem_files += 1 
    196                          
    197                         logging.info("Continue processing other files") 
    198                          
    199                 return self.recordAttemptToIngestDiscoveryID 
     130         
    200131                         
    201132         
     
    221152                 
    222153 
    223                 file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename) 
    224                  
    225                 for line in file.split('\n'): 
     154                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename) 
     155                file = open(datacentre_config_filename,"r") 
     156                 
     157                for line in file.readlines(): 
    226158                        words = line.split() 
    227159                        if len(words) == 0: 
     
    237169                        elif words[0] == 'NDG_dataProvider': 
    238170                                self._NDG_dataProvider = True 
    239                          
     171                 
    240172                if self._harvest_home == "": 
    241173                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename) 
     
    265197 
    266198 
    267         def _convertIngestFiles(self, originals_dir, discovery_dir): 
     199        def convertDIFtoISO(self,metadataFileLoc,repositoryName,metadataID,metadataFilename): 
     200                ''' 
     201                   New to MEDIN ingest Upgrade: 
     202                   This converts the still supported DIF to ISO format - all extractions will come from ISO,  
     203                   whether original ISO format or conversions to ISO.  
     204                    
     205                   Uses the new "dif2stubISO" xquery 
     206                ''' 
     207                xQueryType = 'dif2stubISO' 
     208                 
     209                self._repository_local_id = repositoryName #'neodc.nerc.ac.uk' 
     210                self.discovery_id = metadataID 
     211                self._local_id = metadataFilename 
     212                self._repository = repositoryName #'neodc.nerc.ac.uk' 
     213                                 
     214                self.xqueryLib = ndgResources()                                                  
     215                xquery = self.xqueryLib.createXQuery(xQueryType,metadataFileLoc, self._repository_local_id, self._local_id) 
     216       
     217                # sort out the input ID stuff 
     218                xquery=xquery.replace('Input_Entry_ID', self.discovery_id) 
     219                xquery=xquery.replace('repository_localid', self._repository) 
     220 
     221        # strip out the eXist reference to the libraries; these files should be available in the 
     222                # running dir - as set up by oai_ingest.py 
     223                xquery=xquery.replace('xmldb:exist:///db/xqueryLib/Vocabs/', '') 
     224                xquery=xquery.replace('xmldb:exist:///db/xqueryLib/Utilities/', '') 
     225 
     226        # write the query to file, to make it easier to input 
     227        # NB, running directly at the command line leads to problems with the interpretation of $ characters 
     228                xqFile = "currentQuery" + xQueryType + ".xq"  
     229                 
     230                FileUtilities.createFile(xqFile, xquery) 
     231                                 
     232                # ensure the jar file is available - NB, this may be running from a different 
     233        # location - e.g. the OAIInfoEditor.lib.harvester - and this won't have the 
     234        # saxon file directly on its filesystem 
     235                 
     236                #jarFile = pkg_resources.resource_filename('OAIBatch', SAXON_JAR_FILE) 
     237                jarFile = SAXON_JAR_FILE 
     238        # Now do the transform 
     239                os.putenv ('PATH', ':/usr/java/jdk1.5.0_03/bin:/usr/java/jdk1.5.0_03:/usr/java/jdk1.5.0_03/lib/tools.jar:/usr/local/WSClients/OAIBatch:/usr/local/exist-client/bin:/bin:/usr/bin:.') 
     240                xqCommand = "java -cp %s net.sf.saxon.Query %s !omit-xml-declaration=yes" %(jarFile, xqFile) 
     241                                 
     242                logging.debug("Running saxon command: " + xqCommand) 
     243                pipe = os.popen(xqCommand + " 2>&1") 
     244                output = pipe.read() 
     245                status = pipe.close() 
     246 
     247                if status is not None: 
     248                        raise SystemError, 'Failed at running the XQuery' 
     249                 
     250                 
     251                # now remove the temp xquery file 
     252                '''status = os.unlink(xqFile) 
     253                if status is not None: 
     254                        raise OSError, 'Failed to remove the temporary xquery file, ' + xqFile''' 
     255 
     256                logging.info("Transform completed successfully") 
     257 
     258                return output 
     259                                 
     260 
     261 
     262        def _convertIngestFiles(self, originals_dir, discovery_dir, stubIso_dir, format): 
    268263                ''' 
    269264                Processes/renames the files (changed 08/01/07 to get id from inside file) 
     
    274269                 @return numfilesproc: counter of number of files processed 
    275270                ''' 
    276                 numfilesproc = 0 
     271                numfilesproc = 0                         
    277272                 
    278273                self.inputFileOrigFinal = {} 
     
    289284                        original_filename = originals_dir + filename 
    290285                         
    291                          
    292                          
    293                         #convert urls within original xml input file to NDG redirect URLS 
    294                          
    295                         ''' This xquery method superceded by new python class in Utilities.py that uses 
    296                         elementtree to do the same job but with better handling of character sets...''' 
    297                         ''' 
    298                         if self._datacentre_format == 'DIF':             
    299                                 xQueryType = 'transform_DIF_URL' 
    300                                  
    301                         elif self._datacentre_format == 'MDIP': 
    302                                 xQueryType = 'transform_MDIP_URL' 
    303                                  
    304                         redirect_url = 'http://ndgRedirect.nerc.ac.uk/' 
    305                          
    306                         #filename_rd = filename.replace(".xml","_redirect.xml") 
    307                         #original_filename_urlRedirect = originals_dir + filename_rd 
    308                          
    309                         ndgRedirectTransform(xQueryType,redirect_url,original_filename) 
    310                         ''' 
    311                          
     286                                                 
     287                        #convert urls within original xml input file to NDG redirect URLS                                                
    312288                        #call new class in Utilities.py --will replace original file... 
    313289                        redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self.NDG_redirect_URL) 
    314290                         
    315                  
    316                                                  
    317                         #Use new class to get basic parameters from input xml doc to pass around (supplants getID method) 
    318                         basicParameters=DatasetBasicParameters(original_filename,self._datacentre_format) 
    319                                      
    320                         try: 
    321                                 #ident=self.getID(original_filename) 
    322                                 ident=basicParameters.datasetID  
    323                                 logging.info("Dataset ID is " + ident) 
    324                                          
    325                         except Exception, detail: 
    326                                 logging.error("Could not retrieve ID from file, %s" %filename) 
    327                                 logging.error("Detail: %s" %detail) 
    328                                 logging.info("Continue with next file") 
    329                                 continue 
    330                          
    331                         if self._NDG_dataProvider: 
    332                                  
    333                                 #new_filename = discovery_dir + ident.replace(":", "__")+".xml" 
    334                                 #new_filename_short = ident.replace(":", "__")+".xml" 
    335                                  
    336                                 #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?) 
    337                                 new_filename = discovery_dir + self._datacentre_namespace + "__" +self._datacentre_format + "__"+ ident.replace(":", "-")+".xml" 
    338                                 new_filename_short = self._datacentre_namespace + "__" +self._datacentre_format + "__" + ident.replace(":", "-")+".xml" 
    339                                  
    340                         else: 
    341                                  
    342                                 ident = ident.replace(":", "-") 
    343                                 ident = ident.replace("/", "-") 
    344                                  
    345                                 new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml" 
    346                                 new_filename_short = self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml" 
    347                                  
     291                        if format == 'DIF': 
     292                                 
     293                                #NOTE MDIP now deprecated!!  easier to leave all those crappy dif or mdip code but will mainly be replaced anyway 
     294                                 
     295                                logging.info("Converting DIF to stub ISO...") 
     296                                                                 
     297                                #gets the metadata id from the xml 
     298                                metadataID=self.getID(original_filename) 
     299                                 
     300                                repositoryName = self._datacentre_namespace 
     301                                 
     302                                #get the name of the file to be used in the xquery 
     303                                metadataFilename = filename.replace('.xml', '') 
     304                                 
     305                                #where is the file to be ingested located? 
     306                                metadataFileLoc = originals_dir 
     307                                 
     308                                #generate a new stubISO filename 
     309                                isoFormat= "stubISO" 
     310                                 
     311                                if self._NDG_dataProvider: 
     312                                                                 
     313                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?) 
     314                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml" 
     315                                        new_filename_short = self._datacentre_namespace + "__" + isoFormat + "__" + metadataFilename.replace(":", "-")+".xml" 
     316                                 
     317                                else: 
     318                                 
     319                                        ident = ident.replace(":", "-") 
     320                                        ident = ident.replace("/", "-") 
     321                                 
     322                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml" 
     323                                        new_filename_short = self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml" 
     324                                 
     325                                 
     326                                self.isoXML = self.convertDIFtoISO(metadataFileLoc,repositoryName,metadataID,metadataFilename) 
     327                                 
     328                                #now create this stub ISO file on system so can access it 
     329                                self.stubIsoFile = stubIso_dir + new_filename_short 
     330                                FileUtilities.createFile(self.stubIsoFile, self.isoXML) 
     331                        logging.info("Stub ISO file created - at %s" %stubIso_dir) 
     332                         
     333                        #elif format == 'MEDIN_v0.1': 
     334                 
     335                                #basicParameters=DatasetBasicParameters_MEDIN_v01(original_filename,self._datacentre_format) 
     336                              
    348337                                 
    349338                        logging.info("original file = " + original_filename) 
     
    367356                logging.info("File renaming and converting completed") 
    368357                logging.info(self.lineSeparator) 
    369                  
    370                 #sys.exit() 
    371                  
     358                         
    372359                return numfilesproc 
    373360 
     
    411398                self.originals_dir = data_dir + "/oai/originals/" 
    412399                self.discovery_dir = data_dir + "/discovery/" 
     400                self.stubISO_dir = data_dir + "/stub_iso/" 
    413401                 
    414402                # Create/clear the 'in' and 'out' directories 
    415403                FileUtilities.setUpDir(self.originals_dir) 
    416404                FileUtilities.setUpDir(self.discovery_dir) 
     405                FileUtilities.setUpDir(self.stubISO_dir) 
    417406                 
    418407                logging.info("Ingest directories for data centre set up") 
    419408 
    420409 
    421         def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed): 
     410        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed, format): 
    422411                ''' 
    423412                Convert files from originals dir to discovery one, then 
     
    427416                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other 
    428417                ''' 
    429                  
    430                 numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir) 
    431                  
     418                                 
     419                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir, self.stubISO_dir, format) 
     420                                 
    432421                filenames = os.listdir(self.discovery_dir) 
    433422                 
     
    445434         
    446435                #create list to to hold files ingest failed on. 
    447                 self.updateFailList = [] 
    448                 self.deletedFailList = [] 
    449                  
     436                self.thisIngestMonitor = IngestTracking() 
     437                 
     438                #self.updateFailList = [] 
     439                #self.deletedFailList = [] 
     440                #no_problem_files = 0 
    450441                 
    451442                for filename in filenames: 
     
    453444                         
    454445                        if os.path.isfile(fullPath): 
    455                                 thisIngestedID = self.addFileToPostgresDB(fullPath) 
     446 
     447                                #ingest file intelligently according to input formatfilename,datacentre_format,datacentre_groups,datacentre_groups                               
     448                                if format == 'DIF' or format == 'MDIP': 
     449                                              
     450                                        #thisIngestedID = ingestOriginal.addFileToPostgresDB_original(fullPath) 
     451                                         
     452                                         
     453                                        thisIngest = AbstractFormatIngesterOriginal() 
     454                                        thisIngestedID = thisIngest.ingest(fullPath,format,self._datacentre_groups,self._datacentre_namespace,self._xq,self.pgc,self.inputFileOrigFinal,self.thisIngestMonitor, self._NDG_dataProvider,self.stubIsoFile) 
     455                                        sys.exit() 
     456                                         
     457                                                                                                         
     458                                elif format == 'MEDIN_v0.1': 
     459                                        thisIngestedID, errorMsg = AbstractFormatIngester.addFileToPostgresDB_Medin_v01(fullPath) 
    456460                                 
    457461                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)  
     
    468472                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                    
    469473                                DeleteRecord(item) 
    470                                 self.deletedFailList.append(item) 
    471                                 self._no_files_deleted += 1 
     474                                 
     475                                #self.deletedFailList.append(item) 
     476                                #self._no_files_deleted += 1 
     477                                self.thisIngestMonitor.appendDeletedList(item) 
     478                                self.thisIngestMonitor.incrementDeletedFile() 
    472479                         
    473480                         
     
    487494                message = message + "Original metadata directory: " + self._harvest_home + "\n\n" 
    488495                message = message + "TOTAL PROCESSED " + str(numfilesproc) + "\n" 
    489                 message = message + "INGESTED (Created) " + str(self._no_files_changed)  + "\n" 
    490                 message = message + "INGESTED (Updated) " + str(self._no_files_ingested)  + "\n" 
    491                 message = message + "DELETED " + str(self._no_files_deleted)  + "\n" 
    492                 message = message + "PROBLEM FILES " + str(self._no_problem_files)  + "\n" 
     496                message = message + "INGESTED (Created) " + str(self.thisIngestMonitor.getIngestedFileNum())  + "\n" 
     497                message = message + "INGESTED (Updated) " + str(self.thisIngestMonitor.getChangedFileNum())  + "\n" 
     498                message = message + "DELETED " + str(self.thisIngestMonitor.getDeletedFileNum())  + "\n" 
     499                message = message + "PROBLEM FILES " + str(self.thisIngestMonitor.getProblemFileNum())  + "\n" 
    493500                 
    494501                '''recOpFile.write("Ingest report for data centre: " + datacentre + "\n") 
     
    502509                 
    503510                 
    504                 for badFile in self.updateFailList: 
     511                #for badFile in self.updateFailList: 
     512                for badFile in self.thisIngestMonitor.getFailList(): 
    505513                        #recOpFile.write("PROBLEM_FILE " + badFile + "\n") 
    506514                        message = message +"PROBLEM_FILE " + badFile + "\n" 
  • TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py

    r5600 r6367  
    5656 
    5757                self.getConfigDetails(dataCentre) 
     58                                         
    5859                # override default settings with input keyword values, if set 
    5960                if harvestDir: 
     
    9899                # - firstly set up a db connection to use 
    99100                self._getPostgresDBConnection() 
     101                 
     102                 
     103                #start of MEDIN upgrades! 19/11/09 SJD 
     104                 
     105                #define accepted data formats so can do "smart" ingest (i.e. many different i/p formats) 
     106                acceptedFormats = ['MDIP','DIF','MEDIN_v0.1'] # TODO have this in a config file: MEDIN_v0.1 is the NDG development version 
     107                 
     108                if self._datacentre_format in acceptedFormats: 
     109                         
     110                        numfilesproc, processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, dataCentre, True, self._datacentre_format) 
    100111 
    101                 numfilesproc, processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, dataCentre, True) 
     112                else: 
     113                        logging.error("Unaccepted input format!") 
     114                        sys.exit() 
    102115                 
     116                 
     117 
    103118                outMessage = "OAI Document ingest processing complete:\n" 
    104119                logging.info("oai_document_ingest processing complete:") 
     
    110125                else: 
    111126                        logging.error("Problems experienced with %s files" %self._no_problem_files) 
    112                         logging.error("- harvnegest directory will not be cleared until these have been fixed and the script has been reran") 
     127                        logging.error("- harvest ingest directory will not be cleared until these have been fixed and the script has been reran") 
    113128                 
    114129                logging.info(self.lineSeparator) 
Note: See TracChangeset for help on using the changeset viewer.