source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py @ 6194

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py
Revision 6194, 6.6 KB checked in by sdonegan, 10 years ago (diff)

Added thread id so abstractdocumentingester.py can distinguish between calling methods

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be run for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, logging
9# annoyingly, an import (CSML file, I think) sets the logging config during imports - so set this
10# here to get there first - since you can only set the config once
11logging.basicConfig(level=logging.DEBUG,
12                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
13from time import strftime
14import ndg.common.src.lib.fileutilities as FileUtilities
15from abstractdocumentingester import AbstractDocumentIngester
16
class oai_document_ingester(AbstractDocumentIngester):
        '''
        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
        - including running the various transforms and parsings to get all doc types and spatiotemporal
        data in the correct form in the DB.

        The config reading, directory setup, XQuery setup, DB connection and conversion steps are
        invoked on self but are not defined in this class - presumably they are inherited from
        AbstractDocumentIngester.
        '''

        # Path of a single file to ingest; the empty string selects batch mode (whole harvest dir)
        indFileToIngest = ""

        # Processing config file read at the start of processDataCentre.  Kept as a class
        # attribute so that a subclass or caller can point the ingester at another installation.
        processingConfigFile = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.config'

        def processDataCentre(self, dataCentre, harvestDir = None, dataFormat = None):
                '''
                Ingest documents from the specified data centre
                @param dataCentre: data centre to ingest docs from
                @keyword harvestDir: directory to get docs from - NB, this will override that
                specified in the associated config file.  Typically this is used when a manual
                harvest has retrieved docs to a local dir (see OAIInfoEditor.lib.harvester).
                @keyword dataFormat: format of data to ingest.  Overrides config file settings.
                @return isSuccess, report: isSuccess = True if no problem files were encountered;
                report = processing report from the conversion step (with any error messages
                appended), or a short explanation when the method returns before ingesting
                @raise SystemExit: if the pristine copy of the harvested files could not be made
                '''
                self._no_files_ingested = 0
                self._no_files_changed = 0
                self._no_files_deleted = 0
                self._no_problem_files = 0

                self._error_messages = ''
                self.dataCentre = dataCentre

                # extract relevant directories etc from processing config file
                self.processingDict = self.getProcessingConfig(self.processingConfigFile)
                self._code_dir = self.processingDict['code_directory']
                self._base_dir = self.processingDict['base_directory']

                self.processThread = 'OAI'

                self._setupDataCentreDirs()

                # Change os directory to that with the harvested documents in it.
                os.chdir(self._base_dir)

                self.getConfigDetails(dataCentre)
                # override default settings with input keyword values, if set
                if harvestDir:
                        self._harvest_home = harvestDir
                if dataFormat:
                        self._datacentre_format = dataFormat

                # Work out which files to copy.  Every early exit returns the documented
                # (isSuccess, message) pair so that callers can always unpack the result.
                if self.indFileToIngest == "":
                        # batch mode: check harvest dir exists and that there are any records to harvest
                        if not os.path.exists(self._harvest_home):
                                message = "Harvest directory for dataCentre %s (%s) could not be found - exiting" \
                                          % (dataCentre, self._harvest_home)
                                logging.error(message)
                                return False, message
                        if len(os.listdir(self._harvest_home)) == 0:
                                message = "Nothing to harvest this time from %s" % dataCentre
                                logging.info(message)
                                # nothing went wrong, there was simply nothing to do
                                return True, message
                        sources = self._findHarvestedFiles(self._harvest_home)
                else:
                        # must be looking for an individual file to upload
                        if not os.path.exists(self.indFileToIngest):
                                message = "Specified file does not exist"
                                logging.warning(message)
                                return False, message
                        sources = [self.indFileToIngest]

                # Make the pristine copy of the discovery records
                if not self._copyToOriginals(sources):
                        sys.exit("Failed at making pristine copy stage")

                self._setupXQueries()

                # Process the resulting files and put the data into the postgres DB
                # - firstly set up a db connection to use
                self._getPostgresDBConnection()

                numfilesproc, processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, dataCentre, True)

                logging.info("oai_document_ingest processing complete:")
                isSuccess = self._no_problem_files == 0
                if isSuccess:
                        logging.info("All files successfully processed - cleaning harvest directory")
                        # NB, the clean itself is currently disabled:
                        #FileUtilities.cleanDir(self._harvest_home) # TODO: uncomment this!
                else:
                        logging.error("Problems experienced with %s files" % self._no_problem_files)
                        logging.error("- harvest directory will not be cleared until these have been fixed and the script has been reran")

                logging.info(self.lineSeparator)
                logging.info('Number of files processed = %s\n' % numfilesproc)
                logging.info("Number of files created = %s\n" % self._no_files_ingested)
                logging.info("Number of files updated = %s\n" % self._no_files_changed)

                # Changed message to include more detail (SJD) but also now add any errors
                if self._error_messages:
                        processingReport += 'Errors: %s' % self._error_messages

                print("Script finished running.")
                return isSuccess, processingReport

        def _findHarvestedFiles(self, topDir):
                '''
                Collect the regular files found anywhere beneath a directory
                @param topDir: directory to search recursively
                @return: list of paths of the regular files (symbolic links excluded) under topDir
                '''
                found = []
                for dirPath, _, fileNames in os.walk(topDir):
                        for fileName in fileNames:
                                candidate = os.path.join(dirPath, fileName)
                                if os.path.isfile(candidate) and not os.path.islink(candidate):
                                        found.append(candidate)
                return found

        def _copyToOriginals(self, sources):
                '''
                Copy files into self.originals_dir without going through a shell, so that
                file names containing spaces or shell metacharacters are handled safely
                @param sources: paths of the files to copy
                @return: True if every copy succeeded, False otherwise
                '''
                # imported here so that the method does not depend on the module's import block
                import shutil

                logging.info("Copying %s file(s) to %s", len(sources), self.originals_dir)
                allCopied = True
                for source in sources:
                        try:
                                shutil.copy(source, self.originals_dir)
                        except EnvironmentError as err:
                                # keep going so that every failure is reported, then signal the problem
                                logging.error("Could not copy %s to %s: %s", source, self.originals_dir, err)
                                allCopied = False
                return allCopied

        def setIndFileToIngest(self, indFileToIngest):
                '''
                Method to set individual file to ingest if "individualFile" is invoked
                @param indFileToIngest: path of the single file to ingest
                '''
                self.indFileToIngest = indFileToIngest

        def usage(self):
                '''
                Display input params for the script, then exit with status 2
                '''
                print("Usage: python oai_document_ingester.py [OPTION] <datacentre>")
                print(" - where:\n   <datacentre> is the data centre to ingest data from; and options are:")
                print(" -v - verbose mode for output logging")
                print(" -d - debug mode for output logging")
                print(" individualFile= - specify individual file to upload rather than batch processing as defined in properties file.  \n      (NOTE: script still uses properties file for other parameters)\n")

                sys.exit(2)
162               
163       
if __name__ == "__main__":

        # Announce the run on stdout before any work is done
        for bannerLine in ("=================================",
                           "RUNNING: oai_document_ingester.py"):
                print(bannerLine)

        # _setupCmdLineOptions is not defined in this file; the first item it returns
        # is passed on as the data centre to ingest
        ingester = oai_document_ingester()
        cmdLineArgs = ingester._setupCmdLineOptions()
        ingester.processDataCentre(cmdLineArgs[0])
Note: See TracBrowser for help on using the repository browser.