source: TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/Metadata_document_ingester.py @ 7975

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/Metadata_document_ingester.py@7975
Revision 7975, 13.2 KB checked in by sdonegan, 8 years ago (diff)

Updates added now must ALWAYS specify config file..

Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, logging
9# annoyingly, an import (CSML file, I think) sets the logging config during imports - so set this
10# here to get there first - since you can only set the config once
11logging.basicConfig(level=logging.INFO,
12                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
13
14from time import strftime
15import ndg.common.src.lib.fileutilities as FileUtilities
16from abstractdocumentingester import AbstractDocumentIngester
17from SchematronValidate import SchematronValidate as Schematron
18
19class Metadata_document_ingester(AbstractDocumentIngester):
20        '''
21        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
22        - including running the various transforms and parsings to get all doc types and spatiotemporal
23        data in the correct form in the DB
24        @return outMessage: string summary of ingest outcome
25        '''
26       
27        indFileToIngest=""     
28        ingestProcessID = None
29        overrideMetadataFolder = None
30        uniqueProviderID = None
31        procID = None
32
33        def processDataCentre(self, dataCentre = None, harvestDir = None, dataFormat = None, configFileName = None, procID = None):
34                '''
35                indFileToIngest=None
36                Ingest documents from the specified data centre
37                @param dataCentre: data centre to ingest docs from
38                @keyword harvestDir: directory to get docs from - NB, this will override that
39                specified in the associated config file.  Typically this is used when a manual
40                harvest has retrieved docs to a local dir (see OAIInfoEditor.lib.harvester).
41                @param dataFormat: format of data to ingest.  Overrides config file settings.
42                @return isSuccess, outMessage: isSuccess = True if ingest completes ok
43                outMessage = summary of ingest process
44                '''
45               
46                self._no_files_ingested = 0
47                self._no_files_changed = 0
48                self._no_files_deleted = 0
49                self._no_problem_files = 0
50               
51                if procID is not None:
52                        self.procID = procID.replace(',','')
53                else:
54                        self.procID = procID
55               
56                self._error_messages = ''
57                self.dataCentre = dataCentre
58                #self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from
59
60                #extract relevant directories etc from processing config gile
61                               
62                #self.processingDict = self.getProcessingConfig('oai_document_ingester.config')
63                if configFileName is None:
64                        self.processingDict = self.getProcessingConfig(self.oaiEditorConfig)
65                else:
66                        self.processingDict = self.getProcessingConfig(configFileName)
67               
68               
69                self._code_dir = self.processingDict['code_directory']
70                self._base_dir = self.processingDict['base_directory']         
71                self._databaseConfigurationFile = self.processingDict['ingestConfig']
72                self.ingestLoggingDatabaseConfigurationFile = self.processingDict['ingestLoggingConfig']
73                redirectUrls = self.processingDict['changeURLs']
74                self._ndgRedirectURL = self.processingDict['NDG_redirect_URL']
75                self._saxonJarFile = self.processingDict['saxonJarFile']               
76                self._isoClass = self.processingDict['xpathIsoClass']
77                self._xqueryConversions = self.processingDict['xqueryConversions']
78                self._xqueryConversionsExceptions = self.processingDict['exceptXqConv'] 
79                self._xqueryDocTypes = self.processingDict['xqDocTypes'] 
80                self._currentMedinStandard = self.processingDict['currentMEDIN']
81                self._schematronLocation = self.processingDict['schematronLocation']
82                self._schematronXSLLocation = self.processingDict['schematronXSLloc']
83                self._schematronValidationInsert = self.processingDict['schematronValFail']
84               
85                if self._schematronValidationInsert == 'False':
86                        self._schematronValidationInsert = False
87                else:
88                        self._schematronValidationInsert = True
89               
90                #set boolean depending on whether we want to change urls for ndg redirection service
91                if redirectUrls == 'False':
92                        logging.info("This ingest will NOT change url's to use the NDG redirection service!")
93                        self.changeUrls = False
94                else:
95                        logging.info("This ingest WILL change url's to use the NDG redirection service!")
96                        self.changeUrls = True
97                       
98                               
99                self.processThread = 'OAI'
100               
101                dpwsProvID = None
102               
103                #check if this is a dpws ingest
104                if dataCentre == 'dpws':
105                       
106                        dpwsProvName = None
107                       
108                        self._getPostgresDBConnectionLogging()
109                                       
110                        #get actual datacentre name for processID from dpws db
111                       
112                        sqlCmdProvID = 'select get_provider_id(%s);' %self.ingestProcessID             
113                        try:   
114                                dpwsProvID = self.pgc_IngestLog.runSQLCommand(sqlCmdProvID)[0][0]
115                        except:
116                                logging.error("Could not extract provider ID from DPWS db!!")
117                                sys.exit(2)
118                                               
119                        #get provider id
120                        sqlCmdProvName = 'select get_provider_name(%s);' %dpwsProvID
121                        dpwsProvName = self.pgc_IngestLog.runSQLCommand(sqlCmdProvName)[0][0]
122                       
123                        if dpwsProvName is None:
124                                logging.error("Could not extract provider name from DPWS db!!")
125                                sys.exit(2)
126                        else:
127                                logging.info("Provider name extracted from DPWS db is: %s" %dpwsProvName)
128                                self.dataCentre = dpwsProvName
129                               
130                                #aaggh
131                                dataCentre = self.dataCentre                           
132                               
133                        #get provider name for provider id
134               
135                self._setupDataCentreDirs()
136               
137                #set up schematron validation class             
138                #self.SchematronValidate = Schematron(self._schematronLocation,self._schematronXSLLocation,self.ingestReport_dir)
139               
140                #Change os directory to that with the harvested documents in it.
141                os.chdir(self._base_dir)
142                                               
143                # - to run on Windows under cygwin, use the following
144                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
145               
146                if dpwsProvID:
147                        self.getConfigDetails(dataCentre,dpwsID = dpwsProvID)
148                else:
149                        self.getConfigDetails(dataCentre,dpwsID = None)
150               
151               
152                # override default settings with input keyword values, if set
153                if harvestDir:
154                        self._harvest_home = harvestDir
155                if dataFormat:
156                        self._datacentre_format = dataFormat
157                       
158                #if a process id was used when initialsing this method, use thos as the process id
159                if self.procID is not None:
160                        logging.info("Using process ID %s set by run_all_ingest.py" %self.procID)
161                        self.ingestProcessID = self.procID
162
163               
164                logging.info("Setting up logging connection database")
165               
166               
167                                                               
168                if self.ingestProcessID is not None:
169                        self._getPostgresDBConnectionLogging()
170               
171                        #if process id has been supplied, then update the logging db with a "start_ingest"
172                        if self.procID is None:
173                               
174                                tempDPWSmetadataFolder = None                                   
175                                sqlStatusCmd = "select update_ingest_status (%s, 'start_ingest');" %self.ingestProcessID
176                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
177                       
178                                #now must get temporary metadata folder for DPWS - but NOT for auto daily ingests                       
179                                sqlFolderCmd = "select get_metadata_dir(%s);" %self.ingestProcessID
180                                tempDPWSmetadataFolder = self.pgc_IngestLog.runSQLCommand(sqlFolderCmd)[0][0]
181                               
182                                if tempDPWSmetadataFolder is not None:
183                                        logging.info("Temporary metadata folder to be used by DPWS ingest is %s " %tempDPWSmetadataFolder)
184                                        self._harvest_home = tempDPWSmetadataFolder
185                                else:
186                                        #if this still set to none then there's an error!
187                                        logging.error("Could not extract temporary metadata folder location for process %s" %self.ingestProcessID)
188                                        sqlStatusCmd = "select update_ingest_status (%s, 'end_ingest');" %self.ingestProcessID
189                                        sys.exit(2)
190
191               
192               
193                #if a dif format we need to convert to the stub iso profile so can extract all the info for ingest from that
194                if self._datacentre_format in self.processingDict['xqueryStubIsoGen'].keys():                   
195                        self._dif2isoXquery = self.processingDict['xqueryStubIsoGen'][self._datacentre_format]
196               
197               
198                # check harvest dir exists and that there are any records to harvest?           
199                if self.indFileToIngest == "":
200                        if not os.path.exists(self._harvest_home):
201                                logging.error("Harvest directory for dataCentre %s (%s) could not be found - exiting" \
202                                                 %(dataCentre, self._harvest_home))
203                                return
204                        elif len(os.listdir(self._harvest_home)) == 0:
205                                logging.info("Nothing to harvest this time from %s" %dataCentre)
206                                return
207                       
208                        commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + self.originals_dir
209                        logging.info("Executing : " + commandline)
210                        status = os.system(commandline)
211               
212                else:
213                        #must be looking for an individual file to upload
214                        if not os.path.exists(self.indFileToIngest):
215                               
216                                logging .warn("Specified file does not exist")
217                                return
218                       
219                        # Create/clear the 'in' directory pristine copy of the discovery records
220                        #fileUtils.setUpDir(originals_dir)
221                        commandline = "cp " + self.indFileToIngest + " " +  self.originals_dir
222                        logging.info("Executing : " + commandline)
223                        status = os.system(commandline)
224
225               
226                if status !=0:
227                    sys.exit("Failed at making pristine copy stage")
228               
229                self._setupXQueries()
230               
231                # Process the resulting files and put the data into the postgres DB
232                # - firstly set up a db connection to use
233                self._getPostgresDBConnection()
234               
235                                                               
236               
237                #accepted formats defined in oai_processing config file - should also match formats published on API getFormat list
238                acceptedFormats = self.processingDict['acceptedFormats'].split(',')
239                               
240                if self._datacentre_format in acceptedFormats:
241                                               
242                        numfilesproc,badXmlFiles,processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, self.dataCentre, True, self._datacentre_format)
243
244                else:
245                        logging.error("Unaccepted input format!")
246                        sys.exit()
247               
248               
249                #numfilesproc, processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, dataCentre, True)
250               
251                outMessage = "OAI Document ingest processing complete:\n"
252                logging.info("oai_document_ingest processing complete:")
253                isSuccess = False
254                if self._no_problem_files == 0:
255                        logging.info("All files successfully processed - cleaning harvest directory")
256                        #FileUtilities.cleanDir(self._harvest_home) # TODO: uncomment this!
257                        isSuccess = True
258                else:
259                        logging.error("Problems experienced with %s files" %self._no_problem_files)
260                        logging.error("- harvnegest directory will not be cleared until these have been fixed and the script has been reran")
261               
262                logging.info(self.lineSeparator)
263                message = 'Total Number of files processed = %s\n' %(numfilesproc + badXmlFiles)
264                logging.info(message)
265                outMessage += message
266                message = 'Total Number of files successfully processed = %s\n' %numfilesproc
267                logging.info(message)
268                outMessage += message
269                message = 'Total Number of files with XML errors = %s\n' %badXmlFiles
270                logging.info(message)
271                outMessage += message
272                message = "Number of files created = %s\n" %self._no_files_ingested
273                logging.info(message)
274                outMessage += message
275                message = "Number of files updated = %s\n" %self._no_files_changed
276                logging.info(message)
277                outMessage += message
278                message = "Number of problem files = %s\n" %self._no_problem_files
279                logging.info(message)
280                outMessage += message
281                logging.info(self.lineSeparator)
282                #Changed message to include more detail (SJD) but also now add any errors
283                if self._error_messages:
284                        outMessage += 'Errors: %s' %self._error_messages
285                        processingReport += 'Errors: %s' %self._error_messages
286
287
288               
289
290
291                #import pdb
292                #pdb.set_trace()
293                print "Script finished running."
294                return isSuccess, processingReport
295
296        def setIndFileToIngest(self, indFileToIngest):
297                '''
298                Method to set individual file to ingest if "individualFile" is invoked
299                '''
300                self.indFileToIngest = indFileToIngest
301               
302       
303        def setIngestLogID(self, processID):
304                '''
305                Method to set individual file to ingest if "individualFile" is invoked
306                '''             
307                self.ingestProcessID = processID.replace(',','')
308                logging.info("Setting ingest process id: %s" %self.ingestProcessID)
309               
310               
311        def setOaiConfigFile(self, configFilePath):
312                '''
313                Set the path to the OAI configuration file - directories etc used for data & reporting etc etc
314                '''
315                print "**********************************************************************"
316                if configFilePath:
317                        self.oaiEditorConfig = configFilePath
318                        logging.info("Using configuration file at: " + configFilePath)
319                else:                                           
320                        self.oaiEditorConfig = './oai_document_ingester.config'
321                        logging.info("No external configuration file has been set so using default file: %s" %self.oaiEditorConfig)
322               
323
324       
325        def usage(self):
326                '''
327                Display input params for the script
328                '''
329                print "Usage: python oai_document_ingester.py [OPTION] <datacentre>"
330                print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:"
331                print " -v - verbose mode for output logging"
332                print " -d - debug mode for output logging"
333                print " individualFile= - specify individual file to upload rather than batch processing as defined in properties file.  \n      (NOTE: script still uses properties file for other parameters)\n"
334
335                sys.exit(2)
336               
337       
338if __name__=="__main__":
339
340        print "================================="
341        print "RUNNING: configurable MEDIN ingester: oai_document_ingester.py"
342       
343       
344        ingester = Metadata_document_ingester() 
345        args = ingester._setupCmdLineOptions()
346       
347        #due to development of DPWS need to handle how we call the ingester from that, differently than local, or run_al_ingest
348        dataCentre = None
349       
350        for arg in args:
351                if 'ingestProcessID' in arg:
352                       
353                        #yep, this is an dpws call to the ingester
354                        dataCentre = 'dpws'
355           
356        if dataCentre is not None:                                     
357                ingester.processDataCentre(dataCentre='dpws')
358        else:
359                #standard "original ingest"     
360                ingester.processDataCentre(args[0])
Note: See TracBrowser for help on using the repository browser.