source: TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/Metadata_document_ingester.py @ 7300

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/Metadata_document_ingester.py@7300
Revision 7300, 12.3 KB checked in by sdonegan, 10 years ago (diff)

various updates to ingest (note change of name of main class) - can be called from dpws api now with just process id, or indivudual or run_all_ingest

Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, logging
9# annoyingly, an import (CSML file, I think) sets the logging config during imports - so set this
10# here to get there first - since you can only set the config once
11logging.basicConfig(level=logging.INFO,
12                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
13from time import strftime
14import ndg.common.src.lib.fileutilities as FileUtilities
15from abstractdocumentingester import AbstractDocumentIngester
16
17class Metadata_document_ingester(AbstractDocumentIngester):
18        '''
19        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
20        - including running the various transforms and parsings to get all doc types and spatiotemporal
21        data in the correct form in the DB
22        @return outMessage: string summary of ingest outcome
23        '''
24       
25        indFileToIngest=""     
26        ingestProcessID = None
27        overrideMetadataFolder = None
28        uniqueProviderID = None
29        procID = None
30
31        def processDataCentre(self, dataCentre = None, harvestDir = None, dataFormat = None, configFileName = None, procID = None):
32                '''
33                indFileToIngest=None
34                Ingest documents from the specified data centre
35                @param dataCentre: data centre to ingest docs from
36                @keyword harvestDir: directory to get docs from - NB, this will override that
37                specified in the associated config file.  Typically this is used when a manual
38                harvest has retrieved docs to a local dir (see OAIInfoEditor.lib.harvester).
39                @param dataFormat: format of data to ingest.  Overrides config file settings.
40                @return isSuccess, outMessage: isSuccess = True if ingest completes ok
41                outMessage = summary of ingest process
42                '''
43               
44                self._no_files_ingested = 0
45                self._no_files_changed = 0
46                self._no_files_deleted = 0
47                self._no_problem_files = 0
48               
49                self.procID = procID
50               
51                self._error_messages = ''
52                self.dataCentre = dataCentre
53                #self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from
54
55                #extract relevant directories etc from processing config gile
56               
57               
58                #self.processingDict = self.getProcessingConfig('oai_document_ingester.config')
59                if configFileName is None:
60                        self.processingDict = self.getProcessingConfig(self.oaiEditorConfig)
61                else:
62                        self.processingDict = self.getProcessingConfig(configFileName)
63               
64               
65                self._code_dir = self.processingDict['code_directory']
66                self._base_dir = self.processingDict['base_directory']         
67                self._databaseConfigurationFile = self.processingDict['ingestConfig']
68                self.ingestLoggingDatabaseConfigurationFile = self.processingDict['ingestLoggingConfig']
69                redirectUrls = self.processingDict['changeURLs']
70                self._ndgRedirectURL = self.processingDict['NDG_redirect_URL']
71                self._saxonJarFile = self.processingDict['saxonJarFile']               
72                self._isoClass = self.processingDict['xpathIsoClass']
73                self._xqueryConversions = self.processingDict['xqueryConversions']
74                self._xqueryConversionsExceptions = self.processingDict['exceptXqConv'] 
75                self._xqueryDocTypes = self.processingDict['xqDocTypes'] 
76                self._currentMedinStandard = self.processingDict['currentMEDIN']
77               
78                #set boolean depending on whether we want to change urls for ndg redirection service
79                if redirectUrls == 'False':
80                        logging.info("This ingest will NOT change url's to use the NDG redirection service!")
81                        self.changeUrls = False
82                else:
83                        logging.info("This ingest WILL change url's to use the NDG redirection service!")
84                        self.changeUrls = True
85                       
86                               
87                self.processThread = 'OAI'
88               
89               
90                #check if this is a dpws ingest
91                if dataCentre == 'dpws':
92                       
93                        dpwsProvName = None
94                       
95                        self._getPostgresDBConnectionLogging()
96                                       
97                        #get actual datacentre name for processID from dpws db
98                        sqlCmdProvID = 'select get_provider_id(%s);' %self.ingestProcessID                     
99                        dpwsProvID = self.pgc_IngestLog.runSQLCommand(sqlCmdProvID)[0][0]
100                       
101                        #get provider id
102                        sqlCmdProvName = 'select get_provider_name(%s);' %dpwsProvID
103                        dpwsProvName = self.pgc_IngestLog.runSQLCommand(sqlCmdProvName)[0][0]
104                       
105                        if dpwsProvName is None:
106                                logging.error("Could not extract provider name from DPWS db!!")
107                                sys.exit(2)
108                        else:
109                                logging.info("Provider name extracted from DPWS db is: %s" %dpwsProvName)
110                                self.dataCentre = dpwsProvName
111                               
112                                #aaggh
113                                dataCentre = self.dataCentre                           
114                               
115                        #get provider name for provider id
116               
117                self._setupDataCentreDirs()
118               
119                #Change os directory to that with the harvested documents in it.
120                os.chdir(self._base_dir)
121                                               
122                # - to run on Windows under cygwin, use the following
123                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
124               
125                self.getConfigDetails(dataCentre)
126               
127                # override default settings with input keyword values, if set
128                if harvestDir:
129                        self._harvest_home = harvestDir
130                if dataFormat:
131                        self._datacentre_format = dataFormat
132                       
133                #if a process id was used when initialsing this method, use thos as the process id
134                if self.procID is not None:
135                        logging.info("Using process ID %s set by run_all)ingest.py" %self.procID)
136                        self.ingestProcessID = self.procID
137
138               
139                logging.info("Setting up logging connection database")
140               
141               
142                                                               
143                if self.ingestProcessID is not None:
144                        self._getPostgresDBConnectionLogging()
145               
146                        #if process id has been supplied, then update the logging db with a "start_ingest"
147                        if self.procID is None:
148                               
149                                tempDPWSmetadataFolder = None                                   
150                                sqlStatusCmd = "select update_ingest_status (%s, 'start_ingest');" %self.ingestProcessID
151                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
152                       
153                                #now must get temporary metadata folder for DPWS - but NOT for auto daily ingests                       
154                                sqlFolderCmd = "select get_metadata_dir(%s);" %self.ingestProcessID
155                                tempDPWSmetadataFolder = self.pgc_IngestLog.runSQLCommand(sqlFolderCmd)[0][0]
156                       
157                                if tempDPWSmetadataFolder is not None:
158                                        logging.info("Temporary metadata folder to be used by DPWS ingest is %s " %tempDPWSmetadataFolder)
159                                        self._harvest_home = tempDPWSmetadataFolder
160                                else:
161                                        #if this still set to none then there's an error!
162                                        logging.error("Could not extract temporary metadata folder location for process %s" %self.ingestProcessID)
163                                        sqlStatusCmd = "select update_ingest_status (%s, 'end_ingest');" %self.ingestProcessID
164                                        sys.exit(2)
165
166               
167               
168                #if a dif format we need to convert to the stub iso profile so can extract all the info for ingest from that
169                if self._datacentre_format in self.processingDict['xqueryStubIsoGen'].keys():                   
170                        self._dif2isoXquery = self.processingDict['xqueryStubIsoGen'][self._datacentre_format]
171               
172               
173                # check harvest dir exists and that there are any records to harvest?           
174                if self.indFileToIngest == "":
175                        if not os.path.exists(self._harvest_home):
176                                logging.error("Harvest directory for dataCentre %s (%s) could not be found - exiting" \
177                                                 %(dataCentre, self._harvest_home))
178                                return
179                        elif len(os.listdir(self._harvest_home)) == 0:
180                                logging.info("Nothing to harvest this time from %s" %dataCentre)
181                                return
182                       
183                        commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + self.originals_dir
184                        logging.info("Executing : " + commandline)
185                        status = os.system(commandline)
186               
187                else:
188                        #must be looking for an individual file to upload
189                        if not os.path.exists(self.indFileToIngest):
190                               
191                                logging .warn("Specified file does not exist")
192                                return
193                       
194                        # Create/clear the 'in' directory pristine copy of the discovery records
195                        #fileUtils.setUpDir(originals_dir)
196                        commandline = "cp " + self.indFileToIngest + " " +  self.originals_dir
197                        logging.info("Executing : " + commandline)
198                        status = os.system(commandline)
199
200               
201                if status !=0:
202                    sys.exit("Failed at making pristine copy stage")
203               
204                self._setupXQueries()
205               
206                # Process the resulting files and put the data into the postgres DB
207                # - firstly set up a db connection to use
208                self._getPostgresDBConnection()
209               
210                                                               
211               
212                #accepted formats defined in oai_processing config file - should also match formats published on API getFormat list
213                acceptedFormats = self.processingDict['acceptedFormats'].split(',')
214                               
215                if self._datacentre_format in acceptedFormats:
216                       
217                        numfilesproc,badXmlFiles,processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, self.dataCentre, True, self._datacentre_format)
218
219                else:
220                        logging.error("Unaccepted input format!")
221                        sys.exit()
222               
223               
224                #numfilesproc, processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, dataCentre, True)
225               
226                outMessage = "OAI Document ingest processing complete:\n"
227                logging.info("oai_document_ingest processing complete:")
228                isSuccess = False
229                if self._no_problem_files == 0:
230                        logging.info("All files successfully processed - cleaning harvest directory")
231                        #FileUtilities.cleanDir(self._harvest_home) # TODO: uncomment this!
232                        isSuccess = True
233                else:
234                        logging.error("Problems experienced with %s files" %self._no_problem_files)
235                        logging.error("- harvnegest directory will not be cleared until these have been fixed and the script has been reran")
236               
237                logging.info(self.lineSeparator)
238                message = 'Total Number of files processed = %s\n' %(numfilesproc + badXmlFiles)
239                logging.info(message)
240                outMessage += message
241                message = 'Total Number of files successfully processed = %s\n' %numfilesproc
242                logging.info(message)
243                outMessage += message
244                message = 'Total Number of files with XML errors = %s\n' %badXmlFiles
245                logging.info(message)
246                outMessage += message
247                message = "Number of files created = %s\n" %self._no_files_ingested
248                logging.info(message)
249                outMessage += message
250                message = "Number of files updated = %s\n" %self._no_files_changed
251                logging.info(message)
252                outMessage += message
253                message = "Number of problem files = %s\n" %self._no_problem_files
254                logging.info(message)
255                outMessage += message
256                logging.info(self.lineSeparator)
257                #Changed message to include more detail (SJD) but also now add any errors
258                if self._error_messages:
259                        outMessage += 'Errors: %s' %self._error_messages
260                        processingReport += 'Errors: %s' %self._error_messages
261
262
263               
264
265
266                #import pdb
267                #pdb.set_trace()
268                print "Script finished running."
269                return isSuccess, processingReport
270
271        def setIndFileToIngest(self, indFileToIngest):
272                '''
273                Method to set individual file to ingest if "individualFile" is invoked
274                '''
275                self.indFileToIngest = indFileToIngest
276               
277       
278        def setIngestLogID(self, processID):
279                '''
280                Method to set individual file to ingest if "individualFile" is invoked
281                '''
282                self.ingestProcessID = processID
283               
284               
285        def setOaiConfigFile(self, configFilePath):
286                '''
287                Set the path to the OAI configuration file - directories etc used for data & reporting etc etc
288                '''
289                print "**********************************************************************"
290                if configFilePath:
291                        self.oaiEditorConfig = configFilePath
292                        logging.info("Using configuration file at: " + configFilePath)
293                else:                                           
294                        self.oaiEditorConfig = './oai_document_ingester.config'
295                        logging.info("No external configuration file has been set so using default file: %s" %self.oaiEditorConfig)
296               
297
298       
299        def usage(self):
300                '''
301                Display input params for the script
302                '''
303                print "Usage: python oai_document_ingester.py [OPTION] <datacentre>"
304                print " - where:\n   <datacentre> is the data centre to ingest data from; and options are:"
305                print " -v - verbose mode for output logging"
306                print " -d - debug mode for output logging"
307                print " individualFile= - specify individual file to upload rather than batch processing as defined in properties file.  \n      (NOTE: script still uses properties file for other parameters)\n"
308
309                sys.exit(2)
310               
311       
312if __name__=="__main__":
313
314        print "================================="
315        print "RUNNING: configurable MEDIN ingester: oai_document_ingester.py"
316       
317       
318        ingester = Metadata_document_ingester() 
319        args = ingester._setupCmdLineOptions()
320       
321        #due to development of DPWS need to handle how we call the ingester from that, differently than local, or run_al_ingest
322        dataCentre = None
323        for arg in args:
324                if len(args) == 1 and 'ingestProcessID' in arg:
325                       
326                        #yep, this is an dpws call to the ingester
327                        dataCentre = 'dpws'
328       
329        if dataCentre is not None:                     
330                ingester.processDataCentre(dataCentre='dpws')
331        else:
332                #standard "original ingest"     
333                ingester.processDataCentre(args[0])
Note: See TracBrowser for help on using the repository browser.