1 | #!/usr/bin/env python |
---|
2 | """ Script oai_ingest.py takes parameters <datacentre> <dbinfofile>. |
---|
3 | The /usr/local/WSClients/OAIBatch directory contains:- |
---|
4 | - this python script, plus some other modules eg ndgUtils for parts of the process. |
---|
5 | - a DataProvider specific config file, |
---|
6 | - the python module for extracting spatiotemporal information and adding to postgres db. |
---|
7 | Under this directory the following structure should be maintained: |
---|
8 | ./data |
---|
9 | - /DATACENTRE/ |
---|
10 | - discovery/: Re-named documents. |
---|
11 | - discovery_corrected/: Documents with schema namespaces corrected, ready to ingest into the discovery service. |
---|
12 | - oai/difYYYYMMDD/ Documents as harvested from OAI |
---|
13 | Where /DATACENTRE varies for the different data providers |
---|
14 | """ |
---|
15 | #History: |
---|
16 | # 12/05/06 SEL spelling correction |
---|
17 | # 30/05/06 SEL cope with many files for processing - "Argument list too long" problem. |
---|
18 | # 31/05/06 SEL need to use passwords now. (replace xxxxxx in real version). |
---|
19 | # 16/10/06 SEL Changed to using python oaiClean.py module instead of java code. |
---|
20 | # 16/10/06 SEL exist db upgraded and deployed at different location, java upgrade. |
---|
21 | # 17/10/06 SEL cope with different discovery formats - not just DIF. |
---|
22 | # 23/10/06 SEL keywords not mandatory in config file. |
---|
23 | # 24/10/06 SEL fix bug where 'in' directory not being cleared initially. display more information when running. |
---|
24 | # December 2007 SEL rewrite to use Bryan's python XQuery stuff to create mini-moles instead of java. |
---|
25 | # Also extracted hard coded pwds into a file. |
---|
26 | # 11/04/08 CByrom Tidy up script by organising code into reusable functions + variables |
---|
27 | # + remove dependency on eXist DB |
---|
28 | |
---|
29 | import os, sys, string, getopt, logging |
---|
30 | from time import strftime |
---|
31 | from SchemaNameSpace import SchemaNameSpace |
---|
32 | from DIF import DIF |
---|
33 | from MDIP import MDIP |
---|
34 | import ndgUtils |
---|
35 | from ndgUtils.ndgXqueries import ndgXqueries |
---|
36 | from FileUtilities import FileUtilities |
---|
37 | from PostgresRecord import PostgresRecord |
---|
38 | from PostgresDAO import PostgresDAO |
---|
39 | import db_funcs |
---|
40 | |
---|
class oai_ingest:
    '''
    Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
    - including running the various transforms and parsings to get all doc types and spatiotemporal data
    in the correct form in the DB.
    NB, the whole ingest runs from the constructor - instantiating the class with a
    datacentre name performs the complete ingest for that datacentre.
    '''
