source: TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/Metadata_document_ingester.py @ 7973

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/Metadata_document_ingester.py@7973
Revision 7973, 13.3 KB checked in by sdonegan, 8 years ago (diff)

Added interim updates for schematron validation and sorting svn workspace post Event

Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, logging
9# annoyingly, an import (CSML file, I think) sets the logging config during imports - so set this
10# here to get there first - since you can only set the config once
11logging.basicConfig(level=logging.INFO,
12                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
13
14from time import strftime
15import ndg.common.src.lib.fileutilities as FileUtilities
16from abstractdocumentingester import AbstractDocumentIngester
17from SchematronValidate import SchematronValidate as Schematron
18
19class Metadata_document_ingester(AbstractDocumentIngester):
20        '''
21        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
22        - including running the various transforms and parsings to get all doc types and spatiotemporal
23        data in the correct form in the DB
24        @return outMessage: string summary of ingest outcome
25        '''
26       
27        indFileToIngest=""     
28        ingestProcessID = None
29        overrideMetadataFolder = None
30        uniqueProviderID = None
31        procID = None
32
33        def processDataCentre(self, dataCentre = None, harvestDir = None, dataFormat = None, configFileName = None, procID = None):
34                '''
35                indFileToIngest=None
36                Ingest documents from the specified data centre
37                @param dataCentre: data centre to ingest docs from
38                @keyword harvestDir: directory to get docs from - NB, this will override that
39                specified in the associated config file.  Typically this is used when a manual
40                harvest has retrieved docs to a local dir (see OAIInfoEditor.lib.harvester).
41                @param dataFormat: format of data to ingest.  Overrides config file settings.
42                @return isSuccess, outMessage: isSuccess = True if ingest completes ok
43                outMessage = summary of ingest process
44                '''
45               
46                self._no_files_ingested = 0
47                self._no_files_changed = 0
48                self._no_files_deleted = 0
49                self._no_problem_files = 0
50               
51                if procID is not None:
52                        self.procID = procID.replace(',','')
53                else:
54                        self.procID = procID
55               
56                self._error_messages = ''
57                self.dataCentre = dataCentre
58                #self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from
59
60                #extract relevant directories etc from processing config gile
61                               
62                #self.processingDict = self.getProcessingConfig('oai_document_ingester.config')
63                if configFileName is None:
64                        self.processingDict = self.getProcessingConfig(self.oaiEditorConfig)
65                else:
66                        self.processingDict = self.getProcessingConfig(configFileName)
67               
68               
69                self._code_dir = self.processingDict['code_directory']
70                self._base_dir = self.processingDict['base_directory']         
71                self._databaseConfigurationFile = self.processingDict['ingestConfig']
72                self.ingestLoggingDatabaseConfigurationFile = self.processingDict['ingestLoggingConfig']
73                redirectUrls = self.processingDict['changeURLs']
74                self._ndgRedirectURL = self.processingDict['NDG_redirect_URL']
75                self._saxonJarFile = self.processingDict['saxonJarFile']               
76                self._isoClass = self.processingDict['xpathIsoClass']
77                self._xqueryConversions = self.processingDict['xqueryConversions']
78                self._xqueryConversionsExceptions = self.processingDict['exceptXqConv'] 
79                self._xqueryDocTypes = self.processingDict['xqDocTypes'] 
80                self._currentMedinStandard = self.processingDict['currentMEDIN']
81                self._schematronLocation = self.processingDict['schematronLocation']
82                self._schematronXSLLocation = self.processingDict['schematronXSLloc']
83                self._schematronValidationInsert = self.processingDict['schematronValFail']
84               
85                if self._schematronValidationInsert == 'False':
86                        self._schematronValidationInsert = False
87                else:
88                        self._schematronValidationInsert = True
89               
90                #set boolean depending on whether we want to change urls for ndg redirection service
91                if redirectUrls == 'False':
92                        logging.info("This ingest will NOT change url's to use the NDG redirection service!")
93                        self.changeUrls = False
94                else:
95                        logging.info("This ingest WILL change url's to use the NDG redirection service!")
96                        self.changeUrls = True
97                       
98                               
99                self.processThread = 'OAI'
100               
101                dpwsProvID = None
102               
103                #check if this is a dpws ingest
104                if dataCentre == 'dpws':
105                       
106                        dpwsProvName = None
107                       
108                        self._getPostgresDBConnectionLogging()
109                                       
110                        #get actual datacentre name for processID from dpws db
111                       
112                        sqlCmdProvID = 'select get_provider_id(%s);' %self.ingestProcessID             
113                        try:   
114                                dpwsProvID = self.pgc_IngestLog.runSQLCommand(sqlCmdProvID)[0][0]
115                        except:
116                                logging.error("Could not extract provider ID from DPWS db!!")
117                                sys.exit(2)
118                       
119                        import pdb
120                        pdb.set_trace()
121                        #get provider id
122                        sqlCmdProvName = 'select get_provider_name(%s);' %dpwsProvID
123                        dpwsProvName = self.pgc_IngestLog.runSQLCommand(sqlCmdProvName)[0][0]
124                       
125                        if dpwsProvName is None:
126                                logging.error("Could not extract provider name from DPWS db!!")
127                                sys.exit(2)
128                        else:
129                                logging.info("Provider name extracted from DPWS db is: %s" %dpwsProvName)
130                                self.dataCentre = dpwsProvName
131                               
132                                #aaggh
133                                dataCentre = self.dataCentre                           
134                               
135                        #get provider name for provider id
136               
137                self._setupDataCentreDirs()
138               
139                #set up schematron validation class             
140                #self.SchematronValidate = Schematron(self._schematronLocation,self._schematronXSLLocation,self.ingestReport_dir)
141               
142                #Change os directory to that with the harvested documents in it.
143                os.chdir(self._base_dir)
144                                               
145                # - to run on Windows under cygwin, use the following
146                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
147               
148                if dpwsProvID:
149                        self.getConfigDetails(dataCentre,dpwsID = dpwsProvID)
150                else:
151                        self.getConfigDetails(dataCentre,dpwsID = None)
152               
153               
154                # override default settings with input keyword values, if set
155                if harvestDir:
156                        self._harvest_home = harvestDir
157                if dataFormat:
158                        self._datacentre_format = dataFormat
159                       
160                #if a process id was used when initialsing this method, use thos as the process id
161                if self.procID is not None:
162                        logging.info("Using process ID %s set by run_all_ingest.py" %self.procID)
163                        self.ingestProcessID = self.procID
164
165               
166                logging.info("Setting up logging connection database")
167               
168               
169                                                               
170                if self.ingestProcessID is not None:
171                        self._getPostgresDBConnectionLogging()
172               
173                        #if process id has been supplied, then update the logging db with a "start_ingest"
174                        if self.procID is None:
175                               
176                                tempDPWSmetadataFolder = None                                   
177                                sqlStatusCmd = "select update_ingest_status (%s, 'start_ingest');" %self.ingestProcessID
178                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
179                       
180                                #now must get temporary metadata folder for DPWS - but NOT for auto daily ingests                       
181                                sqlFolderCmd = "select get_metadata_dir(%s);" %self.ingestProcessID
182                                tempDPWSmetadataFolder = self.pgc_IngestLog.runSQLCommand(sqlFolderCmd)[0][0]
183                               
184                                if tempDPWSmetadataFolder is not None:
185                                        logging.info("Temporary metadata folder to be used by DPWS ingest is %s " %tempDPWSmetadataFolder)
186                                        self._harvest_home = tempDPWSmetadataFolder
187                                else:
188                                        #if this still set to none then there's an error!
189                                        logging.error("Could not extract temporary metadata folder location for process %s" %self.ingestProcessID)
190                                        sqlStatusCmd = "select update_ingest_status (%s, 'end_ingest');" %self.ingestProcessID
191                                        sys.exit(2)
192
193               
194               
195                #if a dif format we need to convert to the stub iso profile so can extract all the info for ingest from that
196                if self._datacentre_format in self.processingDict['xqueryStubIsoGen'].keys():                   
197                        self._dif2isoXquery = self.processingDict['xqueryStubIsoGen'][self._datacentre_format]
198               
199               
200                # check harvest dir exists and that there are any records to harvest?           
201                if self.indFileToIngest == "":
202                        if not os.path.exists(self._harvest_home):
203                                logging.error("Harvest directory for dataCentre %s (%s) could not be found - exiting" \
204                                                 %(dataCentre, self._harvest_home))
205                                return
206                        elif len(os.listdir(self._harvest_home)) == 0:
207                                logging.info("Nothing to harvest this time from %s" %dataCentre)
208                                return
209                       
210                        commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + self.originals_dir
211                        logging.info("Executing : " + commandline)
212                        status = os.system(commandline)
213               
214                else:
215                        #must be looking for an individual file to upload
216                        if not os.path.exists(self.indFileToIngest):
217                               
218                                logging .warn("Specified file does not exist")
219                                return
220                       
221                        # Create/clear the 'in' directory pristine copy of the discovery records
222                        #fileUtils.setUpDir(originals_dir)
223                        commandline = "cp " + self.indFileToIngest + " " +  self.originals_dir
224                        logging.info("Executing : " + commandline)
225                        status = os.system(commandline)
226
227               
228                if status !=0:
229                    sys.exit("Failed at making pristine copy stage")
230               
231                self._setupXQueries()
232               
233                # Process the resulting files and put the data into the postgres DB
234                # - firstly set up a db connection to use
235                self._getPostgresDBConnection()
236               
237                                                               
238               
239                #accepted formats defined in oai_processing config file - should also match formats published on API getFormat list
240                acceptedFormats = self.processingDict['acceptedFormats'].split(',')
241                               
242                if self._datacentre_format in acceptedFormats:
243                                               
244                        numfilesproc,badXmlFiles,processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, self.dataCentre, True, self._datacentre_format)
245
246                else:
247                        logging.error("Unaccepted input format!")
248                        sys.exit()
249               
250               
251                #numfilesproc, processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, dataCentre, True)
252               
253                outMessage = "OAI Document ingest processing complete:\n"
254                logging.info("oai_document_ingest processing complete:")
255                isSuccess = False
256                if self._no_problem_files == 0:
257                        logging.info("All files successfully processed - cleaning harvest directory")
258                        #FileUtilities.cleanDir(self._harvest_home) # TODO: uncomment this!
259                        isSuccess = True
260                else:
261                        logging.error("Problems experienced with %s files" %self._no_problem_files)
262                        logging.error("- harvnegest directory will not be cleared until these have been fixed and the script has been reran")
263               
264                logging.info(self.lineSeparator)
265                message = 'Total Number of files processed = %s\n' %(numfilesproc + badXmlFiles)
266                logging.info(message)
267                outMessage += message
268                message = 'Total Number of files successfully processed = %s\n' %numfilesproc
269                logging.info(message)
270                outMessage += message
271                message = 'Total Number of files with XML errors = %s\n' %badXmlFiles
272                logging.info(message)
273                outMessage += message
274                message = "Number of files created = %s\n" %self._no_files_ingested
275                logging.info(message)
276                outMessage += message
277                message = "Number of files updated = %s\n" %self._no_files_changed
278                logging.info(message)
279                outMessage += message
280                message = "Number of problem files = %s\n" %self._no_problem_files
281                logging.info(message)
282                outMessage += message
283                logging.info(self.lineSeparator)
284                #Changed message to include more detail (SJD) but also now add any errors
285                if self._error_messages:
286                        outMessage += 'Errors: %s' %self._error_messages
287                        processingReport += 'Errors: %s' %self._error_messages
288
289
290               
291
292
293                #import pdb
294                #pdb.set_trace()
295                print "Script finished running."
296                return isSuccess, processingReport
297
298        def setIndFileToIngest(self, indFileToIngest):
299                '''
300                Method to set individual file to ingest if "individualFile" is invoked
301                '''
302                self.indFileToIngest = indFileToIngest
303               
304       
305        def setIngestLogID(self, processID):
306                '''
307                Method to set individual file to ingest if "individualFile" is invoked
308                '''             
309                self.ingestProcessID = processID.replace(',','')
310                logging.info("Setting ingest process id: %s" %self.ingestProcessID)
311               
312               
313        def setOaiConfigFile(self, configFilePath):
314                '''
315                Set the path to the OAI configuration file - directories etc used for data & reporting etc etc
316                '''
317                print "**********************************************************************"
318                if configFilePath:
319                        self.oaiEditorConfig = configFilePath
320                        logging.info("Using configuration file at: " + configFilePath)
321                else:                                           
322                        self.oaiEditorConfig = './oai_document_ingester.config'
323                        logging.info("No external configuration file has been set so using default file: %s" %self.oaiEditorConfig)
324               
325
326       
327        def usage(self):
328                '''
329                Display input params for the script
330                '''
331                print "Usage: python oai_document_ingester.py [OPTION] <datacentre>"
332                print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:"
333                print " -v - verbose mode for output logging"
334                print " -d - debug mode for output logging"
335                print " individualFile= - specify individual file to upload rather than batch processing as defined in properties file.  \n      (NOTE: script still uses properties file for other parameters)\n"
336
337                sys.exit(2)
338               
339       
340if __name__=="__main__":
341
342        print "================================="
343        print "RUNNING: configurable MEDIN ingester: oai_document_ingester.py"
344       
345       
346        ingester = Metadata_document_ingester() 
347        args = ingester._setupCmdLineOptions()
348       
349        #due to development of DPWS need to handle how we call the ingester from that, differently than local, or run_al_ingest
350        dataCentre = None
351        for arg in args:
352                if len(args) == 1 and 'ingestProcessID' in arg:
353                       
354                        #yep, this is an dpws call to the ingester
355                        dataCentre = 'dpws'
356       
357        if dataCentre is not None:                                     
358                ingester.processDataCentre(dataCentre='dpws')
359        else:
360                #standard "original ingest"     
361                ingester.processDataCentre(args[0])
Note: See TracBrowser for help on using the repository browser.