source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py @ 3862

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.py@3862
Revision 3862, 12.6 KB checked in by cbyrom, 12 years ago (diff)

Fix workflow so that transforms are only ran when actually required

  • i.e. when the data is not already in the DB + fix a few small bugs

+ add extra logging + wrapper spatiotemporal data in record via get
methods.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from DIF import DIF
12from MDIP import MDIP
13import ndgUtils
14from ndgUtils.ndgXqueries import ndgXqueries
15from FileUtilities import FileUtilities
16from PostgresRecord import PostgresRecord
17from PostgresDAO import PostgresDAO
18import db_funcs
19
20class oai_document_ingester:
21        '''
22        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
23        - including running the various transforms and parsings to get all doc types and spatiotemporal
24        data in the correct form in the DB
25        '''
26
27        def getID(self, filename):
28                '''
29                Gets the identifier out of an input metadata xml record.
30                Copes with DIF and MDIP currently.
31                @param filename - name of document file being processed
32                @return: ID - id to use to refer to the document
33                '''
34                logging.info("Retrieving identifier for metadata record " + filename)
35                xml=file(filename).read()
36                if self._datacentre_format == "DIF":
37                    d=DIF(xml)
38                    ID=d.entryID
39                elif self._datacentre_format == "MDIP":
40                    d=MDIP(xml)
41                    ID=d.id
42                else:
43                    sys.exit("Only handles DIF or MDIP here.")
44       
45                logging.info("Found identifier: " + ID)
46                return ID
47       
48       
49        def addFileToPostgresDB(self, filename):
50                '''
51                Add a file to the postgres DB - extracting and storing all the required
52                data in the process
53                @param filename: full path of file to add to postgres DB
54                '''
55                if not os.path.isfile(filename):
56                        logging.info("Skipping, %s - not a valid file" %filename)
57                        return
58               
59                logging.info("Adding file, " + filename + ", to postgres DB")
60                discoveryID = self.getID(filename)
61               
62                # first of all create a PostgresRecord - this object represents all the data required
63                # for a DB entry
64                try:
65                        record = PostgresRecord(filename, self._NDG_dataProvider, \
66                                                            self._datacentre_groups, self._datacentre_namespace, \
67                                                            discoveryID, self._xq, self._datacentre_format)
68       
69                        # Now create the data access object to interface to the DB
70                        dao = PostgresDAO(record, self._dbConnection)
71               
72                        # Finally, write the new record
73                        if dao.createOrUpdateRecord():
74                                self._no_files_ingested += 1
75                except:
76                        logging.error("Exception thrown - detail: ")
77                        logging.error(sys.exc_info())
78                       
79       
80       
81        def getConfigDetails(self, datacentre):
82                '''
83                Get the harvested records directory and groups for this datacentre from the
84                datacentre specific config file.  The harvested records directory depends on the
85                datacentres OAI base url, the set and format. These have to be know up-front.
86                The groups denote which 'portal groups' they belong to - for limiting searches to
87                say NERC-only datacentres records.
88                Groups are added to the intermediate MOLES when it is created.
89                @param datacentre: datacentre to use when looking up config file
90                '''
91                self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties"
92                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
93               
94                # Check this file exists; if not, assume an invalid datacentre has been specified
95                if not os.path.isfile(self._datacentre_config_filename):
96                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
97                        "specified (%s) is invalid\n" %datacentre)
98                   
99                datacentre_config_file = open(self._datacentre_config_filename, "r")
100               
101                for line in datacentre_config_file.readlines():
102                    words  = string.split(line)
103                    if len(words) == 0:
104                        continue
105                    if words[0] == 'host_path':
106                        self._harvest_home = string.rstrip(words[1])
107                    if words[0] == 'groups':
108                        self._datacentre_groups = words[1:]
109                    if words[0] == 'format':
110                        self._datacentre_format = words[1]
111                    if words[0] == 'namespace':
112                        self._datacentre_namespace = words[1]
113                    if words[0] == 'self._NDG_dataProvider':
114                        self._NDG_dataProvider = True
115               
116                datacentre_config_file.close()
117               
118                if self._harvest_home == "":
119                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename)
120               
121                logging.info("harvested records are in " + self._harvest_home)
122               
123                if self._datacentre_groups == "":
124                    logging.info("No groups/keywords set for datacentre " + datacentre)
125                else:
126                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
127               
128                if self._datacentre_format == "":
129                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename)
130               
131                logging.info("format being harvested: " + self._datacentre_format)
132               
133                if self._datacentre_namespace == "":
134                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename)
135               
136                logging.info("datacentre namespace: " + self._datacentre_namespace)
137               
138                if self._NDG_dataProvider:
139                        logging.info("Datacentre classified as an NDG data provider")
140                else:
141                        logging.info("Datacentre is not classified as an NDG data provider")
142                logging.info(self.lineSeparator)
143
144               
145        def _getDBConnection(self):
146                '''
147                Get the default DB connection - by reading in data from the db config file
148                '''
149                logging.info("Setting up connection to postgres DB")
150                dbinfo_file=open('ingest.config', "r")
151                dbinfo = dbinfo_file.read().split()
152                if len(dbinfo) < 4:
153                        raise ValueError, 'Incorrect data in config file'
154               
155                # if port specified in file, use this, otherwise use default
156                if len(dbinfo) > 4:
157                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3], dbinfo[4])
158                else:
159                        self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3])
160                logging.info("Postgres DB connection now set up")
161
162       
163        def usage(self):
164                '''
165                Display input params for the script
166                '''
167                logging.info("Usage: python oai_document_ingester.py [OPTION] <datacentre>")
168                logging.info(" - where:\n   <datacentre> is the data centre to ingest data from; and options are:")
169                logging.info(" -v - verbose mode for output logging")
170                logging.info(" -d - debug mode for output logging")
171                sys.exit(2)
172
173               
174        def __init__(self, datacentre=None):
175                '''
176                Main entry point for script
177                '''
178                self.lineSeparator = "-----------------------------"
179                logging.info(self.lineSeparator)
180                logging.info("RUNNING: oai_document_ingester.py")
181               
182                # check for verbose option
183                try:
184                    opts, args = getopt.getopt(sys.argv[1:], "vd")
185                except getopt.GetoptError, err:
186                    # print help information and exit:
187                    logging.info(str(err)) # will print something like "option -a not recognized"
188                   
189                loggingLevel = logging.WARNING
190                for o, a in opts:
191                    if o == "-v":
192                        logging.info(" - Verbose mode ON")
193                        loggingLevel = logging.INFO
194                    elif o == "-d":
195                        logging.info(" - Debug mode ON")
196                        loggingLevel = logging.DEBUG
197               
198                logging.basicConfig(level=loggingLevel,
199                                                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
200               
201                logging.info(self.lineSeparator)
202               
203                if datacentre is None:
204                        self.usage()
205               
206                # create file utils object to do various file related stuff
207                fileUtils = FileUtilities()
208               
209                status = 0
210                numfilesproc = 0
211                self._no_files_ingested = 0
212                self._base_dir = os.getcwd() + "/"# this is the base dir that the script is ran from
213                       
214                data_dir = self._base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs
215               
216                #Change os directory to that with the harvested documents in it.
217                os.chdir(self._base_dir)
218               
219                # - to run on Windows under cygwin, use the following
220                #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')
221               
222                # set the global variables to retrieve from the config file
223                self._harvest_home = ""
224                self._datacentre_groups = ""
225                self._datacentre_format = ""
226                self._datacentre_namespace = ""
227                self._NDG_dataProvider = False
228                self.getConfigDetails(datacentre)
229               
230                # check harvest dir exists and that there are any records to harvest?
231                if not os.path.exists(self._harvest_home):
232                        logging.info("Harvest directory for datacentre %s (%s) could not be found - exiting" \
233                                                 %(datacentre, self._harvest_home))
234                        return
235                elif len(os.listdir(self._harvest_home)) == 0:
236                        logging.info("Nothing to harvest this time from %s" %datacentre)
237                        return
238               
239                # The directory to put things for a tape backup (should already exist)
240                #backupdir = '/disks/glue1/oaiBackup/'
241                # TODO: uncomment above on live system
242                backupdir = data_dir + "/backups/"
243               
244                # the following dirs define where the specific documents should go
245                originals_dir = data_dir + "/oai/originals/"
246                discovery_dir = data_dir + "/discovery/"
247               
248                # Create/clear the 'in' directory pristine copy of the discovery records
249                fileUtils.setUpDir(originals_dir)
250                commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir
251                logging.info("Executing : " + commandline)
252                status = os.system(commandline)
253
254                if status !=0:
255                    sys.exit("Failed at making pristine copy stage")
256               
257                # Create/clear the directory for the 'out' processed copy of the discovery records.
258                fileUtils.setUpDir(discovery_dir)
259                   
260                #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
261                # - also replace any namespace declarations with a standard one which we know works in NDG
262                # NB, this copies files from the original dir to the discovery dir
263                logging.info(self.lineSeparator)
264                logging.info("Renaming files:")
265                for filename in os.listdir(originals_dir):
266                        if filename.endswith('.xml'):
267                                original_filename = originals_dir + filename
268                                ident=self.getID(original_filename)
269                               
270                                if self._NDG_dataProvider:
271                                        new_filename = discovery_dir + ident.replace(":","__")+".xml"
272                                else:
273                                                ident = ident.replace(":","-")
274                                                ident = ident.replace("/","-")
275                                                new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
276                                                logging.info("original file = " + original_filename)
277                                                logging.info("newfile = " + new_filename)
278                               
279                                # now correct any namespace issues
280                                try:
281                                    SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
282                                except:
283                                        sys.exit("ERROR: SchemaNameSpace failed on file %s" %original_filename)
284                                numfilesproc += 1
285                        else:
286                                logging.warning('File %s is not xml format. Not processed'  %(filename))
287               
288                logging.info(self.lineSeparator)
289               
290                # now set up the required XQueries
291                # - NB, extract the xquery libraries locally for easy reference
292                self._xq=ndgXqueries()
293                for libFile in self._xq.xqlib:
294                        fileUtils.createFile(libFile, self._xq.xqlib[libFile])
295               
296                # Process the resulting files and put the data into the postgres DB
297                # - firstly set up a db connection to use
298                self._dbConnection = None
299                self._getDBConnection()
300               
301                filenames = os.listdir(discovery_dir)
302                for filename in filenames:
303                        self.addFileToPostgresDB(discovery_dir + filename)
304               
305                #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
306                backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M")
307               
308                this_backupdir = backupdir_base + "_originals/"
309                fileUtils.makeBackUp(originals_dir, this_backupdir)
310               
311                #Clear out the original harvest records area and FINALMOLES
312                fileUtils.cleanDir(originals_dir)
313                fileUtils.cleanDir(discovery_dir)
314                # TODO: uncomment following line when live on system
315                #fileUtils.cleanDir(self._harvest_home)
316               
317                logging.info(self.lineSeparator)
318                logging.info("INFO: Number of files processed = %s" %numfilesproc)
319                logging.info("INFO: Number of files ingested = %s" %self._no_files_ingested)
320                if status == 0:
321                    logging.info("INFO: Procedure oai_document_ingester.py completed")
322                else:
323                    logging.error("ERROR: Procedure oai_document_ingester.py FAILED with status %s" %status)
324                logging.info(self.lineSeparator)
325               
326       
327if __name__=="__main__":
328        logging.basicConfig(level=logging.INFO,
329                                                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
330        opts, args = getopt.getopt(sys.argv[1:], '-vd')
331        if len(args) < 1:
332                oai_document_ingester()
333       
334        oai_document_ingester(args[0])
Note: See TracBrowser for help on using the repository browser.