---|
47 | |
---|
48 | def getID(self, filename): |
---|
49 | ''' |
---|
50 | Gets the identifier out of an input metadata xml record. |
---|
51 | Copes with DIF and MDIP currently. |
---|
52 | @param filename - name of document file being processed |
---|
53 | @return: ID - id to use to refer to the document |
---|
54 | ''' |
---|
55 | logging.info("Retrieving identifier for metadata record " + filename) |
---|
56 | xml=file(filename).read() |
---|
57 | if self._datacentre_format == "DIF": |
---|
58 | d=DIF(xml) |
---|
59 | ID=d.entryID |
---|
60 | elif self._datacentre_format == "MDIP": |
---|
61 | d=MDIP(xml) |
---|
62 | ID=d.id |
---|
63 | else: |
---|
64 | sys.exit("Only handles DIF or MDIP here.") |
---|
65 | |
---|
66 | logging.info("Found identifier: " + ID) |
---|
67 | return ID |
---|
68 | |
---|
69 | |
---|
70 | def addFileToPostgresDB(self, filename): |
---|
71 | ''' |
---|
72 | Add a file to the postgres DB - extracting and storing all the required |
---|
73 | data in the process |
---|
74 | @param filename: full path of file to add to postgres DB |
---|
75 | ''' |
---|
76 | if not os.path.isfile(filename): |
---|
77 | logging.info("Skipping, %s - not a valid file" %filename) |
---|
78 | return |
---|
79 | |
---|
80 | logging.info("Adding file, " + filename + ", to postgres DB") |
---|
81 | discoveryID = self.getID(filename) |
---|
82 | |
---|
83 | # first of all create a PostgresRecord - this object represents all the data required |
---|
84 | # for a DB entry |
---|
85 | record = PostgresRecord(filename, self._NDG_dataProvider, \ |
---|
86 | self._datacentre_groups, self._datacentre_namespace, \ |
---|
87 | discoveryID, self._xq, self._datacentre_format) |
---|
88 | |
---|
89 | # Now create the data access object to interface to the DB |
---|
90 | dao = PostgresDAO(record, self._dbConnection) |
---|
91 | |
---|
92 | # Finally, write the new record |
---|
93 | dao.createOrUpdateRecord() |
---|
94 | |
---|
95 | |
---|
96 | def getConfigDetails(self, datacentre): |
---|
97 | ''' |
---|
98 | Get the harvested records directory and groups for this datacentre from the |
---|
99 | datacentre specific config file. The harvested records directory depends on the |
---|
100 | datacentres OAI base url, the set and format. These have to be know up-front. |
---|
101 | The groups denote which 'portal groups' they belong to - for limiting searches to |
---|
102 | say NERC-only datacentres records. |
---|
103 | Groups are added to the intermediate MOLES when it is created. |
---|
104 | @param datacentre: datacentre to use when looking up config file |
---|
105 | ''' |
---|
106 | self._datacentre_config_filename = self._base_dir + 'datacentre_config/' + datacentre + "_config.properties" |
---|
107 | logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename) |
---|
108 | |
---|
109 | # Check this file exists; if not, assume an invalid datacentre has been specified |
---|
110 | if not os.path.isfile(self._datacentre_config_filename): |
---|
111 | sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \ |
---|
112 | "specified (%s) is invalid\n" %datacentre) |
---|
113 | |
---|
114 | datacentre_config_file = open(self._datacentre_config_filename, "r") |
---|
115 | |
---|
116 | for line in datacentre_config_file.readlines(): |
---|
117 | words = string.split(line) |
---|
118 | if len(words) == 0: |
---|
119 | continue |
---|
120 | if words[0] == 'host_path': |
---|
121 | self._harvest_home = string.rstrip(words[1]) |
---|
122 | if words[0] == 'groups': |
---|
123 | self._datacentre_groups = words[1:] |
---|
124 | if words[0] == 'format': |
---|
125 | self._datacentre_format = words[1] |
---|
126 | if words[0] == 'namespace': |
---|
127 | self._datacentre_namespace = words[1] |
---|
128 | if words[0] == 'self._NDG_dataProvider': |
---|
129 | self._NDG_dataProvider = True |
---|
130 | |
---|
131 | datacentre_config_file.close() |
---|
132 | |
---|
133 | if self._harvest_home == "": |
---|
134 | sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %self._datacentre_config_filename) |
---|
135 | |
---|
136 | logging.info("harvested records are in " + self._harvest_home) |
---|
137 | |
---|
138 | if self._datacentre_groups == "": |
---|
139 | logging.info("No groups/keywords set for datacentre " + datacentre) |
---|
140 | else: |
---|
141 | logging.info("datacentre groups/keywords: " + self._datacentre_groups) |
---|
142 | |
---|
143 | if self._datacentre_format == "": |
---|
144 | sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %self._datacentre_config_filename) |
---|
145 | |
---|
146 | logging.info("format being harvested: " + self._datacentre_format) |
---|
147 | |
---|
148 | if self._datacentre_namespace == "": |
---|
149 | sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %self._datacentre_config_filename) |
---|
150 | |
---|
151 | logging.info("datacentre namespace: " + self._datacentre_namespace) |
---|
152 | |
---|
153 | if self._NDG_dataProvider: |
---|
154 | logging.info("Datacentre classified as an NDG data provider") |
---|
155 | else: |
---|
156 | logging.info("Datacentre is not classificied as an NDG data provider") |
---|
157 | print self.lineSeparator |
---|
158 | |
---|
159 | |
---|
160 | def _getDBConnection(self): |
---|
161 | ''' |
---|
162 | Get the default DB connection - by reading in data from the db config file |
---|
163 | ''' |
---|
164 | logging.info("Setting up connection to postgres DB") |
---|
165 | dbinfo_file=open('ingest.config', "r") |
---|
166 | dbinfo = dbinfo_file.read().split() |
---|
167 | if len(dbinfo) != 4: |
---|
168 | raise ValueError, 'Incorrect data in config file' |
---|
169 | |
---|
170 | self._dbConnection = db_funcs.db_connect(dbinfo[0], dbinfo[1], dbinfo[2], dbinfo[3]) |
---|
171 | logging.info("Postgres DB connection now set up") |
---|
172 | |
---|
173 | |
---|
174 | def usage(self): |
---|
175 | ''' |
---|
176 | Display input params for the script |
---|
177 | ''' |
---|
178 | print "Usage: python -v oai_ingest.py <datacentre>" |
---|
179 | print " - where:\n <datacentre> is the data centre to ingest data from; and" |
---|
180 | print " -v - verbose mode for output logging" |
---|
181 | sys.exit(2) |
---|
182 | |
---|
183 | |
---|
    def __init__(self, datacentre=None):
        '''
        Main entry point for script - running the constructor performs the whole
        ingest for one datacentre: copy the harvested docs, rename them + fix
        their schema namespaces, load each into the postgres DB, then back up and
        clear the working directories.
        @keyword datacentre: name of the datacentre to ingest from; if None,
        usage info is printed and the script exits
        '''
        self.lineSeparator = "-----------------------------"
        print self.lineSeparator
        print "RUNNING: oai_ingest.py"

        # check for verbose option
        try:
            opts, args = getopt.getopt(sys.argv[1:], "v")
        except getopt.GetoptError, err:
            # print help information and exit:
            print str(err) # will print something like "option -a not recognized"
            self.usage()

        # default to WARNING level logging; -v switches on DEBUG
        loggingLevel = logging.WARNING
        for o, a in opts:
            if o == "-v":
                print " - Verbose mode ON"
                loggingLevel = logging.DEBUG

        logging.basicConfig(level=loggingLevel,
                            format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')

        print self.lineSeparator

        # no datacentre specified - nothing to do, so display usage and exit
        if datacentre is None:
            self.usage()

        # create file utils object to do various file related stuff
        fileUtils = FileUtilities()

        status = 0          # exit status of the external copy command, reported at the end
        numfilesproc = 0    # count of xml docs successfully pre-processed
        self._base_dir = os.getcwd() + "/"# this is the base dir that the script is ran from

        data_dir = self._base_dir + "data/" + datacentre  # dir relating to the specified datacentre docs

        #Change os directory to that with the harvested documents in it.
        os.chdir(self._base_dir)

        # - to run on Windows under cygwin, use the following
        #os.putenv('PATH', 'C:\\opt\\cygwin\\bin')

        # set the global variables to retrieve from the config file
        # (getConfigDetails overwrites these and exits if mandatory ones are missing)
        self._harvest_home = ""
        self._datacentre_groups = ""
        self._datacentre_format = ""
        self._datacentre_namespace = ""
        self._NDG_dataProvider = False
        self.getConfigDetails(datacentre)

        #any records to harvest?
        if len( os.listdir(self._harvest_home)) == 0:
            logging.info("Nothing to harvest this time from " + datacentre)
            sys.exit()

        # The directory to put things for a tape backup (should already exist)
        #backupdir = '/disks/glue1/oaiBackup/'
        # TODO: uncomment above on live system
        backupdir = data_dir + "/backups/"

        # the following dirs define where the specific documents should go
        originals_dir = data_dir + "/oai/originals/"
        discovery_dir = data_dir + "/discovery/"

        # Create/clear the 'in' directory pristine copy of the discovery records
        # NOTE(review): setUpDir presumably creates the dir if missing and empties
        # it otherwise - confirm in FileUtilities
        fileUtils.setUpDir(originals_dir)
        # copy via ls|xargs rather than a single 'cp *' to avoid the
        # "Argument list too long" problem with many harvested files
        commandline = "ls -1 " + self._harvest_home + "/ | xargs -i cp " + self._harvest_home + "/{\} " + originals_dir
        #commandline = "find " + self._harvest_home + " -type f -print | xargs -i cp \{\} " + originals_dir
        logging.info("Executing : " + commandline)
        status = os.system(commandline)

        if status !=0:
            sys.exit("Failed at making pristine copy stage")

        # Create/clear the directory for the 'out' processed copy of the discovery records.
        fileUtils.setUpDir(discovery_dir)

        #Execute the script which processes/renames the files (changed 08/01/07 to get id from inside file)
        # - also replace any namespace declarations with a standard one which we know works in NDG
        # NB, this copies files from the original dir to the discovery dir
        logging.info(self.lineSeparator)
        logging.info("Renaming files:")
        for filename in os.listdir(originals_dir):
            if filename.endswith('.xml'):
                original_filename = originals_dir + filename
                ident=self.getID(original_filename)

                if self._NDG_dataProvider:
                    # NDG providers' ids are already namespaced - just make them filesystem safe
                    new_filename = discovery_dir + ident.replace(":","__")+".xml"
                else:
                    # non-NDG providers: sanitise the id and prefix with namespace + format
                    ident = ident.replace(":","-")
                    ident = ident.replace("/","-")
                    new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+self._datacentre_format+ "__"+ ident +".xml"
                logging.info("original file = " + original_filename)
                logging.info("newfile = " + new_filename)

                # now correct any namespace issues
                try:
                    SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
                except:
                    sys.exit("ERROR: SchemaNameSpace failed on file %s" %original_filename)
                numfilesproc += 1
            else:
                logging.warning('File %s is not xml format. Not processed' %(filename))

        logging.info(self.lineSeparator)

        # now set up the required XQueries
        # - NB, extract the xquery libraries locally for easy reference
        self._xq=ndgXqueries()
        for libFile in self._xq.xqlib:
            fileUtils.createFile(libFile, self._xq.xqlib[libFile])

        # Process the resulting files and put the data into the postgres DB
        # - firstly set up a db connection to use
        self._dbConnection = None
        self._getDBConnection()

        filenames = os.listdir(discovery_dir)
        for filename in filenames:
            self.addFileToPostgresDB(discovery_dir + filename)

        #Make copies of discovery and oai/originals and DIF2MOLES areas to backup area for tape backups
        backupdir_base = backupdir + datacentre + "_" + strftime("%y%m%d_%H%M")

        this_backupdir = backupdir_base + "_originals/"
        fileUtils.makeBackUp(originals_dir, this_backupdir)

        #Clear out the original harvest records area and FINALMOLES
        fileUtils.cleanDir(originals_dir)
        fileUtils.cleanDir(discovery_dir)
        # TODO: uncomment following line when live on system
        #fileUtils.cleanDir(self._harvest_home)

        # final status report - NB, status only reflects the copy command above;
        # later failures exit directly via sys.exit
        print self.lineSeparator
        print "INFO: No. of files pre-processed = %s" %numfilesproc
        if status == 0:
            print "INFO: Procedure oai_ingest.py completed"
        else:
            print "ERROR: Procedure oai_ingest.py FAILED with status %s" %status
        print self.lineSeparator
---|
328 | |
---|
329 | |
---|
if __name__=="__main__":
    opts, args = getopt.getopt(sys.argv[1:], "v")
    if len(args) < 1:
        # no datacentre specified - the constructor will display usage info and exit;
        # NB, explicit else below so we never fall through to args[0] (the original
        # relied on usage() calling sys.exit to avoid an IndexError here)
        oai_ingest()
    else:
        oai_ingest(args[0])
---|