1 | #!/usr/bin/env python |
---|
'''
Main script to do the document ingest from the OAI harvested files to the
discovery postgres DB. NB, this can be run for all datacentres using the run_all_ingest.py script,
or an individual datacentre can be specified to run the ingester on.
As well as doing the ingest, a backup directory is created to store the created moles files.
'''
---|
8 | import os, sys, string, getopt, logging |
---|
9 | from time import strftime |
---|
10 | from SchemaNameSpace import SchemaNameSpace |
---|
11 | from DIF import DIF |
---|
12 | from MDIP import MDIP |
---|
13 | import ndgUtils |
---|
14 | from ndgUtils.ndgXqueries import ndgXqueries |
---|
15 | from FileUtilities import FileUtilities |
---|
16 | from PostgresRecord import PostgresRecord |
---|
17 | from PostgresDAO import PostgresDAO |
---|
18 | import db_funcs |
---|
19 | |
---|
class oai_document_ingester:
    '''
    Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
    - including running the various transforms and parsings to get all doc types and spatiotemporal
    data in the correct form in the DB

    NB, the constructor does all of the work: instantiating the class with a datacentre name
    runs the complete ingest for that datacentre.

    The code is written to run unchanged under Python 2.6+ and Python 3 (single argument
    print(...) calls, 'except ... as ...' and 'with' blocks only).
    '''

    def getID(self, filename):
        '''
        Gets the identifier out of an input metadata xml record.
        Copes with DIF and MDIP currently.
        @param filename - name of document file being processed
        @return: ID - id to use to refer to the document
        @raise TypeError: if the configured datacentre format is neither DIF nor MDIP
        '''
        logging.info("Retrieving identifier for metadata record " + filename)

        # use a context manager so the file handle is released even if the read fails
        with open(filename) as xmlFile:
            xml = xmlFile.read()

        if self._datacentre_format == "DIF":
            ID = DIF(xml).entryID
        elif self._datacentre_format == "MDIP":
            ID = MDIP(xml).id
        else:
            raise TypeError("Only handles DIF or MDIP here.")

        logging.info("Found identifier: " + ID)
        return ID


    def addFileToPostgresDB(self, filename):
        '''
        Add a file to the postgres DB - extracting and storing all the required
        data in the process. Failures are logged and counted in _no_problem_files
        rather than raised, so that the remaining files can still be processed.
        @param filename: full path of file to add to postgres DB
        '''
        if not os.path.isfile(filename):
            logging.info("Skipping, %s - not a valid file" %filename)
            return

        logging.info("Adding file, " + filename + ", to postgres DB")

        # first of all create a PostgresRecord - this object represents all the data required
        # for a DB entry
        dao = None
        try:
            discoveryID = self.getID(filename)

            record = PostgresRecord(filename, self._NDG_dataProvider,
                                    self._datacentre_groups, self._datacentre_namespace,
                                    discoveryID, self._xq, self._datacentre_format)

            # Now create the data access object to interface to the DB
            dao = PostgresDAO(record, self._dbConnection)

            # Finally, write the new record
            if dao.createOrUpdateRecord():
                self._no_files_ingested += 1
        except Exception:
            # NB, catch Exception rather than using a bare except so that Ctrl-C and
            # sys.exit() are not swallowed; logging.exception also records the traceback
            logging.exception("Exception thrown - detail: ")

            # dao is only set once the record has been created, i.e. only then can
            # there be anything in the DB to tidy up
            if dao:
                logging.info("Removing record and its associated info from DB")
                logging.info("- to allow clean ingestion on rerun")
                try:
                    dao.deleteOriginalRecord()
                except Exception:
                    logging.exception("Problem encountered when removing record: ")
                    logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")

            self._no_problem_files += 1
            logging.info("Continue processing other files")


    def getConfigDetails(self, datacentre):
        '''
        Get the harvested records directory and groups for this datacentre from the
        datacentre specific config file. The harvested records directory depends on the
        datacentres OAI base url, the set and format. These have to be known up-front.
        The groups denote which 'portal groups' they belong to - for limiting searches to
        say NERC-only datacentres records.
        Groups are added to the intermediate MOLES when it is created.

        The config file is read as whitespace separated 'key value(s)' lines; the keys used
        are host_path, groups, format, namespace and NDG_dataProvider (a flag - no value).
        NB, the _harvest_home, _datacentre_groups, _datacentre_format, _datacentre_namespace
        and _NDG_dataProvider attributes must have been initialised before this is called;
        the script exits if the config file, host_path, format or namespace is missing.
        @param datacentre: datacentre to use when looking up config file
        '''
        self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties"
        logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)

        # Check this file exists; if not, assume an invalid datacentre has been specified
        if not os.path.isfile(self._datacentre_config_filename):
            sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
                     "specified (%s) is invalid\n" %datacentre)

        with open(self._datacentre_config_filename, "r") as datacentre_config_file:
            for line in datacentre_config_file:
                words = line.split()
                if not words:
                    continue

                key, values = words[0], words[1:]
                if key in ('NDG_dataProvider', 'self._NDG_dataProvider'):
                    # flag - its presence is enough. NB, the 'self._' spelling is what the
                    # script used to look for, so it is still accepted for existing files
                    self._NDG_dataProvider = True
                elif key == 'groups':
                    self._datacentre_groups = values
                elif not values:
                    # the remaining keys need a value - ignore the line rather than fail with
                    # an IndexError; the checks below report any value that is still missing
                    continue
                elif key == 'host_path':
                    self._harvest_home = values[0]
                elif key == 'format':
                    self._datacentre_format = values[0]
                elif key == 'namespace':
                    self._datacentre_namespace = values[0]

        if self._harvest_home == "":
            sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename)

        logging.info("harvested records are in " + self._harvest_home)

        if self._datacentre_groups == "":
            logging.info("No groups/keywords set for datacentre " + datacentre)
        else:
            logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))

        if self._datacentre_format == "":
            sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename)

        logging.info("format being harvested: " + self._datacentre_format)

        if self._datacentre_namespace == "":
            sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename)

        logging.info("datacentre namespace: " + self._datacentre_namespace)

        if self._NDG_dataProvider:
            logging.info("Datacentre classified as an NDG data provider")
        else:
            logging.info("Datacentre is not classified as an NDG data provider")
        logging.info(self.lineSeparator)


    def _getDBConnection(self):
        '''
        Get the default DB connection - by reading in data from the db config file,
        'ingest.config', in the current working directory. The file holds four or five
        whitespace separated values which are passed, in order, to db_funcs.db_connect;
        the optional fifth value is the port.
        @raise ValueError: if fewer than four values are found in the file
        '''
        logging.info("Setting up connection to postgres DB")
        with open('ingest.config', "r") as dbinfo_file:
            dbinfo = dbinfo_file.read().split()

        if len(dbinfo) < 4:
            raise ValueError('Incorrect data in config file')

        # if port specified in file, use this, otherwise use default
        # - i.e. pass on the first four values plus the fifth, when there is one
        self._dbConnection = db_funcs.db_connect(*dbinfo[:5])
        logging.info("Postgres DB connection now set up")


    def usage(self):
        '''
        Display input params for the script, then exit with status 2
        '''
        print("Usage: python oai_document_ingester.py [OPTION] <datacentre>")
        print(" - where:\n <datacentre> is the data centre to ingest data from; and options are:")
        print(" -v - verbose mode for output logging")
        print(" -d - debug mode for output logging")
        sys.exit(2)


    def _setUpLogging(self):
        '''
        Read the -v (verbose) and -d (debug) options from the command line and configure
        the logging level to match; the default level is WARNING.
        An unrecognised option displays the usage info and exits.
        '''
        try:
            opts, args = getopt.getopt(sys.argv[1:], "vd")
        except getopt.GetoptError as err:
            # print help information and exit:
            print(str(err)) # will print something like "option -a not recognized"
            self.usage()

        loggingLevel = logging.WARNING
        for o, a in opts:
            if o == "-v":
                print(" - Verbose mode ON")
                loggingLevel = logging.INFO
            elif o == "-d":
                print(" - Debug mode ON")
                loggingLevel = logging.DEBUG

        print(self.lineSeparator)
        logging.basicConfig(level=loggingLevel,
                            format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')


    def _copyHarvestedFiles(self, source_dir, target_dir):
        '''
        Copy every regular file found at any depth below source_dir directly into
        target_dir (i.e. the directory structure is flattened; a later file replaces an
        earlier one of the same name). Symbolic links are not copied.
        @param source_dir: directory to search for files
        @param target_dir: existing directory to copy the files into
        @return: number of files copied
        '''
        import shutil   # only needed here; imported locally to leave the module imports alone

        no_copied = 0
        for dirpath, dirnames, filenames in os.walk(source_dir):
            for name in filenames:
                source = os.path.join(dirpath, name)
                if os.path.islink(source) or not os.path.isfile(source):
                    continue
                # name the target file explicitly so that a missing target_dir is an error
                # rather than being silently created as a file
                shutil.copy(source, os.path.join(target_dir, name))
                no_copied += 1
        return no_copied


    def _getDiscoveryFilename(self, discovery_dir, ident):
        '''
        Work out the name a record should be stored under in the discovery dir
        - NDG data providers: the identifier, with ':' replaced by '__'
        - others: <namespace>__<format>__<identifier>, with ':' and '/' in the identifier
          replaced by '-'
        @param discovery_dir: discovery directory, including trailing separator
        @param ident: identifier of the record
        @return: full path of the file to create
        '''
        if self._NDG_dataProvider:
            return discovery_dir + ident.replace(":", "__") + ".xml"

        ident = ident.replace(":", "-").replace("/", "-")
        return discovery_dir + self._datacentre_namespace + "__" + \
            self._datacentre_format + "__" + ident + ".xml"


    def _renameAndConvertFiles(self, originals_dir, discovery_dir):
        '''
        Process the xml files in originals_dir - writing a copy of each, named after the
        identifier found inside the file, to discovery_dir. Any namespace declarations are
        replaced with a standard one which we know works in NDG.
        Files which cannot be processed are logged and skipped.
        @param originals_dir: directory of harvested files, including trailing separator
        @param discovery_dir: directory to write to, including trailing separator
        @return: number of files successfully processed
        '''
        numfilesproc = 0
        for filename in os.listdir(originals_dir):
            if not filename.endswith('.xml'):
                logging.warning('File %s is not xml format. Not processed' %(filename))
                continue

            original_filename = originals_dir + filename
            try:
                ident = self.getID(original_filename)
            except Exception as detail:
                logging.error("Could not retrieve ID from file, %s" %filename)
                logging.error("Detail: %s" %detail)
                logging.info("Continue with next file")
                continue

            new_filename = self._getDiscoveryFilename(discovery_dir, ident)
            logging.info("original file = " + original_filename)
            logging.info("newfile = " + new_filename)

            # now correct any namespace issues
            # NB, this is the step that creates the file in the discovery dir
            try:
                SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
            except Exception as detail:
                logging.error("SchemaNameSpace failed on file %s" %original_filename)
                logging.error("Detail: %s" %detail)
                logging.info("Continue with next file")
                continue
            numfilesproc += 1

        return numfilesproc


    def __init__(self, datacentre=None):
        '''
        Main entry point for script - runs the full ingest for the specified datacentre:
        - sets up logging from the command line options
        - reads the datacentre config file
        - copies the harvested files to the originals dir
        - renames/converts them into the discovery dir
        - adds the converted files to the postgres DB
        - backs up the originals and clears the working dirs; the harvest dir itself is
          only cleared if every file was ingested without problems
        @param datacentre: datacentre to ingest; if None, the usage info is displayed and
        the script exits
        '''
        self.lineSeparator = "-----------------------------"
        print(self.lineSeparator)
        print("RUNNING: oai_document_ingester.py")

        # check for verbose/debug options
        self._setUpLogging()

        if datacentre is None:
            self.usage()

        # create file utils object to do various file related stuff
        fileUtils = FileUtilities()

        self._no_files_ingested = 0
        self._no_problem_files = 0
        self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from

        data_dir = self._base_dir + "data/" + datacentre # dir relating to the specified datacentre docs

        # NB, the base dir is the current working dir, so this does not move anywhere - it
        # is retained from the original script
        os.chdir(self._base_dir)

        # set the default values of the data to retrieve from the config file
        self._harvest_home = ""
        self._datacentre_groups = ""
        self._datacentre_format = ""
        self._datacentre_namespace = ""
        self._NDG_dataProvider = False
        self.getConfigDetails(datacentre)

        # check harvest dir exists and that there are any records to harvest?
        if not os.path.exists(self._harvest_home):
            logging.info("Harvest directory for datacentre %s (%s) could not be found - exiting" \
                         %(datacentre, self._harvest_home))
            return
        elif len(os.listdir(self._harvest_home)) == 0:
            logging.info("Nothing to harvest this time from %s" %datacentre)
            return

        # The directory to put things for a tape backup (should already exist)
        backupdir = '/disks/glue1/oaiBackup/'

        # the following dirs define where the specific documents should go
        originals_dir = data_dir + "/oai/originals/"
        discovery_dir = data_dir + "/discovery/"

        # Create/clear the 'in' directory pristine copy of the discovery records
        fileUtils.setUpDir(originals_dir)
        logging.info("Copying harvested files from %s to %s" %(self._harvest_home, originals_dir))
        try:
            self._copyHarvestedFiles(self._harvest_home, originals_dir)
        except EnvironmentError as detail:
            # covers IOError, OSError and shutil.Error under both python 2 and 3
            logging.error("Detail: %s" %detail)
            sys.exit("Failed at making pristine copy stage")

        # Create/clear the directory for the 'out' processed copy of the discovery records.
        fileUtils.setUpDir(discovery_dir)

        # Process/rename the files (changed 08/01/07 to get id from inside file)
        # - also replace any namespace declarations with a standard one which we know works in NDG
        # NB, this copies files from the original dir to the discovery dir
        logging.info(self.lineSeparator)
        logging.info("Renaming files:")
        numfilesproc = self._renameAndConvertFiles(originals_dir, discovery_dir)
        logging.info(self.lineSeparator)

        # now set up the required XQueries
        # - NB, extract the xquery libraries locally for easy reference
        self._xq = ndgXqueries()
        for libFile in self._xq.xqlib:
            fileUtils.createFile(libFile, self._xq.xqlib[libFile])

        # Process the resulting files and put the data into the postgres DB
        # - firstly set up a db connection to use
        self._dbConnection = None
        self._getDBConnection()

        for filename in os.listdir(discovery_dir):
            self.addFileToPostgresDB(discovery_dir + filename)

        # Make a copy of the oai/originals area in the backup area for tape backups
        backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M")

        this_backupdir = backupdir_base + "_originals/"
        fileUtils.makeBackUp(originals_dir, this_backupdir)

        # Clear out the original harvest records area and discovery dir
        fileUtils.cleanDir(originals_dir)
        fileUtils.cleanDir(discovery_dir)

        logging.info("oai_document_ingest processing complete:")
        if self._no_problem_files == 0:
            logging.info("All files successfully processed - cleaning harvest directory")
            fileUtils.cleanDir(self._harvest_home)
        else:
            logging.error("Problems experienced with %s files" %self._no_problem_files)
            logging.error("- harvest directory will not be cleared until these have been fixed and the script has been reran")

        logging.info(self.lineSeparator)
        logging.info("INFO: Number of files processed = %s" %numfilesproc)
        logging.info("INFO: Number of files ingested = %s" %self._no_files_ingested)
        logging.info(self.lineSeparator)
        print("Script finished running.")
348 | |
---|
349 | |
---|
if __name__=="__main__":
    # The options are parsed here only to find the positional <datacentre> argument; the
    # -v/-d options themselves are acted on by the oai_document_ingester constructor.
    # NB, the option string must match the one used there ("vd").
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'vd')
    except getopt.GetoptError as err:
        # report the unrecognised option cleanly rather than with a traceback
        print(str(err))
        print("Usage: python oai_document_ingester.py [OPTION] <datacentre>")
        sys.exit(2)

    if len(args) < 1:
        # no datacentre specified - the constructor displays the usage info and exits
        oai_document_ingester()
    else:
        oai_document_ingester(args[0])