source: TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/Metadata_document_ingester.py @ 7362

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/Metadata_document_ingester.py@7362
Revision 7362, 12.4 KB checked in by sdonegan, 10 years ago (diff)

debug problem file reporting and adjusted data types in reporting

Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, logging
9# annoyingly, an import (CSML file, I think) sets the logging config during imports - so set this
10# here to get there first - since you can only set the config once
11logging.basicConfig(level=logging.INFO,
12                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
13from time import strftime
14import ndg.common.src.lib.fileutilities as FileUtilities
15from abstractdocumentingester import AbstractDocumentIngester
16
17class Metadata_document_ingester(AbstractDocumentIngester):
18        '''
19        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
20        - including running the various transforms and parsings to get all doc types and spatiotemporal
21        data in the correct form in the DB
22        @return outMessage: string summary of ingest outcome
23        '''
24       
25        indFileToIngest=""     
26        ingestProcessID = None
27        overrideMetadataFolder = None
28        uniqueProviderID = None
29        procID = None
30
31        def processDataCentre(self, dataCentre = None, harvestDir = None, dataFormat = None, configFileName = None, procID = None):
32                '''
33                indFileToIngest=None
34                Ingest documents from the specified data centre
35                @param dataCentre: data centre to ingest docs from
36                @keyword harvestDir: directory to get docs from - NB, this will override that
37                specified in the associated config file.  Typically this is used when a manual
38                harvest has retrieved docs to a local dir (see OAIInfoEditor.lib.harvester).
39                @param dataFormat: format of data to ingest.  Overrides config file settings.
40                @return isSuccess, outMessage: isSuccess = True if ingest completes ok
41                outMessage = summary of ingest process
42                '''
43               
44                self._no_files_ingested = 0
45                self._no_files_changed = 0
46                self._no_files_deleted = 0
47                self._no_problem_files = 0
48               
49                self.procID = procID
50               
51                self._error_messages = ''
52                self.dataCentre = dataCentre
53                #self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from
54
55                #extract relevant directories etc from processing config gile
56               
57               
58                #self.processingDict = self.getProcessingConfig('oai_document_ingester.config')
59                if configFileName is None:
60                        self.processingDict = self.getProcessingConfig(self.oaiEditorConfig)
61                else:
62                        self.processingDict = self.getProcessingConfig(configFileName)
63               
64               
65                self._code_dir = self.processingDict['code_directory']
66                self._base_dir = self.processingDict['base_directory']         
67                self._databaseConfigurationFile = self.processingDict['ingestConfig']
68                self.ingestLoggingDatabaseConfigurationFile = self.processingDict['ingestLoggingConfig']
69                redirectUrls = self.processingDict['changeURLs']
70                self._ndgRedirectURL = self.processingDict['NDG_redirect_URL']
71                self._saxonJarFile = self.processingDict['saxonJarFile']               
72                self._isoClass = self.processingDict['xpathIsoClass']
73                self._xqueryConversions = self.processingDict['xqueryConversions']
74                self._xqueryConversionsExceptions = self.processingDict['exceptXqConv'] 
75                self._xqueryDocTypes = self.processingDict['xqDocTypes'] 
76                self._currentMedinStandard = self.processingDict['currentMEDIN']
77               
78                #set boolean depending on whether we want to change urls for ndg redirection service
79                if redirectUrls == 'False':
80                        logging.info("This ingest will NOT change url's to use the NDG redirection service!")
81                        self.changeUrls = False
82                else:
83                        logging.info("This ingest WILL change url's to use the NDG redirection service!")
84                        self.changeUrls = True
85                       
86                               
87                self.processThread = 'OAI'
88               
89               
90                #check if this is a dpws ingest
91                if dataCentre == 'dpws':
92                       
93                        dpwsProvName = None
94                       
95                        self._getPostgresDBConnectionLogging()
96                                       
97                        #get actual datacentre name for processID from dpws db
98                       
99                        sqlCmdProvID = 'select get_provider_id(%s);' %self.ingestProcessID             
100                        try:   
101                                dpwsProvID = self.pgc_IngestLog.runSQLCommand(sqlCmdProvID)[0][0]
102                        except:
103                                logging.error("Could not extract provider ID from DPWS db!!")
104                                sys.exit(2)
105                                                       
106                        #get provider id
107                        sqlCmdProvName = 'select get_provider_name(%s);' %dpwsProvID
108                        dpwsProvName = self.pgc_IngestLog.runSQLCommand(sqlCmdProvName)[0][0]
109                       
110                        if dpwsProvName is None:
111                                logging.error("Could not extract provider name from DPWS db!!")
112                                sys.exit(2)
113                        else:
114                                logging.info("Provider name extracted from DPWS db is: %s" %dpwsProvName)
115                                self.dataCentre = dpwsProvName
116                               
117                                #aaggh
118                                dataCentre = self.dataCentre                           
119                               
120                        #get provider name for provider id
121               
122                self._setupDataCentreDirs()
123               
124                #Change os directory to that with the harvested documents in it.
125                os.chdir(self._base_dir)
126                                               
127                # - to run on Windows under cygwin, use the following
128                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
129               
130                self.getConfigDetails(dataCentre)
131               
132                # override default settings with input keyword values, if set
133                if harvestDir:
134                        self._harvest_home = harvestDir
135                if dataFormat:
136                        self._datacentre_format = dataFormat
137                       
138                #if a process id was used when initialsing this method, use thos as the process id
139                if self.procID is not None:
140                        logging.info("Using process ID %s set by run_all)ingest.py" %self.procID)
141                        self.ingestProcessID = self.procID
142
143               
144                logging.info("Setting up logging connection database")
145               
146               
147                                                               
148                if self.ingestProcessID is not None:
149                        self._getPostgresDBConnectionLogging()
150               
151                        #if process id has been supplied, then update the logging db with a "start_ingest"
152                        if self.procID is None:
153                               
154                                tempDPWSmetadataFolder = None                                   
155                                sqlStatusCmd = "select update_ingest_status (%s, 'start_ingest');" %self.ingestProcessID
156                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
157                       
158                                #now must get temporary metadata folder for DPWS - but NOT for auto daily ingests                       
159                                sqlFolderCmd = "select get_metadata_dir(%s);" %self.ingestProcessID
160                                tempDPWSmetadataFolder = self.pgc_IngestLog.runSQLCommand(sqlFolderCmd)[0][0]
161                       
162                                if tempDPWSmetadataFolder is not None:
163                                        logging.info("Temporary metadata folder to be used by DPWS ingest is %s " %tempDPWSmetadataFolder)
164                                        self._harvest_home = tempDPWSmetadataFolder
165                                else:
166                                        #if this still set to none then there's an error!
167                                        logging.error("Could not extract temporary metadata folder location for process %s" %self.ingestProcessID)
168                                        sqlStatusCmd = "select update_ingest_status (%s, 'end_ingest');" %self.ingestProcessID
169                                        sys.exit(2)
170
171               
172               
173                #if a dif format we need to convert to the stub iso profile so can extract all the info for ingest from that
174                if self._datacentre_format in self.processingDict['xqueryStubIsoGen'].keys():                   
175                        self._dif2isoXquery = self.processingDict['xqueryStubIsoGen'][self._datacentre_format]
176               
177               
178                # check harvest dir exists and that there are any records to harvest?           
179                if self.indFileToIngest == "":
180                        if not os.path.exists(self._harvest_home):
181                                logging.error("Harvest directory for dataCentre %s (%s) could not be found - exiting" \
182                                                 %(dataCentre, self._harvest_home))
183                                return
184                        elif len(os.listdir(self._harvest_home)) == 0:
185                                logging.info("Nothing to harvest this time from %s" %dataCentre)
186                                return
187                       
188                        commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + self.originals_dir
189                        logging.info("Executing : " + commandline)
190                        status = os.system(commandline)
191               
192                else:
193                        #must be looking for an individual file to upload
194                        if not os.path.exists(self.indFileToIngest):
195                               
196                                logging .warn("Specified file does not exist")
197                                return
198                       
199                        # Create/clear the 'in' directory pristine copy of the discovery records
200                        #fileUtils.setUpDir(originals_dir)
201                        commandline = "cp " + self.indFileToIngest + " " +  self.originals_dir
202                        logging.info("Executing : " + commandline)
203                        status = os.system(commandline)
204
205               
206                if status !=0:
207                    sys.exit("Failed at making pristine copy stage")
208               
209                self._setupXQueries()
210               
211                # Process the resulting files and put the data into the postgres DB
212                # - firstly set up a db connection to use
213                self._getPostgresDBConnection()
214               
215                                                               
216               
217                #accepted formats defined in oai_processing config file - should also match formats published on API getFormat list
218                acceptedFormats = self.processingDict['acceptedFormats'].split(',')
219                               
220                if self._datacentre_format in acceptedFormats:
221                       
222                        numfilesproc,badXmlFiles,processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, self.dataCentre, True, self._datacentre_format)
223
224                else:
225                        logging.error("Unaccepted input format!")
226                        sys.exit()
227               
228               
229                #numfilesproc, processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, dataCentre, True)
230               
231                outMessage = "OAI Document ingest processing complete:\n"
232                logging.info("oai_document_ingest processing complete:")
233                isSuccess = False
234                if self._no_problem_files == 0:
235                        logging.info("All files successfully processed - cleaning harvest directory")
236                        #FileUtilities.cleanDir(self._harvest_home) # TODO: uncomment this!
237                        isSuccess = True
238                else:
239                        logging.error("Problems experienced with %s files" %self._no_problem_files)
240                        logging.error("- harvnegest directory will not be cleared until these have been fixed and the script has been reran")
241               
242                logging.info(self.lineSeparator)
243                message = 'Total Number of files processed = %s\n' %(numfilesproc + badXmlFiles)
244                logging.info(message)
245                outMessage += message
246                message = 'Total Number of files successfully processed = %s\n' %numfilesproc
247                logging.info(message)
248                outMessage += message
249                message = 'Total Number of files with XML errors = %s\n' %badXmlFiles
250                logging.info(message)
251                outMessage += message
252                message = "Number of files created = %s\n" %self._no_files_ingested
253                logging.info(message)
254                outMessage += message
255                message = "Number of files updated = %s\n" %self._no_files_changed
256                logging.info(message)
257                outMessage += message
258                message = "Number of problem files = %s\n" %self._no_problem_files
259                logging.info(message)
260                outMessage += message
261                logging.info(self.lineSeparator)
262                #Changed message to include more detail (SJD) but also now add any errors
263                if self._error_messages:
264                        outMessage += 'Errors: %s' %self._error_messages
265                        processingReport += 'Errors: %s' %self._error_messages
266
267
268               
269
270
271                #import pdb
272                #pdb.set_trace()
273                print "Script finished running."
274                return isSuccess, processingReport
275
276        def setIndFileToIngest(self, indFileToIngest):
277                '''
278                Method to set individual file to ingest if "individualFile" is invoked
279                '''
280                self.indFileToIngest = indFileToIngest
281               
282       
283        def setIngestLogID(self, processID):
284                '''
285                Method to set individual file to ingest if "individualFile" is invoked
286                '''
287                self.ingestProcessID = processID
288               
289               
290        def setOaiConfigFile(self, configFilePath):
291                '''
292                Set the path to the OAI configuration file - directories etc used for data & reporting etc etc
293                '''
294                print "**********************************************************************"
295                if configFilePath:
296                        self.oaiEditorConfig = configFilePath
297                        logging.info("Using configuration file at: " + configFilePath)
298                else:                                           
299                        self.oaiEditorConfig = './oai_document_ingester.config'
300                        logging.info("No external configuration file has been set so using default file: %s" %self.oaiEditorConfig)
301               
302
303       
304        def usage(self):
305                '''
306                Display input params for the script
307                '''
308                print "Usage: python oai_document_ingester.py [OPTION] <datacentre>"
309                print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:"
310                print " -v - verbose mode for output logging"
311                print " -d - debug mode for output logging"
312                print " individualFile= - specify individual file to upload rather than batch processing as defined in properties file.  \n      (NOTE: script still uses properties file for other parameters)\n"
313
314                sys.exit(2)
315               
316       
317if __name__=="__main__":
318
319        print "================================="
320        print "RUNNING: configurable MEDIN ingester: oai_document_ingester.py"
321       
322       
323        ingester = Metadata_document_ingester() 
324        args = ingester._setupCmdLineOptions()
325       
326        #due to development of DPWS need to handle how we call the ingester from that, differently than local, or run_al_ingest
327        dataCentre = None
328        for arg in args:
329                if len(args) == 1 and 'ingestProcessID' in arg:
330                       
331                        #yep, this is an dpws call to the ingester
332                        dataCentre = 'dpws'
333       
334        if dataCentre is not None:                     
335                ingester.processDataCentre(dataCentre='dpws')
336        else:
337                #standard "original ingest"     
338                ingester.processDataCentre(args[0])
Note: See TracBrowser for help on using the repository browser.