source: TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/Metadata_document_ingester.py @ 7869

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/Metadata_document_ingester.py@7869
Revision 7869, 12.5 KB checked in by sdonegan, 10 years ago (diff)

Updates rolled over from MEDIN bugfix

Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, logging
9# annoyingly, an import (CSML file, I think) sets the logging config during imports - so set this
10# here to get there first - since you can only set the config once
11logging.basicConfig(level=logging.INFO,
12                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
13from time import strftime
14import ndg.common.src.lib.fileutilities as FileUtilities
15from abstractdocumentingester import AbstractDocumentIngester
16
17class Metadata_document_ingester(AbstractDocumentIngester):
18        '''
19        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
20        - including running the various transforms and parsings to get all doc types and spatiotemporal
21        data in the correct form in the DB
22        @return outMessage: string summary of ingest outcome
23        '''
24       
25        indFileToIngest=""     
26        ingestProcessID = None
27        overrideMetadataFolder = None
28        uniqueProviderID = None
29        procID = None
30
31        def processDataCentre(self, dataCentre = None, harvestDir = None, dataFormat = None, configFileName = None, procID = None):
32                '''
33                indFileToIngest=None
34                Ingest documents from the specified data centre
35                @param dataCentre: data centre to ingest docs from
36                @keyword harvestDir: directory to get docs from - NB, this will override that
37                specified in the associated config file.  Typically this is used when a manual
38                harvest has retrieved docs to a local dir (see OAIInfoEditor.lib.harvester).
39                @param dataFormat: format of data to ingest.  Overrides config file settings.
40                @return isSuccess, outMessage: isSuccess = True if ingest completes ok
41                outMessage = summary of ingest process
42                '''
43               
44                self._no_files_ingested = 0
45                self._no_files_changed = 0
46                self._no_files_deleted = 0
47                self._no_problem_files = 0
48               
49                self.procID = procID
50               
51                self._error_messages = ''
52                self.dataCentre = dataCentre
53                #self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from
54
55                #extract relevant directories etc from processing config gile
56                               
57                #self.processingDict = self.getProcessingConfig('oai_document_ingester.config')
58                if configFileName is None:
59                        self.processingDict = self.getProcessingConfig(self.oaiEditorConfig)
60                else:
61                        self.processingDict = self.getProcessingConfig(configFileName)
62               
63               
64                self._code_dir = self.processingDict['code_directory']
65                self._base_dir = self.processingDict['base_directory']         
66                self._databaseConfigurationFile = self.processingDict['ingestConfig']
67                self.ingestLoggingDatabaseConfigurationFile = self.processingDict['ingestLoggingConfig']
68                redirectUrls = self.processingDict['changeURLs']
69                self._ndgRedirectURL = self.processingDict['NDG_redirect_URL']
70                self._saxonJarFile = self.processingDict['saxonJarFile']               
71                self._isoClass = self.processingDict['xpathIsoClass']
72                self._xqueryConversions = self.processingDict['xqueryConversions']
73                self._xqueryConversionsExceptions = self.processingDict['exceptXqConv'] 
74                self._xqueryDocTypes = self.processingDict['xqDocTypes'] 
75                self._currentMedinStandard = self.processingDict['currentMEDIN']
76               
77                #set boolean depending on whether we want to change urls for ndg redirection service
78                if redirectUrls == 'False':
79                        logging.info("This ingest will NOT change url's to use the NDG redirection service!")
80                        self.changeUrls = False
81                else:
82                        logging.info("This ingest WILL change url's to use the NDG redirection service!")
83                        self.changeUrls = True
84                       
85                               
86                self.processThread = 'OAI'
87               
88                dpwsProvID = None
89               
90                #check if this is a dpws ingest
91                if dataCentre == 'dpws':
92                       
93                        dpwsProvName = None
94                       
95                        self._getPostgresDBConnectionLogging()
96                                       
97                        #get actual datacentre name for processID from dpws db
98                       
99                        sqlCmdProvID = 'select get_provider_id(%s);' %self.ingestProcessID             
100                        try:   
101                                dpwsProvID = self.pgc_IngestLog.runSQLCommand(sqlCmdProvID)[0][0]
102                        except:
103                                logging.error("Could not extract provider ID from DPWS db!!")
104                                sys.exit(2)
105                                                       
106                        #get provider id
107                        sqlCmdProvName = 'select get_provider_name(%s);' %dpwsProvID
108                        dpwsProvName = self.pgc_IngestLog.runSQLCommand(sqlCmdProvName)[0][0]
109                       
110                        if dpwsProvName is None:
111                                logging.error("Could not extract provider name from DPWS db!!")
112                                sys.exit(2)
113                        else:
114                                logging.info("Provider name extracted from DPWS db is: %s" %dpwsProvName)
115                                self.dataCentre = dpwsProvName
116                               
117                                #aaggh
118                                dataCentre = self.dataCentre                           
119                               
120                        #get provider name for provider id
121               
122                self._setupDataCentreDirs()
123               
124                #Change os directory to that with the harvested documents in it.
125                os.chdir(self._base_dir)
126                                               
127                # - to run on Windows under cygwin, use the following
128                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
129               
130                if dpwsProvID:
131                        self.getConfigDetails(dataCentre,dpwsID = dpwsProvID)
132                else:
133                        self.getConfigDetails(dataCentre,dpwsID = None)
134               
135               
136                # override default settings with input keyword values, if set
137                if harvestDir:
138                        self._harvest_home = harvestDir
139                if dataFormat:
140                        self._datacentre_format = dataFormat
141                       
142                #if a process id was used when initialsing this method, use thos as the process id
143                if self.procID is not None:
144                        logging.info("Using process ID %s set by run_all_ingest.py" %self.procID)
145                        self.ingestProcessID = self.procID
146
147               
148                logging.info("Setting up logging connection database")
149               
150               
151                                                               
152                if self.ingestProcessID is not None:
153                        self._getPostgresDBConnectionLogging()
154               
155                        #if process id has been supplied, then update the logging db with a "start_ingest"
156                        if self.procID is None:
157                               
158                                tempDPWSmetadataFolder = None                                   
159                                sqlStatusCmd = "select update_ingest_status (%s, 'start_ingest');" %self.ingestProcessID
160                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
161                       
162                                #now must get temporary metadata folder for DPWS - but NOT for auto daily ingests                       
163                                sqlFolderCmd = "select get_metadata_dir(%s);" %self.ingestProcessID
164                                tempDPWSmetadataFolder = self.pgc_IngestLog.runSQLCommand(sqlFolderCmd)[0][0]
165                               
166                                if tempDPWSmetadataFolder is not None:
167                                        logging.info("Temporary metadata folder to be used by DPWS ingest is %s " %tempDPWSmetadataFolder)
168                                        self._harvest_home = tempDPWSmetadataFolder
169                                else:
170                                        #if this still set to none then there's an error!
171                                        logging.error("Could not extract temporary metadata folder location for process %s" %self.ingestProcessID)
172                                        sqlStatusCmd = "select update_ingest_status (%s, 'end_ingest');" %self.ingestProcessID
173                                        sys.exit(2)
174
175               
176               
177                #if a dif format we need to convert to the stub iso profile so can extract all the info for ingest from that
178                if self._datacentre_format in self.processingDict['xqueryStubIsoGen'].keys():                   
179                        self._dif2isoXquery = self.processingDict['xqueryStubIsoGen'][self._datacentre_format]
180               
181               
182                # check harvest dir exists and that there are any records to harvest?           
183                if self.indFileToIngest == "":
184                        if not os.path.exists(self._harvest_home):
185                                logging.error("Harvest directory for dataCentre %s (%s) could not be found - exiting" \
186                                                 %(dataCentre, self._harvest_home))
187                                return
188                        elif len(os.listdir(self._harvest_home)) == 0:
189                                logging.info("Nothing to harvest this time from %s" %dataCentre)
190                                return
191                       
192                        commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + self.originals_dir
193                        logging.info("Executing : " + commandline)
194                        status = os.system(commandline)
195               
196                else:
197                        #must be looking for an individual file to upload
198                        if not os.path.exists(self.indFileToIngest):
199                               
200                                logging .warn("Specified file does not exist")
201                                return
202                       
203                        # Create/clear the 'in' directory pristine copy of the discovery records
204                        #fileUtils.setUpDir(originals_dir)
205                        commandline = "cp " + self.indFileToIngest + " " +  self.originals_dir
206                        logging.info("Executing : " + commandline)
207                        status = os.system(commandline)
208
209               
210                if status !=0:
211                    sys.exit("Failed at making pristine copy stage")
212               
213                self._setupXQueries()
214               
215                # Process the resulting files and put the data into the postgres DB
216                # - firstly set up a db connection to use
217                self._getPostgresDBConnection()
218               
219                                                               
220               
221                #accepted formats defined in oai_processing config file - should also match formats published on API getFormat list
222                acceptedFormats = self.processingDict['acceptedFormats'].split(',')
223                               
224                if self._datacentre_format in acceptedFormats:
225                       
226                        numfilesproc,badXmlFiles,processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, self.dataCentre, True, self._datacentre_format)
227
228                else:
229                        logging.error("Unaccepted input format!")
230                        sys.exit()
231               
232               
233                #numfilesproc, processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, dataCentre, True)
234               
235                outMessage = "OAI Document ingest processing complete:\n"
236                logging.info("oai_document_ingest processing complete:")
237                isSuccess = False
238                if self._no_problem_files == 0:
239                        logging.info("All files successfully processed - cleaning harvest directory")
240                        #FileUtilities.cleanDir(self._harvest_home) # TODO: uncomment this!
241                        isSuccess = True
242                else:
243                        logging.error("Problems experienced with %s files" %self._no_problem_files)
244                        logging.error("- harvnegest directory will not be cleared until these have been fixed and the script has been reran")
245               
246                logging.info(self.lineSeparator)
247                message = 'Total Number of files processed = %s\n' %(numfilesproc + badXmlFiles)
248                logging.info(message)
249                outMessage += message
250                message = 'Total Number of files successfully processed = %s\n' %numfilesproc
251                logging.info(message)
252                outMessage += message
253                message = 'Total Number of files with XML errors = %s\n' %badXmlFiles
254                logging.info(message)
255                outMessage += message
256                message = "Number of files created = %s\n" %self._no_files_ingested
257                logging.info(message)
258                outMessage += message
259                message = "Number of files updated = %s\n" %self._no_files_changed
260                logging.info(message)
261                outMessage += message
262                message = "Number of problem files = %s\n" %self._no_problem_files
263                logging.info(message)
264                outMessage += message
265                logging.info(self.lineSeparator)
266                #Changed message to include more detail (SJD) but also now add any errors
267                if self._error_messages:
268                        outMessage += 'Errors: %s' %self._error_messages
269                        processingReport += 'Errors: %s' %self._error_messages
270
271
272               
273
274
275                #import pdb
276                #pdb.set_trace()
277                print "Script finished running."
278                return isSuccess, processingReport
279
280        def setIndFileToIngest(self, indFileToIngest):
281                '''
282                Method to set individual file to ingest if "individualFile" is invoked
283                '''
284                self.indFileToIngest = indFileToIngest
285               
286       
287        def setIngestLogID(self, processID):
288                '''
289                Method to set individual file to ingest if "individualFile" is invoked
290                '''
291                self.ingestProcessID = processID
292               
293               
294        def setOaiConfigFile(self, configFilePath):
295                '''
296                Set the path to the OAI configuration file - directories etc used for data & reporting etc etc
297                '''
298                print "**********************************************************************"
299                if configFilePath:
300                        self.oaiEditorConfig = configFilePath
301                        logging.info("Using configuration file at: " + configFilePath)
302                else:                                           
303                        self.oaiEditorConfig = './oai_document_ingester.config'
304                        logging.info("No external configuration file has been set so using default file: %s" %self.oaiEditorConfig)
305               
306
307       
308        def usage(self):
309                '''
310                Display input params for the script
311                '''
312                print "Usage: python oai_document_ingester.py [OPTION] <datacentre>"
313                print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:"
314                print " -v - verbose mode for output logging"
315                print " -d - debug mode for output logging"
316                print " individualFile= - specify individual file to upload rather than batch processing as defined in properties file.  \n      (NOTE: script still uses properties file for other parameters)\n"
317
318                sys.exit(2)
319               
320       
321if __name__=="__main__":
322
323        print "================================="
324        print "RUNNING: configurable MEDIN ingester: oai_document_ingester.py"
325       
326       
327        ingester = Metadata_document_ingester() 
328        args = ingester._setupCmdLineOptions()
329       
330        #due to development of DPWS need to handle how we call the ingester from that, differently than local, or run_al_ingest
331        dataCentre = None
332        for arg in args:
333                if len(args) == 1 and 'ingestProcessID' in arg:
334                       
335                        #yep, this is an dpws call to the ingester
336                        dataCentre = 'dpws'
337       
338        if dataCentre is not None:                                     
339                ingester.processDataCentre(dataCentre='dpws')
340        else:
341                #standard "original ingest"     
342                ingester.processDataCentre(args[0])
Note: See TracBrowser for help on using the repository browser.