source: TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/Metadata_document_ingester.py @ 7857

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/Metadata_document_ingester.py@7857
Revision 7857, 12.5 KB checked in by sdonegan, 10 years ago (diff)

Updated to take data centre configuration settings from dpws db when a dpws activated ingest is started

Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, logging
9# annoyingly, an import (CSML file, I think) sets the logging config during imports - so set this
10# here to get there first - since you can only set the config once
11logging.basicConfig(level=logging.INFO,
12                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
13from time import strftime
14import ndg.common.src.lib.fileutilities as FileUtilities
15from abstractdocumentingester import AbstractDocumentIngester
16
17class Metadata_document_ingester(AbstractDocumentIngester):
18        '''
19        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
20        - including running the various transforms and parsings to get all doc types and spatiotemporal
21        data in the correct form in the DB
22        @return outMessage: string summary of ingest outcome
23        '''
24       
25        indFileToIngest=""     
26        ingestProcessID = None
27        overrideMetadataFolder = None
28        uniqueProviderID = None
29        procID = None
30
31        def processDataCentre(self, dataCentre = None, harvestDir = None, dataFormat = None, configFileName = None, procID = None):
32                '''
33                indFileToIngest=None
34                Ingest documents from the specified data centre
35                @param dataCentre: data centre to ingest docs from
36                @keyword harvestDir: directory to get docs from - NB, this will override that
37                specified in the associated config file.  Typically this is used when a manual
38                harvest has retrieved docs to a local dir (see OAIInfoEditor.lib.harvester).
39                @param dataFormat: format of data to ingest.  Overrides config file settings.
40                @return isSuccess, outMessage: isSuccess = True if ingest completes ok
41                outMessage = summary of ingest process
42                '''
43               
44                self._no_files_ingested = 0
45                self._no_files_changed = 0
46                self._no_files_deleted = 0
47                self._no_problem_files = 0
48               
49                self.procID = procID
50               
51                self._error_messages = ''
52                self.dataCentre = dataCentre
53                #self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from
54
55                #extract relevant directories etc from processing config gile
56                               
57                #self.processingDict = self.getProcessingConfig('oai_document_ingester.config')
58                if configFileName is None:
59                        self.processingDict = self.getProcessingConfig(self.oaiEditorConfig)
60                else:
61                        self.processingDict = self.getProcessingConfig(configFileName)
62               
63               
64                self._code_dir = self.processingDict['code_directory']
65                self._base_dir = self.processingDict['base_directory']         
66                self._databaseConfigurationFile = self.processingDict['ingestConfig']
67                self.ingestLoggingDatabaseConfigurationFile = self.processingDict['ingestLoggingConfig']
68                redirectUrls = self.processingDict['changeURLs']
69                self._ndgRedirectURL = self.processingDict['NDG_redirect_URL']
70                self._saxonJarFile = self.processingDict['saxonJarFile']               
71                self._isoClass = self.processingDict['xpathIsoClass']
72                self._xqueryConversions = self.processingDict['xqueryConversions']
73                self._xqueryConversionsExceptions = self.processingDict['exceptXqConv'] 
74                self._xqueryDocTypes = self.processingDict['xqDocTypes'] 
75                self._currentMedinStandard = self.processingDict['currentMEDIN']
76               
77                #set boolean depending on whether we want to change urls for ndg redirection service
78                if redirectUrls == 'False':
79                        logging.info("This ingest will NOT change url's to use the NDG redirection service!")
80                        self.changeUrls = False
81                else:
82                        logging.info("This ingest WILL change url's to use the NDG redirection service!")
83                        self.changeUrls = True
84                       
85                               
86                self.processThread = 'OAI'
87               
88               
89                #check if this is a dpws ingest
90                if dataCentre == 'dpws':
91                       
92                        dpwsProvName = None
93                       
94                        self._getPostgresDBConnectionLogging()
95                                       
96                        #get actual datacentre name for processID from dpws db
97                       
98                        sqlCmdProvID = 'select get_provider_id(%s);' %self.ingestProcessID             
99                        try:   
100                                dpwsProvID = self.pgc_IngestLog.runSQLCommand(sqlCmdProvID)[0][0]
101                        except:
102                                logging.error("Could not extract provider ID from DPWS db!!")
103                                sys.exit(2)
104                                                       
105                        #get provider id
106                        sqlCmdProvName = 'select get_provider_name(%s);' %dpwsProvID
107                        dpwsProvName = self.pgc_IngestLog.runSQLCommand(sqlCmdProvName)[0][0]
108                       
109                        if dpwsProvName is None:
110                                logging.error("Could not extract provider name from DPWS db!!")
111                                sys.exit(2)
112                        else:
113                                logging.info("Provider name extracted from DPWS db is: %s" %dpwsProvName)
114                                self.dataCentre = dpwsProvName
115                               
116                                #aaggh
117                                dataCentre = self.dataCentre                           
118                               
119                        #get provider name for provider id
120               
121                self._setupDataCentreDirs()
122               
123                #Change os directory to that with the harvested documents in it.
124                os.chdir(self._base_dir)
125                                               
126                # - to run on Windows under cygwin, use the following
127                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
128               
129                if dpwsProvID:
130                        self.getConfigDetails(dataCentre,dpwsID = dpwsProvID)
131                else:
132                        self.getConfigDetails(dataCentre,dpwsID = None)
133               
134               
135                # override default settings with input keyword values, if set
136                if harvestDir:
137                        self._harvest_home = harvestDir
138                if dataFormat:
139                        self._datacentre_format = dataFormat
140                       
141                #if a process id was used when initialsing this method, use thos as the process id
142                if self.procID is not None:
143                        logging.info("Using process ID %s set by run_all_ingest.py" %self.procID)
144                        self.ingestProcessID = self.procID
145
146               
147                logging.info("Setting up logging connection database")
148               
149               
150                                                               
151                if self.ingestProcessID is not None:
152                        self._getPostgresDBConnectionLogging()
153               
154                        #if process id has been supplied, then update the logging db with a "start_ingest"
155                        if self.procID is None:
156                               
157                                tempDPWSmetadataFolder = None                                   
158                                sqlStatusCmd = "select update_ingest_status (%s, 'start_ingest');" %self.ingestProcessID
159                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
160                       
161                                #now must get temporary metadata folder for DPWS - but NOT for auto daily ingests                       
162                                sqlFolderCmd = "select get_metadata_dir(%s);" %self.ingestProcessID
163                                tempDPWSmetadataFolder = self.pgc_IngestLog.runSQLCommand(sqlFolderCmd)[0][0]
164                               
165                                if tempDPWSmetadataFolder is not None:
166                                        logging.info("Temporary metadata folder to be used by DPWS ingest is %s " %tempDPWSmetadataFolder)
167                                        self._harvest_home = tempDPWSmetadataFolder
168                                else:
169                                        #if this still set to none then there's an error!
170                                        logging.error("Could not extract temporary metadata folder location for process %s" %self.ingestProcessID)
171                                        sqlStatusCmd = "select update_ingest_status (%s, 'end_ingest');" %self.ingestProcessID
172                                        sys.exit(2)
173
174               
175               
176                #if a dif format we need to convert to the stub iso profile so can extract all the info for ingest from that
177                if self._datacentre_format in self.processingDict['xqueryStubIsoGen'].keys():                   
178                        self._dif2isoXquery = self.processingDict['xqueryStubIsoGen'][self._datacentre_format]
179               
180               
181                # check harvest dir exists and that there are any records to harvest?           
182                if self.indFileToIngest == "":
183                        if not os.path.exists(self._harvest_home):
184                                logging.error("Harvest directory for dataCentre %s (%s) could not be found - exiting" \
185                                                 %(dataCentre, self._harvest_home))
186                                return
187                        elif len(os.listdir(self._harvest_home)) == 0:
188                                logging.info("Nothing to harvest this time from %s" %dataCentre)
189                                return
190                       
191                        commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + self.originals_dir
192                        logging.info("Executing : " + commandline)
193                        status = os.system(commandline)
194               
195                else:
196                        #must be looking for an individual file to upload
197                        if not os.path.exists(self.indFileToIngest):
198                               
199                                logging .warn("Specified file does not exist")
200                                return
201                       
202                        # Create/clear the 'in' directory pristine copy of the discovery records
203                        #fileUtils.setUpDir(originals_dir)
204                        commandline = "cp " + self.indFileToIngest + " " +  self.originals_dir
205                        logging.info("Executing : " + commandline)
206                        status = os.system(commandline)
207
208               
209                if status !=0:
210                    sys.exit("Failed at making pristine copy stage")
211               
212                self._setupXQueries()
213               
214                # Process the resulting files and put the data into the postgres DB
215                # - firstly set up a db connection to use
216                self._getPostgresDBConnection()
217               
218                                                               
219               
220                #accepted formats defined in oai_processing config file - should also match formats published on API getFormat list
221                acceptedFormats = self.processingDict['acceptedFormats'].split(',')
222                               
223                if self._datacentre_format in acceptedFormats:
224                       
225                        numfilesproc,badXmlFiles,processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, self.dataCentre, True, self._datacentre_format)
226
227                else:
228                        logging.error("Unaccepted input format!")
229                        sys.exit()
230               
231               
232                #numfilesproc, processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, dataCentre, True)
233               
234                outMessage = "OAI Document ingest processing complete:\n"
235                logging.info("oai_document_ingest processing complete:")
236                isSuccess = False
237                if self._no_problem_files == 0:
238                        logging.info("All files successfully processed - cleaning harvest directory")
239                        #FileUtilities.cleanDir(self._harvest_home) # TODO: uncomment this!
240                        isSuccess = True
241                else:
242                        logging.error("Problems experienced with %s files" %self._no_problem_files)
243                        logging.error("- harvnegest directory will not be cleared until these have been fixed and the script has been reran")
244               
245                logging.info(self.lineSeparator)
246                message = 'Total Number of files processed = %s\n' %(numfilesproc + badXmlFiles)
247                logging.info(message)
248                outMessage += message
249                message = 'Total Number of files successfully processed = %s\n' %numfilesproc
250                logging.info(message)
251                outMessage += message
252                message = 'Total Number of files with XML errors = %s\n' %badXmlFiles
253                logging.info(message)
254                outMessage += message
255                message = "Number of files created = %s\n" %self._no_files_ingested
256                logging.info(message)
257                outMessage += message
258                message = "Number of files updated = %s\n" %self._no_files_changed
259                logging.info(message)
260                outMessage += message
261                message = "Number of problem files = %s\n" %self._no_problem_files
262                logging.info(message)
263                outMessage += message
264                logging.info(self.lineSeparator)
265                #Changed message to include more detail (SJD) but also now add any errors
266                if self._error_messages:
267                        outMessage += 'Errors: %s' %self._error_messages
268                        processingReport += 'Errors: %s' %self._error_messages
269
270
271               
272
273
274                #import pdb
275                #pdb.set_trace()
276                print "Script finished running."
277                return isSuccess, processingReport
278
279        def setIndFileToIngest(self, indFileToIngest):
280                '''
281                Method to set individual file to ingest if "individualFile" is invoked
282                '''
283                self.indFileToIngest = indFileToIngest
284               
285       
286        def setIngestLogID(self, processID):
287                '''
288                Method to set individual file to ingest if "individualFile" is invoked
289                '''
290                self.ingestProcessID = processID
291               
292               
293        def setOaiConfigFile(self, configFilePath):
294                '''
295                Set the path to the OAI configuration file - directories etc used for data & reporting etc etc
296                '''
297                print "**********************************************************************"
298                if configFilePath:
299                        self.oaiEditorConfig = configFilePath
300                        logging.info("Using configuration file at: " + configFilePath)
301                else:                                           
302                        self.oaiEditorConfig = './oai_document_ingester.config'
303                        logging.info("No external configuration file has been set so using default file: %s" %self.oaiEditorConfig)
304               
305
306       
307        def usage(self):
308                '''
309                Display input params for the script
310                '''
311                print "Usage: python oai_document_ingester.py [OPTION] <datacentre>"
312                print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:"
313                print " -v - verbose mode for output logging"
314                print " -d - debug mode for output logging"
315                print " individualFile= - specify individual file to upload rather than batch processing as defined in properties file.  \n      (NOTE: script still uses properties file for other parameters)\n"
316
317                sys.exit(2)
318               
319       
320if __name__=="__main__":
321
322        print "================================="
323        print "RUNNING: configurable MEDIN ingester: oai_document_ingester.py"
324       
325       
326        ingester = Metadata_document_ingester() 
327        args = ingester._setupCmdLineOptions()
328       
329        #due to development of DPWS need to handle how we call the ingester from that, differently than local, or run_al_ingest
330        dataCentre = None
331        for arg in args:
332                if len(args) == 1 and 'ingestProcessID' in arg:
333                       
334                        #yep, this is an dpws call to the ingester
335                        dataCentre = 'dpws'
336       
337        if dataCentre is not None:                     
338                ingester.processDataCentre(dataCentre='dpws')
339        else:
340                #standard "original ingest"     
341                ingester.processDataCentre(args[0])
Note: See TracBrowser for help on using the repository browser.